#!/usr/bin/env python3
"""
Composite Scorer — Re-rank memory search results using recency, source-type,
and multi-term confidence weighting.

Formula:
    final_score = w_search * search_score + w_recency * recency_score + w_source * source_weight

All weights configurable via config.json.
"""
import json
import math
import re
from dataclasses import dataclass, field, asdict
from datetime import datetime, date
from pathlib import Path
from typing import Optional

# Date patterns for extraction.
# DATE_PATH_RE matches a bare YYYY-MM-DD anywhere (paths, metadata values);
# DATE_CONTENT_RE additionally requires whitespace/start before and
# whitespace/end/period after, to avoid matching fragments of longer tokens.
DATE_PATH_RE = re.compile(r'(\d{4})-(\d{2})-(\d{2})')
DATE_CONTENT_RE = re.compile(r'(?:^|\s)(\d{4})-(\d{2})-(\d{2})(?:\s|$|\.)')

CONFIG_PATH = Path(__file__).parent / "config.json"


@dataclass
class SearchResult:
    """A search result to be re-scored.

    Attributes:
        text: Result body text (used for date extraction and term matching).
        source_path: Originating file path (used for date/source weighting).
        original_score: Raw relevance score from the underlying search engine.
        metadata: Optional extra fields (may carry date/timestamp keys).
        final_score: Composite score filled in by score_results().
        score_breakdown: Per-component scores filled in by score_results().
    """
    text: str
    source_path: str = ""
    original_score: float = 0.0
    metadata: dict = field(default_factory=dict)
    final_score: float = 0.0
    score_breakdown: dict = field(default_factory=dict)


def load_config(path: Optional[Path] = None) -> dict:
    """Load scoring configuration from JSON.

    Args:
        path: Config file to read; defaults to config.json next to this module.

    Returns:
        The "composite_scoring" sub-dict of the JSON file, or {} when the
        file does not exist (callers then fall back to built-in defaults).
    """
    p = path or CONFIG_PATH
    if p.exists():
        with open(p) as f:
            return json.load(f).get("composite_scoring", {})
    return {}


def _date_from_match(m: "Optional[re.Match]") -> Optional[date]:
    """Build a date from a YYYY-MM-DD regex match.

    Returns None when there is no match or the captured values do not form
    a valid calendar date (e.g. 2026-13-40).
    """
    if m is None:
        return None
    try:
        return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
    except ValueError:
        return None


def extract_date(result: SearchResult) -> Optional[date]:
    """Extract the most relevant date from a search result's path or content.

    Sources are tried in order of reliability: source_path, then known
    metadata keys, then the first 500 characters of the text. Invalid
    candidates are skipped rather than aborting the search.

    Returns:
        The first valid date found, or None when nothing parses.
    """
    # Try path first (most reliable)
    d = _date_from_match(DATE_PATH_RE.search(result.source_path))
    if d:
        return d
    # Try metadata
    for key in ("date", "timestamp", "created", "modified"):
        if key in result.metadata:
            d = _date_from_match(DATE_PATH_RE.search(str(result.metadata[key])))
            if d:
                return d
    # Try content (first match; capped at 500 chars to bound regex cost)
    return _date_from_match(DATE_CONTENT_RE.search(result.text[:500]))


def recency_score(result_date: Optional[date],
                  reference_date: Optional[date] = None,
                  half_life_days: float = 14.0) -> float:
    """Calculate recency score using exponential decay.

    Returns 1.0 for today, ~0.5 at half_life_days ago, approaching 0 for
    old results. Returns 0.3 (neutral) if no date can be determined.

    Args:
        result_date: Date attributed to the result, or None if unknown.
        reference_date: "Now" for decay purposes; defaults to today.
        half_life_days: Days after which the score halves.
    """
    if result_date is None:
        return 0.3  # neutral score for undated content
    ref = reference_date or date.today()
    days_old = (ref - result_date).days
    if days_old < 0:
        days_old = 0  # future dates treated as today
    # Exponential decay: score = 2^(-days/half_life)
    return math.pow(2, -days_old / half_life_days)


