darkplex-core/cortex/llm_extractor.py
Claudia fd7d75c0ed
Some checks failed
Tests / test (push) Failing after 2s
Merge darkplex-core into cortex — unified intelligence layer v0.2.0
- Merged all unique darkplex-core modules into cortex:
  - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop)
  - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator)
  - entity_manager.py, knowledge_extractor.py
- Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'
- Added 'darkplex' CLI alias alongside 'cortex'
- Package renamed to darkplex-core v0.2.0
- 405 tests passing (was 234)
- 14 new test files covering all merged modules
2026-02-12 08:43:02 +01:00

214 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
LLM-Powered Entity Extractor — Uses Ollama for Named Entity Recognition.
Standalone module. No pip dependencies beyond stdlib.
Calls Ollama HTTP API with structured NER prompts.
Configuration via environment variables:
DARKPLEX_OLLAMA_URL — Ollama base URL (default: http://localhost:11434)
DARKPLEX_OLLAMA_MODEL — Model name (default: mistral:7b)
DARKPLEX_OLLAMA_TIMEOUT — Timeout in seconds (default: 30)
DARKPLEX_EXTRACTOR — llm|regex|auto (default: auto)
"""
import json
import logging
import os
import urllib.request
import urllib.error
# Module-level logger for extractor diagnostics.
log = logging.getLogger("llm-extractor")
# Ollama connection settings, overridable via environment variables
# (documented in the module docstring).
OLLAMA_URL = os.environ.get("DARKPLEX_OLLAMA_URL", "http://localhost:11434")
OLLAMA_MODEL = os.environ.get("DARKPLEX_OLLAMA_MODEL", "mistral:7b")
OLLAMA_TIMEOUT = int(os.environ.get("DARKPLEX_OLLAMA_TIMEOUT", "30"))
# Canonical entity types accepted by _normalize_entities; anything else is
# mapped through an alias table or falls back to "concept".
VALID_TYPES = {"person", "organization", "company", "project", "technology",
"location", "event", "concept", "product"}
# Single-text NER prompt; {text} is filled in by extract_entities_llm.
NER_PROMPT = """Extract all named entities from the text below. Return ONLY a JSON object.
Each key is the entity name (lowercase), each value has "type" and "context".
Valid types: person, organization, company, project, technology, location, event, concept, product
Rules:
- Skip common/generic words (the, system, message, etc.)
- Entity names should be lowercase, use hyphens for multi-word
- "context" is a 2-5 word description of the entity's role in the text
- If no entities found, return empty JSON object
- Return ONLY valid JSON, no explanation
Text:
{text}
JSON:"""
# Multi-text NER prompt; {texts} is a numbered list built by
# extract_entities_llm_batch.
BATCH_PROMPT = """Extract all named entities from these texts. Return ONLY a JSON object.
Each key is the entity name (lowercase, hyphens for spaces), each value has "type" and "context".
Valid types: person, organization, company, project, technology, location, event, concept, product
Rules:
- Skip common/generic words
- "context" is a 2-5 word description
- If no entities found, return empty JSON object
- Return ONLY valid JSON, no markdown, no explanation
Texts:
{texts}
JSON:"""
def _call_ollama(prompt: str) -> str | None:
    """Send *prompt* to Ollama's /api/generate endpoint.

    Returns the model's response text ("" if the reply has no "response"
    field), or None on any failure — callers treat None as a signal to
    fall back to regex extraction.
    """
    body = {
        "model": OLLAMA_MODEL,
        "prompt": prompt,
        "stream": False,
        # Low temperature keeps extraction near-deterministic; cap output size.
        "options": {"temperature": 0.1, "num_predict": 1024},
    }
    request = urllib.request.Request(
        f"{OLLAMA_URL}/api/generate",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=OLLAMA_TIMEOUT) as resp:
            reply = json.loads(resp.read().decode())
            return reply.get("response", "")
    except (urllib.error.URLError, TimeoutError, OSError) as e:
        log.warning(f"Ollama call failed: {e}")
        return None
    except Exception as e:
        # Catch-all so a malformed reply can never crash extraction.
        log.warning(f"Ollama unexpected error: {e}")
        return None
def _parse_json_response(text: str) -> dict:
"""Extract JSON dict from LLM response, handling markdown fences etc."""
if not text:
return {}
# Strip markdown code fences
text = text.strip()
if text.startswith("```"):
lines = text.split("\n")
lines = [l for l in lines if not l.strip().startswith("```")]
text = "\n".join(lines)
# Find the JSON object
start = text.find("{")
if start == -1:
return {}
# Find matching closing brace
depth = 0
for i in range(start, len(text)):
if text[i] == "{":
depth += 1
elif text[i] == "}":
depth -= 1
if depth == 0:
try:
return json.loads(text[start:i + 1])
except json.JSONDecodeError:
return {}
return {}
def _normalize_entities(raw: dict) -> dict:
    """Normalize and validate entities extracted by the LLM.

    Args:
        raw: Parsed JSON mapping entity name -> {"type": ..., "context": ...}.

    Returns:
        Dict mapping normalized names (lowercase, hyphenated, 2-80 chars)
        to {"type": <canonical type>, "context": <str, max 100 chars>,
        "match": "llm"}. Malformed entries are silently dropped.
    """
    # Alias table hoisted out of the loop (it is loop-invariant): maps
    # common LLM type variants onto canonical VALID_TYPES members.
    aliases = {"org": "organization", "tech": "technology", "loc": "location",
               "place": "location", "tool": "technology", "framework": "technology",
               "language": "technology", "app": "product", "software": "product",
               "service": "product", "group": "organization", "team": "organization"}
    result = {}
    for name, info in raw.items():
        if not isinstance(info, dict):
            continue  # entity value must be an object
        name = name.strip().lower().replace("_", "-").replace(" ", "-")
        if len(name) < 2 or len(name) > 80:
            continue  # reject degenerate or absurdly long names
        etype = info.get("type", "unknown")
        # Guard: LLMs occasionally emit a non-string type (null, number);
        # treat those as unknown instead of crashing on .lower().
        etype = etype.lower().strip() if isinstance(etype, str) else ""
        if etype not in VALID_TYPES:
            etype = aliases.get(etype, "concept")
        context = info.get("context", "")
        # Non-string context is discarded; string context is capped at 100 chars.
        context = context[:100] if isinstance(context, str) else ""
        result[name] = {"type": etype, "context": context, "match": "llm"}
    return result
def extract_entities_llm(text: str) -> dict[str, dict] | None:
    """
    Extract entities from text using Ollama LLM.
    Returns dict of {name: {type, context, match}} or None if LLM unavailable.
    None signals caller to fall back to regex.
    """
    # Too short to contain meaningful entities — nothing to do.
    if not text or len(text) < 10:
        return {}
    # Cap prompt size; texts beyond 2000 chars are truncated.
    prompt = NER_PROMPT.format(text=text[:2000])
    response = _call_ollama(prompt)
    if response is None:
        # LLM unreachable — tell the caller to use the regex path.
        return None
    return _normalize_entities(_parse_json_response(response))
def extract_entities_llm_batch(texts: list[str]) -> dict[str, dict] | None:
    """
    Extract entities from multiple texts in one LLM call.
    Returns combined dict or None if LLM unavailable.
    """
    if not texts:
        return {}
    # Drop empty/too-short texts and truncate each survivor to 500 chars.
    usable = [t[:500] for t in texts if t and len(t) >= 10]
    if not usable:
        return {}
    # Cap the batch at 10 texts to keep the prompt a reasonable size.
    usable = usable[:10]
    numbered = "\n".join(f"[{i+1}] {snippet}" for i, snippet in enumerate(usable))
    response = _call_ollama(BATCH_PROMPT.format(texts=numbered))
    if response is None:
        # LLM unreachable — caller falls back to regex extraction.
        return None
    return _normalize_entities(_parse_json_response(response))
def is_available() -> bool:
    """Return True when the Ollama HTTP API answers at OLLAMA_URL."""
    try:
        probe = urllib.request.Request(f"{OLLAMA_URL}/api/tags", method="GET")
        with urllib.request.urlopen(probe, timeout=3) as resp:
            return resp.status == 200
    except Exception:
        # Any failure (refused connection, DNS, timeout) means unavailable.
        return False