All checks were successful
Tests / test (push) Successful in 2s
All ~/clawd/ references replaced with configurable paths: - CORTEX_HOME (default: ~/.cortex) - CORTEX_MEMORY_DIR, CORTEX_CONFIG, CORTEX_GROWTH_LOG, CORTEX_ROADMAP - permanent_files configurable via config.json - Tests pass both with and without env vars set - 169/169 tests green
241 lines
8.1 KiB
Python
Executable file
#!/usr/bin/env python3
"""
Enhanced Search — Integration wrapper combining intent classification,
composite scoring, and memory search into a single pipeline.

Pipeline:
1. Classify intent (WHO/WHEN/WHY/WHAT)
2. Adjust scoring weights based on intent
3. Run search via unified-memory.py or file-based search
4. Apply composite scoring to re-rank results
5. Return re-ranked results

Usage:
    python3 enhanced_search.py "query"
    python3 enhanced_search.py --json "query"
    python3 enhanced_search.py --top 5 "query"
"""
|
|
|
|
# Standard library
import argparse
import json
import os
import re
import subprocess
import sys
import time
from dataclasses import asdict
from pathlib import Path
from typing import Optional

# Project-local: composite scoring and intent classification components.
# NOTE(review): `re`, `asdict`, `Optional`, `load_scorer_config`, and
# `IntentResult` are not referenced in this chunk — possibly used elsewhere
# or left over; confirm before removing.
from cortex.composite_scorer import SearchResult, score_results, load_config as load_scorer_config
from cortex.intent_classifier import classify, IntentResult
|
|
|
|
# External search script; overridable via CORTEX_UNIFIED_MEMORY_SCRIPT.
UNIFIED_MEMORY_SCRIPT = Path(os.environ.get("CORTEX_UNIFIED_MEMORY_SCRIPT", str(Path.home() / ".cortex" / "scripts" / "unified-memory.py")))

# Interpreter used to run the script; falls back when sys.executable is empty
# (can happen in embedded interpreters).
PYTHON = sys.executable or "/usr/bin/python3"

# Paths to search directly if unified-memory.py is unavailable
SEARCH_PATHS = [
    Path(os.environ.get("CORTEX_MEMORY_DIR", str(Path.home() / ".cortex" / "memory"))),
    Path.home() / "life" / "areas",
]
|
|
|
|
|
|
def search_files(query: str, max_results: int = 20) -> list[SearchResult]:
    """Fallback file-based search over the configured memory paths.

    Splits the query into lowercase terms (length > 2), scans up to 100
    most-recent-by-name markdown files per search path (or the path itself
    when it is a file), and scores each file by the fraction of query
    terms it contains.

    Args:
        query: Search query string.
        max_results: Maximum number of results to return.

    Returns:
        SearchResult list sorted by term-coverage score, best first.
    """
    keywords = [word for word in query.lower().split() if len(word) > 2]
    if not keywords:
        return []

    hits: list[SearchResult] = []
    for root in SEARCH_PATHS:
        if not root.exists():
            continue

        # A search path may be a single file or a directory tree.
        if root.is_file():
            candidates = [root]
        else:
            candidates = sorted(root.rglob("*.md"), reverse=True)[:100]

        for candidate in candidates:
            try:
                text = candidate.read_text(errors="ignore")
            except (OSError, PermissionError):
                continue  # unreadable file — skip silently

            lowered = text.lower()
            hit_count = sum(1 for word in keywords if word in lowered)
            if not hit_count:
                continue

            hits.append(SearchResult(
                text=_extract_snippet(text, keywords),
                source_path=str(candidate),
                # Coverage score: fraction of query terms present.
                original_score=hit_count / len(keywords),
                metadata={"matched_terms": hit_count, "total_terms": len(keywords)},
            ))

    # Best coverage first, capped at max_results.
    return sorted(hits, key=lambda h: h.original_score, reverse=True)[:max_results]