def source_weight(source_path: str, source_weights: dict) -> float:
    """Determine source weight based on path pattern matching.

    Matches are tried from most specific to least specific: among all
    matching patterns, the longest one wins (length as a specificity proxy).

    Args:
        source_path: Path of the result; empty string yields the default.
        source_weights: Mapping of pattern -> weight. A pattern containing
            a backslash is treated as a regex; anything else is a plain
            substring match. The "default" key is the fallback weight.
    """
    if not source_path:
        return source_weights.get("default", 0.4)

    # Normalize Windows separators so patterns only need forward slashes
    normalized = source_path.replace("\\", "/")

    best_score = source_weights.get("default", 0.4)
    best_specificity = 0
    for pattern, weight in source_weights.items():
        if pattern == "default":
            continue
        # A backslash marks a regex pattern (\d, \w, escaped chars, ...).
        # (The previous extra check for "\\d" was redundant: any pattern
        # containing "\d" necessarily contains a backslash.)
        if "\\" in pattern:
            try:
                if re.search(pattern, normalized):
                    specificity = len(pattern)
                    if specificity > best_specificity:
                        best_score = weight
                        best_specificity = specificity
            except re.error:
                pass  # malformed regex in config: ignore rather than crash
        else:
            # Simple substring/prefix match
            if pattern in normalized:
                specificity = len(pattern)
                if specificity > best_specificity:
                    best_score = weight
                    best_specificity = specificity
    return best_score


def multi_term_confidence(query: str, text: str) -> float:
    """Score boost based on how many query terms appear in result text.

    Terms of length <= 2 (stop-word-ish fragments) are ignored. Returns the
    fraction of remaining terms found (0.0-1.0), or 0.5 (neutral) when the
    query has no usable terms.
    """
    terms = [t.lower() for t in query.split() if len(t) > 2]
    if not terms:
        return 0.5
    text_lower = text.lower()
    matched = sum(1 for t in terms if t in text_lower)
    return matched / len(terms)


def score_results(results: "list[SearchResult]",
                  query: str = "",
                  config: Optional[dict] = None,
                  reference_date: Optional[date] = None,
                  weight_overrides: Optional[dict] = None) -> "list[SearchResult]":
    """Apply composite scoring to a list of search results and return sorted.

    Note: scores and sort order are written into the passed-in list in
    place; the same list object is returned for convenience.

    Args:
        results: List of SearchResult objects to score.
        query: Original query string (for multi-term matching).
        config: Scoring config dict. Loaded from config.json if None.
        reference_date: Date to compute recency from. Defaults to today.
        weight_overrides: Override specific weights (e.g. from intent classifier).

    Returns:
        Results sorted by final_score descending.
    """
    if not results:
        return results

    cfg = config or load_config()
    w_search = cfg.get("w_search", 0.45)
    w_recency = cfg.get("w_recency", 0.30)
    w_source = cfg.get("w_source", 0.25)
    half_life = cfg.get("recency_half_life_days", 14.0)
    src_weights = cfg.get("source_weights", {"default": 0.4})
    multi_boost = cfg.get("multi_term_boost", 0.15)

    # Apply overrides from intent classifier
    if weight_overrides:
        w_search = weight_overrides.get("w_search", w_search)
        w_recency = weight_overrides.get("w_recency", w_recency)
        w_source = weight_overrides.get("w_source", w_source)
        # Source-specific boosts are additive on top of the configured weight
        if "w_source_boost" in weight_overrides:
            src_weights = dict(src_weights)  # copy: don't mutate caller's config
            for pattern, boost in weight_overrides["w_source_boost"].items():
                src_weights[pattern] = src_weights.get(pattern, 0.4) + boost

    for r in results:
        r_date = extract_date(r)
        r_recency = recency_score(r_date, reference_date, half_life)
        r_source = source_weight(r.source_path, src_weights)
        r_confidence = multi_term_confidence(query, r.text)
        # Composite formula, capped at 1.0 so the boost can't overflow the scale
        base = w_search * r.original_score + w_recency * r_recency + w_source * r_source
        boost = multi_boost * r_confidence
        r.final_score = min(1.0, base + boost)
        r.score_breakdown = {
            "search": round(r.original_score, 4),
            "recency": round(r_recency, 4),
            "source": round(r_source, 4),
            "confidence": round(r_confidence, 4),
            "final": round(r.final_score, 4),
        }

    results.sort(key=lambda r: r.final_score, reverse=True)
    return results


if __name__ == "__main__":
    # Demo usage
    demo_results = [
        SearchResult(text="Gateway watchdog fix applied",
                     source_path="memory/2026-02-07.md", original_score=0.8),
        SearchResult(text="Gateway architecture decisions",
                     source_path="MEMORY.md", original_score=0.75),
        SearchResult(text="Old gateway notes",
                     source_path="memory/2025-12-01.md", original_score=0.85),
    ]
    scored = score_results(demo_results, query="gateway fix")
    for r in scored:
        print(f" {r.final_score:.3f} | {r.source_path}: {r.text[:60]}")
        print(f" {r.score_breakdown}")