#!/usr/bin/env python3
"""
Intent Classifier — Classify memory queries into WHO/WHEN/WHY/WHAT intents
using regex-based heuristics. No LLM call, <5ms per query.

Output: intent type + suggested weight adjustments for composite scorer.
"""

import json
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

CONFIG_PATH = Path(__file__).parent / "config.json"

# Intents in tie-break order: on equal scores the earliest entry wins.
_INTENTS = ("WHO", "WHEN", "WHY", "WHAT")

# Capitalized tokens that must NOT be treated as person names: product and
# infrastructure names, acronyms, and common German nouns (German capitalizes
# every noun, so capitalization alone is a weak person signal).
_NON_PERSON_CAPS = frozenset({
    "sparkasse", "taskboard", "uptime", "kuma", "forgejo", "traefik",
    "nginx", "docker", "chromadb", "typedb", "nats", "kafka", "pinecone",
    "odoo", "mondo", "gate", "vainplex", "openclaw", "telegram", "discord",
    "matrix", "opus", "sonnet", "haiku", "claude", "gemini", "ollama",
    "mona", "vera", "stella", "viola", "hetzner", "proxmox", "debian",
    "linux", "python", "api", "cli", "dns", "ssl", "tls", "ssh", "http",
    "https", "sepa", "bafin", "iso", "iban", "postgres", "sqlite", "redis",
    "github",
    # Common German capitalized nouns that aren't people
    "aufgabe", "zugang", "status", "server", "konto", "liste", "daten",
    "problem", "fehler", "lösung", "version", "projekt", "system", "email",
    "rechnung", "zahlung", "vertrag", "termin", "meeting", "deploy",
    "update", "config", "setup", "deployment", "monitoring", "backup",
    "migration", "integration", "infrastruktur", "netzwerk", "sicherheit",
})

# Punctuation stripped from both ends of a token before the name heuristic,
# so "Docker," or "(Kafka)" still hit the exclusion list.
_TOKEN_STRIP_CHARS = ".,;:!?\"'()[]{}<>"

# Date-like tokens: "2024-05" / "2024/05", or an abbreviated month name.
# NOTE(review): only the listed month tokens match as whole words; full
# English month names and most German ones are not covered.
_NUMERIC_DATE_RE = re.compile(r'\b\d{4}[-/]\d{2}')
_MONTH_RE = re.compile(
    r'\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec|januar|februar|märz)\b'
)


@dataclass
class IntentResult:
    """Classification result for a query."""

    intent: str  # WHO, WHEN, WHY, WHAT
    confidence: float  # 0.0 to 1.0
    weight_adjustments: dict = field(default_factory=dict)
    matched_signals: list = field(default_factory=list)
    classification_ms: float = 0.0


# Compiled patterns cache: intent -> (raw pattern strings, compiled patterns).
# The raw strings are kept so a different config invalidates the entry.
_compiled_patterns: dict[str, tuple[tuple, list[re.Pattern]]] = {}
_config_cache: Optional[dict] = None
# Path the cached config was loaded from (None = unknown / not loaded yet).
_config_cache_path: Optional[Path] = None


def _load_config(path: Optional[Path] = None) -> dict:
    """Load the ``intent_classification`` section of the JSON config.

    Args:
        path: Config file to read. Defaults to ``CONFIG_PATH``.

    Returns:
        The ``intent_classification`` mapping, or an empty dict when the file
        does not exist or has no such section. The result is cached per path.
    """
    global _config_cache, _config_cache_path
    p = path or CONFIG_PATH
    # Reuse the cache only when it was loaded from the same path. An unknown
    # origin (None) is accepted so a cache primed from outside keeps working.
    if _config_cache is not None and _config_cache_path in (None, p):
        return _config_cache
    if p.exists():
        # Explicit encoding: the config may contain non-ASCII keywords.
        with open(p, encoding="utf-8") as f:
            _config_cache = json.load(f).get("intent_classification", {})
    else:
        _config_cache = {}
    _config_cache_path = p
    return _config_cache


def _get_patterns(intent: str, config: dict) -> list[re.Pattern]:
    """Get compiled regex patterns for an intent (cached).

    The cache entry is recompiled whenever the raw pattern list for *intent*
    differs from the one it was built from, so passing a different config
    never yields stale patterns.
    """
    raw = tuple(config.get(intent, {}).get("patterns", []))
    cached = _compiled_patterns.get(intent)
    if cached is None or cached[0] != raw:
        cached = (raw, [re.compile(p, re.IGNORECASE) for p in raw])
        _compiled_patterns[intent] = cached
    return cached[1]