|
|
|
|
|
|
def _extract_snippet(content: str, terms: list[str], context_chars: int = 200) -> str:
|
|
"""Extract a snippet around the first matching term."""
|
|
content_lower = content.lower()
|
|
best_pos = len(content)
|
|
for t in terms:
|
|
pos = content_lower.find(t)
|
|
if 0 <= pos < best_pos:
|
|
best_pos = pos
|
|
|
|
if best_pos >= len(content):
|
|
return content[:300]
|
|
|
|
start = max(0, best_pos - context_chars // 2)
|
|
end = min(len(content), best_pos + context_chars)
|
|
snippet = content[start:end].strip()
|
|
if start > 0:
|
|
snippet = "..." + snippet
|
|
if end < len(content):
|
|
snippet = snippet + "..."
|
|
return snippet
|
|
|
|
|
|
def search_unified(query: str, max_results: int = 20,
                   timeout: float = 10.0) -> list[SearchResult]:
    """Search via the external unified-memory.py script.

    Runs the script as a subprocess with ``--json`` and converts its JSON
    payload into SearchResult objects. Falls back to the direct file-based
    search whenever the script is missing, exits non-zero, times out, or
    returns output that cannot be parsed.

    Args:
        query: Search query string.
        max_results: Maximum number of results to return.
        timeout: Subprocess timeout in seconds.

    Returns:
        SearchResult list from the script, or the file-search fallback.
    """
    if not UNIFIED_MEMORY_SCRIPT.exists():
        return search_files(query, max_results)

    try:
        proc = subprocess.run(
            [PYTHON, str(UNIFIED_MEMORY_SCRIPT), "--json", query],
            capture_output=True, text=True, timeout=timeout,
        )
        if proc.returncode != 0:
            return search_files(query, max_results)

        data = json.loads(proc.stdout)
        results = []
        for item in data.get("results", [])[:max_results]:
            results.append(SearchResult(
                text=item.get("text", ""),
                source_path=item.get("metadata", {}).get("path", item.get("source", "")),
                original_score=item.get("score", 0.5),
                metadata=item.get("metadata", {}),
            ))
        return results

    # BUGFIX: the original caught (TimeoutExpired, JSONDecodeError, Exception)
    # — the trailing bare Exception subsumed the tuple and silently swallowed
    # every error, including programming bugs (NameError, etc.). Catch only
    # the failure modes this call can realistically produce:
    #   SubprocessError  — covers TimeoutExpired and other run() failures
    #   OSError          — interpreter/script could not be executed
    #   JSONDecodeError  — script emitted malformed JSON
    #   AttributeError/TypeError — payload had an unexpected shape (e.g. a
    #                              list where a dict was expected)
    except (subprocess.SubprocessError, OSError, json.JSONDecodeError,
            AttributeError, TypeError):
        return search_files(query, max_results)
|
|
|
|
|
|
def enhanced_search(query: str, max_results: int = 10,
                    use_unified: bool = True) -> dict:
    """Run the full enhanced search pipeline for *query*.

    Classifies the query's intent, gathers candidate results (via the
    unified-memory script or direct file search), re-ranks them with
    intent-weighted composite scoring, and trims to the requested size.

    Args:
        query: Search query string.
        max_results: Maximum results to return.
        use_unified: Whether to try unified-memory.py first.

    Returns:
        Dict with intent, results, and timing info.
    """
    t0 = time.perf_counter()

    # Stage 1: intent classification drives the weight overrides below.
    intent = classify(query)

    # Stage 2: fetch ~2x the requested count so re-ranking has headroom.
    t_search = time.perf_counter()
    searcher = search_unified if use_unified else search_files
    candidates = searcher(query, max_results=max_results * 2)
    search_ms = (time.perf_counter() - t_search) * 1000

    # Stage 3: composite re-ranking with intent-adjusted weights.
    t_score = time.perf_counter()
    ranked = score_results(
        candidates, query=query,
        weight_overrides=intent.weight_adjustments,
    )
    scoring_ms = (time.perf_counter() - t_score) * 1000

    # Stage 4: keep only the top max_results entries.
    top = ranked[:max_results]

    total_ms = (time.perf_counter() - t0) * 1000

    return {
        "query": query,
        "intent": {
            "type": intent.intent,
            "confidence": intent.confidence,
            "signals": intent.matched_signals,
            "classification_ms": intent.classification_ms,
        },
        "results": [
            {
                "text": entry.text[:500],
                "source": entry.source_path,
                "score": round(entry.final_score, 4),
                "breakdown": entry.score_breakdown,
            }
            for entry in top
        ],
        "timing": {
            "classification_ms": round(intent.classification_ms, 2),
            "search_ms": round(search_ms, 2),
            "scoring_ms": round(scoring_ms, 2),
            "total_ms": round(total_ms, 2),
        },
        "total_raw": len(candidates),
        "total_returned": len(top),
    }
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the pipeline, print a report."""
    parser = argparse.ArgumentParser(description="Enhanced memory search with intent classification and composite scoring")
    parser.add_argument("query", help="Search query")
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
    parser.add_argument("--top", type=int, default=10, help="Number of results (default: 10)")
    parser.add_argument("--no-unified", action="store_true", help="Skip unified-memory.py, use file search only")
    opts = parser.parse_args()

    report = enhanced_search(opts.query, max_results=opts.top, use_unified=not opts.no_unified)

    if opts.json_output:
        print(json.dumps(report, indent=2, ensure_ascii=False))
        return

    # Human-readable summary: query, intent, timings, then ranked results.
    intent = report["intent"]
    timing = report["timing"]
    print(f"\n🔍 Query: {report['query']}")
    print(f"🎯 Intent: {intent['type']} (confidence: {intent['confidence']:.0%})")
    if intent["signals"]:
        print(f" Signals: {', '.join(intent['signals'])}")
    print(f"⏱️ Total: {timing['total_ms']:.0f}ms "
          f"(classify: {timing['classification_ms']:.1f}ms, "
          f"search: {timing['search_ms']:.0f}ms, "
          f"score: {timing['scoring_ms']:.1f}ms)")
    print(f"📊 {report['total_returned']}/{report['total_raw']} results\n")

    for rank, row in enumerate(report["results"], 1):
        label = Path(row["source"]).name if row["source"] else "unknown"
        print(f" {rank}. [{row['score']:.3f}] {label}")
        preview = row["text"][:120].replace("\n", " ")
        print(f" {preview}")
        print()


if __name__ == "__main__":
    main()
|