#!/usr/bin/env python3
"""Memory Hygiene Tools — find duplicates, stale content, orphans, stats, archive.

Command-line utilities that inspect the markdown "memory" directory:

* ``dupes``   — paragraphs that repeat (after whitespace/case normalization)
* ``stale``   — old dates, TODOs, in-progress markers and contact info
* ``orphans`` — unreferenced files, broken links, near-empty files
* ``stats``   — size / age / word-count statistics
* ``archive`` — move old daily notes into the archive directory
* ``report``  — combined markdown report of all of the above
"""

import argparse
import hashlib
import json
import os
import re
import shutil
import sys
from collections import defaultdict
from collections.abc import Iterator
from datetime import datetime, timedelta
from pathlib import Path

from cortex.config import memory_dir, archive_dir, permanent_files as get_permanent_files

MEMORY_DIR = memory_dir()
ARCHIVE_DIR = archive_dir()
CONFIG_PATH = Path(__file__).parent / "config.json"

PERMANENT_FILES = get_permanent_files()
DAILY_NOTE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}(?:-.+)?\.md$")
DATE_RE = re.compile(r"\b(20\d{2})-(\d{2})-(\d{2})\b")
TODO_RE = re.compile(r"(?:TODO|FIXME|HACK|XXX)\b", re.IGNORECASE)
IN_PROGRESS_RE = re.compile(r"status:\s*in.?progress", re.IGNORECASE)
LINK_RE = re.compile(r"\[([^\]]*)\]\(([^)]+)\)")
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE_RE = re.compile(r"(?:\+\d[\d\s\-]{7,}|\(\d+\)\s*[\d\s\-]{5,})")

# One or more blank (whitespace-only) lines separating two paragraphs.
PARA_SEP_RE = re.compile(r"\n\s*\n")
# Link targets with these prefixes are not files and are never checked on disk.
NON_FILE_SCHEMES = ("http://", "https://", "mailto:", "tel:", "ftp://")


