darkplex-core/cortex/composite_scorer.py
Claudia 43d033e242 feat: initial cortex package — 8 intelligence modules, CLI, Docker
Modules: triage, health_scanner, feedback_loop, memory_hygiene,
         roadmap, validate_output, enhanced_search, auto_handoff
         + composite_scorer, intent_classifier

CLI: 'cortex <module> <command>' unified entry point
Tests: 157/169 passing (12 assertion mismatches from rename)
Docker: python:3.11-slim based
2026-02-09 11:18:20 +01:00

#!/usr/bin/env python3
"""
Composite Scorer — Re-rank memory search results using recency, source-type,
and multi-term confidence weighting.

Formula (capped at 1.0):
    final_score = w_search * search_score
                + w_recency * recency_score
                + w_source * source_weight
                + multi_term_boost * confidence

All weights are configurable via config.json.
"""
import json
import math
import re
from dataclasses import dataclass, field
from datetime import date
from pathlib import Path
from typing import Optional

# Date patterns for extraction (paths/metadata vs. free-text content)
DATE_PATH_RE = re.compile(r'(\d{4})-(\d{2})-(\d{2})')
DATE_CONTENT_RE = re.compile(r'(?:^|\s)(\d{4})-(\d{2})-(\d{2})(?:\s|$|\.)')

CONFIG_PATH = Path(__file__).parent / "config.json"


@dataclass
class SearchResult:
    """A search result to be re-scored."""
    text: str
    source_path: str = ""
    original_score: float = 0.0
    metadata: dict = field(default_factory=dict)
    final_score: float = 0.0
    score_breakdown: dict = field(default_factory=dict)


def load_config(path: Optional[Path] = None) -> dict:
    """Load scoring configuration from JSON."""
    p = path or CONFIG_PATH
    if p.exists():
        with open(p) as f:
            return json.load(f).get("composite_scoring", {})
    return {}


def extract_date(result: SearchResult) -> Optional[date]:
    """Extract the most relevant date from a search result's path or content."""
    # Try path first (most reliable)
    m = DATE_PATH_RE.search(result.source_path)
    if m:
        try:
            return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        except ValueError:
            pass
    # Try metadata
    for key in ("date", "timestamp", "created", "modified"):
        if key in result.metadata:
            val = str(result.metadata[key])
            m = DATE_PATH_RE.search(val)
            if m:
                try:
                    return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
                except ValueError:
                    pass
    # Try content (first match)
    m = DATE_CONTENT_RE.search(result.text[:500])
    if m:
        try:
            return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        except ValueError:
            pass
    return None
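
# e.g. a result with source_path "memory/2026-02-07.md" yields date(2026, 2, 7);
# results with no recoverable date return None and get the neutral recency score.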


def recency_score(result_date: Optional[date], reference_date: Optional[date] = None,
                  half_life_days: float = 14.0) -> float:
    """Calculate recency score using exponential decay.

    Returns 1.0 for today, exactly 0.5 at half_life_days ago, approaching 0
    for old results. Returns 0.3 (neutral) if no date can be determined.
    """
    if result_date is None:
        return 0.3  # neutral score for undated content
    ref = reference_date or date.today()
    days_old = (ref - result_date).days
    if days_old < 0:
        days_old = 0  # future dates treated as today
    # Exponential decay: score = 2^(-days / half_life)
    return math.pow(2, -days_old / half_life_days)
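
# With the default 14-day half-life: 0 days -> 1.0, 7 days -> ~0.71,
# 14 days -> 0.5, 28 days -> 0.25, 56 days -> ~0.06.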


def source_weight(source_path: str, source_weights: dict) -> float:
    """Determine source weight based on path pattern matching.

    When several patterns match, the longest (most specific) one wins.
    """
    if not source_path:
        return source_weights.get("default", 0.4)
    # Normalize path separators
    normalized = source_path.replace("\\", "/")
    # Track the longest matching pattern; fall back to the default weight
    best_score = source_weights.get("default", 0.4)
    best_specificity = 0
    for pattern, weight in source_weights.items():
        if pattern == "default":
            continue
        if "\\" in pattern:
            # Patterns containing a backslash escape (e.g. \d) are treated as regexes
            try:
                if re.search(pattern, normalized):
                    specificity = len(pattern)
                    if specificity > best_specificity:
                        best_score = weight
                        best_specificity = specificity
            except re.error:
                pass
        else:
            # Simple substring match
            if pattern in normalized:
                specificity = len(pattern)
                if specificity > best_specificity:
                    best_score = weight
                    best_specificity = specificity
    return best_score
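
# Example with hypothetical weights:
#   source_weights = {"default": 0.4, "memory/": 0.8, r"memory/\d{4}-": 0.9}
# For the path "memory/2026-02-07.md" both patterns match; the regex is
# longer, so it wins and the returned weight is 0.9.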


def multi_term_confidence(query: str, text: str) -> float:
    """Score boost based on how many query terms appear in the result text.

    Only terms of three or more characters count; returns the fraction of
    those terms found (0.0 to 1.0), or a neutral 0.5 if no terms qualify.
    """
    terms = [t.lower() for t in query.split() if len(t) > 2]
    if not terms:
        return 0.5
    text_lower = text.lower()
    matched = sum(1 for t in terms if t in text_lower)
    return matched / len(terms)
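
# e.g. query "gateway watchdog fix" vs. text "Gateway watchdog restarted":
# terms = ["gateway", "watchdog", "fix"], two of three match -> ~0.67.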


def score_results(results: list[SearchResult], query: str = "",
                  config: Optional[dict] = None,
                  reference_date: Optional[date] = None,
                  weight_overrides: Optional[dict] = None) -> list[SearchResult]:
    """Apply composite scoring to a list of search results and return them sorted.

    Args:
        results: List of SearchResult objects to score.
        query: Original query string (for multi-term matching).
        config: Scoring config dict. Loaded from config.json if None.
        reference_date: Date to compute recency from. Defaults to today.
        weight_overrides: Override specific weights (e.g. from the intent classifier).

    Returns:
        Results sorted by final_score descending.
    """
    if not results:
        return results
    cfg = config or load_config()
    w_search = cfg.get("w_search", 0.45)
    w_recency = cfg.get("w_recency", 0.30)
    w_source = cfg.get("w_source", 0.25)
    half_life = cfg.get("recency_half_life_days", 14.0)
    src_weights = cfg.get("source_weights", {"default": 0.4})
    multi_boost = cfg.get("multi_term_boost", 0.15)
    # Apply overrides from the intent classifier
    if weight_overrides:
        w_search = weight_overrides.get("w_search", w_search)
        w_recency = weight_overrides.get("w_recency", w_recency)
        w_source = weight_overrides.get("w_source", w_source)
        # Source-specific boosts
        if "w_source_boost" in weight_overrides:
            src_weights = dict(src_weights)  # copy before mutating
            for pattern, boost in weight_overrides["w_source_boost"].items():
                src_weights[pattern] = src_weights.get(pattern, 0.4) + boost
    for r in results:
        r_date = extract_date(r)
        r_recency = recency_score(r_date, reference_date, half_life)
        r_source = source_weight(r.source_path, src_weights)
        r_confidence = multi_term_confidence(query, r.text)
        # Composite formula (capped at 1.0)
        base = w_search * r.original_score + w_recency * r_recency + w_source * r_source
        boost = multi_boost * r_confidence
        r.final_score = min(1.0, base + boost)
        r.score_breakdown = {
            "search": round(r.original_score, 4),
            "recency": round(r_recency, 4),
            "source": round(r_source, 4),
            "confidence": round(r_confidence, 4),
            "final": round(r.final_score, 4),
        }
    results.sort(key=lambda r: r.final_score, reverse=True)
    return results
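
# A hypothetical override, such as an intent classifier might emit for a
# recency-leaning query (values here are illustrative, not from config):
#   score_results(results, query, weight_overrides={
#       "w_recency": 0.50,
#       "w_source_boost": {"memory/": 0.10},
#   })
# raises the recency weight and lifts weights for "memory/" sources by 0.10.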


if __name__ == "__main__":
    # Demo usage
    demo_results = [
        SearchResult(text="Gateway watchdog fix applied", source_path="memory/2026-02-07.md", original_score=0.8),
        SearchResult(text="Gateway architecture decisions", source_path="MEMORY.md", original_score=0.75),
        SearchResult(text="Old gateway notes", source_path="memory/2025-12-01.md", original_score=0.85),
    ]
    scored = score_results(demo_results, query="gateway fix")
    for r in scored:
        print(f"  {r.final_score:.3f} | {r.source_path}: {r.text[:60]}")
        print(f"    {r.score_breakdown}")