#!/usr/bin/env python3
"""
Enhanced Search — Integration wrapper combining intent classification,
composite scoring, and memory search into a single pipeline.

Pipeline:
1. Classify intent (WHO/WHEN/WHY/WHAT)
2. Adjust scoring weights based on intent
3. Run search via unified-memory.py or file-based search
4. Apply composite scoring to re-rank results
5. Return re-ranked results

Usage:
    python3 enhanced_search.py "query"
    python3 enhanced_search.py --json "query"
    python3 enhanced_search.py --top 5 "query"
"""

import argparse
import json
import os
import re
import subprocess
import sys
import time
from dataclasses import asdict
from pathlib import Path
from typing import Optional

from cortex.composite_scorer import SearchResult, score_results, load_config as load_scorer_config
from cortex.intent_classifier import classify, IntentResult

# External search backend; overridable for testing/relocation via env var.
UNIFIED_MEMORY_SCRIPT = Path(os.environ.get(
    "CORTEX_UNIFIED_MEMORY_SCRIPT",
    str(Path.home() / ".cortex" / "scripts" / "unified-memory.py"),
))
# sys.executable can be empty in embedded interpreters; fall back to a sane default.
PYTHON = sys.executable or "/usr/bin/python3"

# Paths to search directly if unified-memory.py is unavailable
SEARCH_PATHS = [
    Path(os.environ.get("CORTEX_MEMORY_DIR", str(Path.home() / ".cortex" / "memory"))),
    Path.home() / "life" / "areas",
]


def search_files(query: str, max_results: int = 20) -> list[SearchResult]:
    """Fallback file-based search using grep.

    Searches through memory files for query terms and returns scored results.

    Args:
        query: Free-text query; split on whitespace into lowercase terms.
        max_results: Maximum number of results to return.

    Returns:
        Results sorted by descending term-coverage score (may be empty).
    """
    results: list[SearchResult] = []
    # Terms of 3+ chars only — drops noise words like "a", "of", "to".
    terms = [t for t in query.lower().split() if len(t) > 2]
    if not terms:
        return results
    for search_path in SEARCH_PATHS:
        if not search_path.exists():
            continue
        if search_path.is_file():
            files = [search_path]
        else:
            # Reverse lexical sort biases toward newer date-named files;
            # cap at 100 so huge trees stay fast.
            files = sorted(search_path.rglob("*.md"), reverse=True)[:100]
        for fpath in files:
            try:
                content = fpath.read_text(errors="ignore")
            except OSError:  # covers PermissionError (a subclass) and friends
                continue
            content_lower = content.lower()
            matched = sum(1 for t in terms if t in content_lower)
            if matched == 0:
                continue
            # Score based on term coverage (fraction of query terms present)
            score = matched / len(terms)
            # Extract best matching snippet (around first match)
            snippet = _extract_snippet(content, terms)
            results.append(SearchResult(
                text=snippet,
                source_path=str(fpath),
                original_score=score,
                metadata={"matched_terms": matched, "total_terms": len(terms)},
            ))
    # Sort by original score and limit
    results.sort(key=lambda r: r.original_score, reverse=True)
    return results[:max_results]