def load_config() -> dict:
    """Return the parsed ``config.json`` next to this file, or ``{}`` if absent.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if CONFIG_PATH.exists():
        return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
    return {}


def get_md_files(base: Path | None = None, recursive: bool = True) -> list[Path]:
    """Return the sorted ``*.md`` files under *base* (default: ``MEMORY_DIR``).

    Args:
        base: Directory to search; ``None`` means ``MEMORY_DIR``.
        recursive: Search subdirectories too when true, else only *base* itself.
    """
    if base is None:
        base = MEMORY_DIR
    if recursive:
        return sorted(base.rglob("*.md"))
    return sorted(base.glob("*.md"))


def _normalize(text: str) -> str:
    """Lower-case *text*, trim it, and collapse whitespace runs to one space."""
    return re.sub(r"\s+", " ", text.lower().strip())


def _para_hash(para: str) -> str:
    """Return the MD5 hex digest of the normalized paragraph (a fingerprint)."""
    # usedforsecurity=False: the digest is only a content fingerprint, and the
    # flag keeps md5 usable on FIPS-restricted Python builds.
    return hashlib.md5(_normalize(para).encode(), usedforsecurity=False).hexdigest()


def _rel_to_memory(path: Path) -> str:
    """Return *path* relative to ``MEMORY_DIR``, or unchanged if it lies outside."""
    try:
        return str(path.relative_to(MEMORY_DIR))
    except ValueError:
        return str(path)


def _iter_paragraphs(text: str) -> Iterator[tuple[int, str]]:
    """Yield ``(line, paragraph)`` for each blank-line-separated paragraph.

    ``paragraph`` is stripped of surrounding whitespace and ``line`` is the
    1-based line of its first non-whitespace character (for a whitespace-only
    paragraph, the line where it ends).
    """
    line = 1
    counted_to = 0  # offset up to which newlines are already included in `line`
    para_start = 0
    separators = [m.span() for m in PARA_SEP_RE.finditer(text)]
    separators.append((len(text), len(text)))  # sentinel closing the last paragraph
    for sep_start, sep_end in separators:
        raw = text[para_start:sep_start]
        first_char = para_start + (len(raw) - len(raw.lstrip()))
        # Count newlines from real offsets so blank separator lines are included.
        line += text.count("\n", counted_to, first_char)
        counted_to = first_char
        yield line, raw.strip()
        para_start = sep_end


# --- Duplicates ---

def find_duplicates(min_length: int = 50, threshold: float = 0.8) -> list[dict]:
    """Find paragraphs that occur more than once across the memory files.

    Paragraphs are compared by the hash of their normalized text (lower-cased,
    whitespace collapsed), so only matches that are identical after
    normalization are reported.

    Args:
        min_length: Paragraphs whose stripped text is shorter are ignored.
        threshold: Accepted for backward compatibility; not used by the
            current exact-match implementation.

    Returns:
        One ``{"hash", "locations"}`` dict per repeated paragraph, where each
        location is ``{"file", "line", "preview"}`` (``line`` is 1-based).
    """
    para_index: dict[str, list[dict]] = defaultdict(list)

    for fp in get_md_files():
        try:
            text = fp.read_text(errors="replace")
        except OSError:
            continue  # best effort: unreadable files are skipped
        rel = str(fp.relative_to(MEMORY_DIR))
        for line, para in _iter_paragraphs(text):
            if len(para) < min_length:
                continue
            para_index[_para_hash(para)].append({
                "file": rel,
                "line": line,
                "preview": para[:100],
            })

    # Every location has a distinct (file, line), so a paragraph repeated
    # inside a single file is reported just like one repeated across files.
    return [
        {"hash": digest, "locations": locations}
        for digest, locations in para_index.items()
        if len(locations) > 1
    ]


# --- Staleness ---

def find_stale(now: datetime | None = None) -> list[dict]:
    """Find potentially stale content.

    Thresholds come from the ``staleness`` section of the config file
    (``date_days``, ``todo_days``, ``in_progress_days``, ``contact_days``),
    defaulting to 90, 30, 14 and 180 days.

    Args:
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        A list of ``{"file", "line", "reason", "severity"}`` dicts, where
        severity is ``"info"`` or ``"warning"``.
    """
    if now is None:
        now = datetime.now()
    cfg = load_config()
    stale_cfg = cfg.get("staleness", {})
    date_days = stale_cfg.get("date_days", 90)
    todo_days = stale_cfg.get("todo_days", 30)
    progress_days = stale_cfg.get("in_progress_days", 14)
    contact_days = stale_cfg.get("contact_days", 180)

    results = []
    for fp in get_md_files():
        try:
            text = fp.read_text(errors="replace")
            mtime = datetime.fromtimestamp(fp.stat().st_mtime)
        except (OSError, OverflowError, ValueError):
            continue  # unreadable file or unrepresentable mtime: skip it

        rel = str(fp.relative_to(MEMORY_DIR))
        rel_lower = rel.lower()
        # Old dates are expected in history/archive files, so not flagged there.
        is_historical = "history" in rel_lower or "archive" in rel_lower
        file_age = (now - mtime).days

        for i, line in enumerate(text.split("\n"), 1):
            # Old dates in non-historical context
            if not is_historical:
                for m in DATE_RE.finditer(line):
                    try:
                        d = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)))
                    except ValueError:
                        continue  # looks like a date but is not a valid one
                    age = (now - d).days
                    if age > date_days:
                        results.append({
                            "file": rel,
                            "line": i,
                            "reason": f"Date {m.group(0)} is {age} days old",
                            "severity": "info",
                        })

            # Old TODOs
            if TODO_RE.search(line) and file_age > todo_days:
                results.append({
                    "file": rel,
                    "line": i,
                    "reason": f"TODO in file not modified for {file_age} days",
                    "severity": "warning",
                })

            # Stale in_progress
            if IN_PROGRESS_RE.search(line) and file_age > progress_days:
                results.append({
                    "file": rel,
                    "line": i,
                    "reason": f"in_progress status, file not updated for {file_age} days",
                    "severity": "warning",
                })

            # Contact info staleness
            if (EMAIL_RE.search(line) or PHONE_RE.search(line)) and file_age > contact_days:
                results.append({
                    "file": rel,
                    "line": i,
                    "reason": f"Contact info in file not updated for {file_age} days",
                    "severity": "info",
                })

    return results


# --- Orphans ---

def find_orphans() -> dict:
    """Find orphaned files, broken links and near-empty files.

    Returns:
        ``{"orphaned_files", "broken_links", "empty_files"}``. Orphaned files
        are never the target of a markdown link, are not named in
        ``PERMANENT_FILES`` and are not under ``archive/``. Broken links are
        ``{"file", "link_text", "target"}`` dicts. Empty files are smaller
        than 10 bytes.
    """
    # Link targets are resolved below, so compare them against the resolved
    # root; otherwise a relative or symlinked MEMORY_DIR never matches.
    root = MEMORY_DIR.resolve()

    all_files = set()
    empty = []
    for fp in MEMORY_DIR.rglob("*"):
        if not fp.is_file():
            continue
        rel = str(fp.relative_to(MEMORY_DIR))
        all_files.add(rel)
        if fp.stat().st_size < 10:
            empty.append(rel)

    # Collect all references
    referenced = set()
    broken_links = []
    for fp in get_md_files():
        try:
            text = fp.read_text(errors="replace")
        except OSError:
            continue
        rel = str(fp.relative_to(MEMORY_DIR))
        for m in LINK_RE.finditer(text):
            target = m.group(2)
            if target.startswith(NON_FILE_SCHEMES):
                continue
            # Drop the #fragment and ?query, then resolve relative to the
            # directory of the file containing the link.
            target_clean = target.split("#")[0].split("?")[0]
            if not target_clean:
                continue
            resolved = (fp.parent / target_clean).resolve()
            try:
                referenced.add(str(resolved.relative_to(root)))
            except ValueError:
                pass  # target lies outside the memory directory
            if not resolved.exists():
                broken_links.append({
                    "file": rel,
                    "link_text": m.group(1),
                    "target": target,
                })

    # Orphaned files (never referenced, not permanent)
    orphaned = []
    for f in sorted(all_files):
        path = Path(f)
        if path.name in PERMANENT_FILES:
            continue
        # Compare path parts rather than an "archive/" prefix so the check
        # does not depend on the platform's path separator.
        if len(path.parts) > 1 and path.parts[0] == "archive":
            continue
        if f not in referenced:
            orphaned.append(f)

    return {
        "orphaned_files": orphaned,
        "broken_links": broken_links,
        "empty_files": sorted(empty),
    }


# --- Stats ---

def gather_stats() -> dict:
    """Gather statistics about the memory directory.

    Returns:
        A dict with file count, total size, counts per extension, oldest and
        newest file by mtime, the ten largest files, the number of files
        changed in the last 1/7/30 days, and the ten ``.md`` files with the
        most whitespace-separated words.
    """
    now = datetime.now()
    file_list = [f for f in MEMORY_DIR.rglob("*") if f.is_file()]

    total_size = 0
    by_ext: dict[str, int] = defaultdict(int)
    mtimes = []
    sizes = []
    word_counts = []
    changed_24h = changed_7d = changed_30d = 0

    for f in file_list:
        rel = str(f.relative_to(MEMORY_DIR))
        st = f.stat()  # a single stat call serves both size and mtime
        total_size += st.st_size
        by_ext[f.suffix or "(none)"] += 1
        mt = datetime.fromtimestamp(st.st_mtime)
        mtimes.append((rel, mt))
        sizes.append((rel, st.st_size))

        age = (now - mt).days
        if age < 1:
            changed_24h += 1
        if age < 7:
            changed_7d += 1
        if age < 30:
            changed_30d += 1

        if f.suffix == ".md":
            try:
                word_counts.append((rel, len(f.read_text(errors="replace").split())))
            except OSError:
                pass  # unreadable file: leave it out of the word counts

    mtimes.sort(key=lambda x: x[1])
    sizes.sort(key=lambda x: x[1], reverse=True)

    return {
        "total_files": len(file_list),
        "total_size_bytes": total_size,
        "total_size_human": f"{total_size / 1024:.1f} KB",
        "files_by_extension": dict(sorted(by_ext.items())),
        "oldest": {"file": mtimes[0][0], "date": mtimes[0][1].isoformat()} if mtimes else None,
        "newest": {"file": mtimes[-1][0], "date": mtimes[-1][1].isoformat()} if mtimes else None,
        "largest_files": [{"file": f, "bytes": s} for f, s in sizes[:10]],
        "changed_24h": changed_24h,
        "changed_7d": changed_7d,
        "changed_30d": changed_30d,
        "word_count_top10": sorted(word_counts, key=lambda x: x[1], reverse=True)[:10],
    }


# --- Archive ---

def archive_old_notes(older_than_days: int = 90, execute: bool = False,
                      now: datetime | None = None) -> dict:
    """Archive old daily notes.

    A daily note is a top-level ``YYYY-MM-DD[-suffix].md`` file that is not
    named in ``PERMANENT_FILES``. Notes dated before ``now - older_than_days``
    are moved to ``ARCHIVE_DIR/<year>/``.

    Args:
        older_than_days: Minimum age, in days, of the date in the filename.
        execute: Move the files and write a manifest when true; otherwise
            only report what would be moved (dry run).
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        The manifest dict (``archived_at``, ``older_than_days``, ``dry_run``,
        ``files``, ``count``), plus ``manifest_path`` when files were moved.
    """
    if now is None:
        now = datetime.now()
    cutoff = now - timedelta(days=older_than_days)

    to_move = []
    # Sorted so the manifest order is deterministic.
    for fp in sorted(MEMORY_DIR.glob("*.md")):
        name = fp.name
        if name in PERMANENT_FILES:
            continue
        if not DAILY_NOTE_RE.match(name):
            continue
        # Extract date from filename
        m = DATE_RE.match(name)
        if not m:
            continue
        try:
            file_date = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        except ValueError:
            continue
        if file_date < cutoff:
            dest_dir = ARCHIVE_DIR / m.group(1)
            to_move.append({
                "source": str(fp.relative_to(MEMORY_DIR)),
                # Falls back to the absolute path if the archive directory is
                # not inside the memory directory.
                "dest": _rel_to_memory(dest_dir / name),
                "date": file_date.isoformat(),
            })

    manifest = {
        "archived_at": now.isoformat(),
        "older_than_days": older_than_days,
        "dry_run": not execute,
        "files": to_move,
        "count": len(to_move),
    }

    if execute and to_move:
        for item in to_move:
            src = MEMORY_DIR / item["source"]
            # Joining with an absolute "dest" yields that absolute path.
            dst = MEMORY_DIR / item["dest"]
            dst.parent.mkdir(parents=True, exist_ok=True)
            # NOTE(review): shutil.move may replace an existing destination
            # file; confirm archived names cannot collide.
            shutil.move(str(src), str(dst))

        # Write manifest
        manifest_path = ARCHIVE_DIR / f"manifest-{now.strftime('%Y%m%d-%H%M%S')}.json"
        manifest_path.parent.mkdir(parents=True, exist_ok=True)
        manifest_path.write_text(json.dumps(manifest, indent=2))
        manifest["manifest_path"] = _rel_to_memory(manifest_path)

    return manifest


# --- Report ---

def generate_report() -> tuple[str, bool]:
    """Generate combined markdown report. Returns (report_text, has_critical).

    ``has_critical`` is true when any stale item has severity ``"warning"``
    or any broken link was found.
    """
    lines = ["# Memory Hygiene Report", f"Generated: {datetime.now().isoformat()}", ""]
    has_critical = False

    # Stats
    stats = gather_stats()
    lines.append("## Stats")
    lines.append(f"- **Files:** {stats['total_files']} ({stats['total_size_human']})")
    lines.append(f"- **Changed 24h/7d/30d:** {stats['changed_24h']}/{stats['changed_7d']}/{stats['changed_30d']}")
    lines.append("")

    # Duplicates
    dupes = find_duplicates()
    lines.append(f"## Duplicates ({len(dupes)} found)")
    for d in dupes[:10]:
        locs = ", ".join(f"`{l['file']}:{l['line']}`" for l in d["locations"])
        lines.append(f"- {locs}: {d['locations'][0]['preview'][:60]}...")
    lines.append("")

    # Staleness
    stale = find_stale()
    warnings = [s for s in stale if s["severity"] == "warning"]
    lines.append(f"## Stale Items ({len(stale)} total, {len(warnings)} warnings)")
    if warnings:
        has_critical = True
    for s in stale[:20]:
        icon = "⚠️" if s["severity"] == "warning" else "ℹ️"
        lines.append(f"- {icon} `{s['file']}:{s['line']}` — {s['reason']}")
    lines.append("")

    # Orphans
    orph = find_orphans()
    bl = orph["broken_links"]
    lines.append("## Orphans")
    lines.append(f"- **Orphaned files:** {len(orph['orphaned_files'])}")
    lines.append(f"- **Broken links:** {len(bl)}")
    lines.append(f"- **Empty files:** {len(orph['empty_files'])}")
    if bl:
        has_critical = True
        for b in bl[:10]:
            lines.append(f" - `{b['file']}` → `{b['target']}` (broken)")
    lines.append("")

    # Archive candidates (dry run: nothing is moved)
    archive = archive_old_notes(older_than_days=90, execute=False)
    lines.append(f"## Archive Candidates ({archive['count']} files older than 90 days)")
    for f in archive["files"][:10]:
        lines.append(f"- `{f['source']}` → `{f['dest']}`")
    lines.append("")

    return "\n".join(lines), has_critical


def main():
    """Command-line entry point.

    Prints JSON (or the markdown report) to stdout and summaries to stderr.
    Exits with status 1 when no command is given or when the report contains
    critical findings.
    """
    parser = argparse.ArgumentParser(description="Memory Hygiene Tools")
    sub = parser.add_subparsers(dest="command")

    sub.add_parser("dupes", help="Find duplicate content")
    sub.add_parser("stale", help="Find stale content")
    sub.add_parser("orphans", help="Find orphaned files and broken links")
    sub.add_parser("stats", help="Memory statistics")

    arc = sub.add_parser("archive", help="Archive old daily notes")
    arc.add_argument("--older-than", default="90d", help="Age threshold (e.g., 90d)")
    arc.add_argument("--execute", action="store_true", help="Actually move files (default: dry-run)")

    sub.add_parser("report", help="Full hygiene report")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)

    if args.command == "dupes":
        dupes = find_duplicates()
        print(json.dumps(dupes, indent=2, ensure_ascii=False))
        print(f"\n{len(dupes)} duplicate groups found.", file=sys.stderr)
    elif args.command == "stale":
        stale = find_stale()
        print(json.dumps(stale, indent=2, ensure_ascii=False))
        print(f"\n{len(stale)} stale items found.", file=sys.stderr)
    elif args.command == "orphans":
        orph = find_orphans()
        print(json.dumps(orph, indent=2, ensure_ascii=False))
    elif args.command == "stats":
        stats = gather_stats()
        print(json.dumps(stats, indent=2, ensure_ascii=False))
    elif args.command == "archive":
        # Report a bad threshold as a usage error instead of a traceback.
        try:
            days = int(args.older_than.rstrip("d"))
        except ValueError:
            parser.error(f"invalid --older-than value {args.older_than!r}; expected e.g. 90d")
        if days < 0:
            # A negative age would put the cutoff in the future and select
            # every daily note.
            parser.error("--older-than must not be negative")
        result = archive_old_notes(older_than_days=days, execute=args.execute)
        print(json.dumps(result, indent=2, ensure_ascii=False))
        if not args.execute and result["count"] > 0:
            print(f"\nDry run: {result['count']} files would be archived. Use --execute to proceed.", file=sys.stderr)
    elif args.command == "report":
        report, has_critical = generate_report()
        print(report)
        if has_critical:
            sys.exit(1)


if __name__ == "__main__":
    main()