def classify(query: str, config: Optional[dict] = None) -> IntentResult:
    """Classify a query into WHO/WHEN/WHY/WHAT intent.

    Uses keyword matching and regex patterns. Designed for <5ms execution.

    Args:
        query: The search query string.
        config: Optional config dict. Loaded from config.json if None.
            An explicit empty dict means "no keywords/patterns configured".

    Returns:
        IntentResult with intent type, confidence, and weight adjustments.
    """
    start = time.perf_counter()
    # `is not None` (not truthiness): an explicit {} must not trigger a
    # fallback to the on-disk config.
    cfg = config if config is not None else _load_config()

    if not query or not query.strip():
        elapsed = (time.perf_counter() - start) * 1000
        return IntentResult(
            intent="WHAT",
            confidence=0.1,
            classification_ms=elapsed,
            matched_signals=["empty_query"],
        )

    query_lower = query.lower().strip()
    scores: dict[str, float] = dict.fromkeys(_INTENTS, 0.0)
    signals: dict[str, list[str]] = {intent: [] for intent in _INTENTS}

    for intent in _INTENTS:
        intent_cfg = cfg.get(intent, {})

        # Keyword matching (fast): plain substring test on the lowered query.
        for kw in intent_cfg.get("keywords", []):
            if kw.lower() in query_lower:
                scores[intent] += 1.0
                signals[intent].append(f"kw:{kw}")

        # Regex pattern matching
        for pattern in _get_patterns(intent, cfg):
            if pattern.search(query_lower):
                scores[intent] += 2.0  # patterns are more specific
                signals[intent].append(f"re:{pattern.pattern[:30]}")

    # Additional heuristics

    # Capitalized words suggest a person name (WHO) unless they are known
    # non-person terms. All-caps tokens are treated as acronyms and skipped.
    tokens = [w.strip(_TOKEN_STRIP_CHARS) for w in query.split()]
    if len(tokens) >= 2:
        # Check ALL words for name-like capitalization (including first word).
        # The length test comes first so tokens that were pure punctuation
        # (now empty) never reach the t[0] index.
        caps = [
            t for t in tokens
            if len(t) > 2
            and t[0].isupper()
            and not t.isupper()
            and t.lower() not in _NON_PERSON_CAPS
        ]
        if len(caps) >= 2:
            # Two+ unknown capitalized words strongly suggest names
            # (e.g. "Sebastian Baier")
            scores["WHO"] += 0.8 * len(caps)
            signals["WHO"].append(f"multi_caps:{','.join(caps[:3])}")
        elif caps:
            # Single unknown cap word — weak signal in German
            scores["WHO"] += 0.3
            signals["WHO"].append(f"caps:{caps[0]}")

    # Date-like tokens suggest WHEN
    if _NUMERIC_DATE_RE.search(query) or _MONTH_RE.search(query_lower):
        scores["WHEN"] += 1.5
        signals["WHEN"].append("date_token")

    # Question words at start
    if query_lower.startswith(("warum ", "why ", "wieso ", "weshalb ")):
        scores["WHY"] += 3.0
        signals["WHY"].append("start_why")
    elif query_lower.startswith(("wer ", "who ")):
        scores["WHO"] += 3.0
        signals["WHO"].append("start_who")
    elif query_lower.startswith(("wann ", "when ")):
        scores["WHEN"] += 3.0
        signals["WHEN"].append("start_when")

    # Pick winner (ties resolve to the earliest intent in _INTENTS)
    best_intent = max(scores, key=scores.get)
    total = sum(scores.values())
    confidence = scores[best_intent] / total if total > 0 else 0.25

    # If no strong signal, default to WHAT
    if scores[best_intent] < 0.5:
        best_intent = "WHAT"
        confidence = 0.3

    # Get weight adjustments
    adjustments = cfg.get(best_intent, {}).get("weight_adjustments", {})

    elapsed = (time.perf_counter() - start) * 1000
    return IntentResult(
        intent=best_intent,
        confidence=round(confidence, 3),
        weight_adjustments=adjustments,
        matched_signals=signals[best_intent],
        classification_ms=round(elapsed, 3),
    )


def reset_cache():
    """Reset config and pattern caches (for testing)."""
    global _config_cache, _config_cache_path, _compiled_patterns
    _config_cache = None
    _config_cache_path = None
    _compiled_patterns = {}


if __name__ == "__main__":
    import sys

    queries = sys.argv[1:] or [
        "Albert Hild contact",
        "when did we fix the gateway",
        "why did we choose NATS over Kafka",
        "Mondo Gate regulatory status",
        "wer ist Sebastian Baier",
        "wann wurde TypeDB eingerichtet",
        "warum ChromaDB statt Pinecone",
    ]
    for q in queries:
        r = classify(q)
        print(f" [{r.intent}] ({r.confidence:.2f}, {r.classification_ms:.2f}ms) {q}")
        if r.matched_signals:
            print(f"    signals: {r.matched_signals}")