def _extract_snippet(content: str, terms: list[str], context_chars: int = 200) -> str:
    """Extract a snippet around the first matching term.

    Finds the earliest occurrence of any term (case-insensitive) and returns
    ~context_chars of surrounding text, with "..." markers where truncated.
    Falls back to the first 300 chars when no term matches.
    """
    content_lower = content.lower()
    best_pos = len(content)  # sentinel: "no match found"
    for t in terms:
        pos = content_lower.find(t)
        if 0 <= pos < best_pos:
            best_pos = pos
    if best_pos >= len(content):
        return content[:300]
    start = max(0, best_pos - context_chars // 2)
    end = min(len(content), best_pos + context_chars)
    snippet = content[start:end].strip()
    if start > 0:
        snippet = "..." + snippet
    if end < len(content):
        snippet = snippet + "..."
    return snippet


def search_unified(query: str, max_results: int = 20, timeout: float = 10.0) -> list[SearchResult]:
    """Search via unified-memory.py script.

    Runs the external script as a subprocess and parses its JSON output.
    Any failure (missing script, non-zero exit, timeout, bad JSON) falls
    back to :func:`search_files`.
    """
    if not UNIFIED_MEMORY_SCRIPT.exists():
        return search_files(query, max_results)
    try:
        proc = subprocess.run(
            [PYTHON, str(UNIFIED_MEMORY_SCRIPT), "--json", query],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if proc.returncode != 0:
            return search_files(query, max_results)
        data = json.loads(proc.stdout)
        results = []
        for item in data.get("results", [])[:max_results]:
            results.append(SearchResult(
                text=item.get("text", ""),
                source_path=item.get("metadata", {}).get("path", item.get("source", "")),
                original_score=item.get("score", 0.5),
                metadata=item.get("metadata", {}),
            ))
        return results
    except Exception:
        # Deliberate best-effort: TimeoutExpired, JSONDecodeError, or any
        # other failure degrades to the local file-based search.
        return search_files(query, max_results)


def enhanced_search(query: str, max_results: int = 10, use_unified: bool = True) -> dict:
    """Run the full enhanced search pipeline.

    Args:
        query: Search query string.
        max_results: Maximum results to return.
        use_unified: Whether to try unified-memory.py first.

    Returns:
        Dict with intent, results, and timing info.
    """
    pipeline_start = time.perf_counter()

    # Step 1: Classify intent
    intent_result = classify(query)

    # Step 2: Search (over-fetch 2x so re-ranking has candidates to promote)
    search_start = time.perf_counter()
    if use_unified:
        raw_results = search_unified(query, max_results=max_results * 2)
    else:
        raw_results = search_files(query, max_results=max_results * 2)
    search_ms = (time.perf_counter() - search_start) * 1000

    # Step 3: Apply composite scoring with intent-adjusted weights
    scoring_start = time.perf_counter()
    scored = score_results(
        raw_results,
        query=query,
        weight_overrides=intent_result.weight_adjustments,
    )
    scoring_ms = (time.perf_counter() - scoring_start) * 1000

    # Step 4: Trim to max results
    final = scored[:max_results]
    pipeline_ms = (time.perf_counter() - pipeline_start) * 1000

    return {
        "query": query,
        "intent": {
            "type": intent_result.intent,
            "confidence": intent_result.confidence,
            "signals": intent_result.matched_signals,
            "classification_ms": intent_result.classification_ms,
        },
        "results": [
            {
                "text": r.text[:500],
                "source": r.source_path,
                "score": round(r.final_score, 4),
                "breakdown": r.score_breakdown,
            }
            for r in final
        ],
        "timing": {
            "classification_ms": round(intent_result.classification_ms, 2),
            "search_ms": round(search_ms, 2),
            "scoring_ms": round(scoring_ms, 2),
            "total_ms": round(pipeline_ms, 2),
        },
        "total_raw": len(raw_results),
        "total_returned": len(final),
    }


def main():
    """CLI entry point: parse args, run the pipeline, print JSON or pretty text."""
    parser = argparse.ArgumentParser(
        description="Enhanced memory search with intent classification and composite scoring"
    )
    parser.add_argument("query", help="Search query")
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
    parser.add_argument("--top", type=int, default=10, help="Number of results (default: 10)")
    parser.add_argument("--no-unified", action="store_true",
                        help="Skip unified-memory.py, use file search only")
    args = parser.parse_args()

    result = enhanced_search(args.query, max_results=args.top, use_unified=not args.no_unified)

    if args.json_output:
        print(json.dumps(result, indent=2, ensure_ascii=False))
    else:
        intent = result["intent"]
        print(f"\nšŸ” Query: {result['query']}")
        print(f"šŸŽÆ Intent: {intent['type']} (confidence: {intent['confidence']:.0%})")
        if intent["signals"]:
            print(f"   Signals: {', '.join(intent['signals'])}")
        print(f"⏱️  Total: {result['timing']['total_ms']:.0f}ms "
              f"(classify: {result['timing']['classification_ms']:.1f}ms, "
              f"search: {result['timing']['search_ms']:.0f}ms, "
              f"score: {result['timing']['scoring_ms']:.1f}ms)")
        print(f"šŸ“Š {result['total_returned']}/{result['total_raw']} results\n")
        for i, r in enumerate(result["results"], 1):
            source = Path(r["source"]).name if r["source"] else "unknown"
            print(f"  {i}. [{r['score']:.3f}] {source}")
            text_preview = r["text"][:120].replace("\n", " ")
            print(f"     {text_preview}")
        print()


if __name__ == "__main__":
    main()