""" hermes-identity-plugin (identity) Resolves platform-specific user IDs and kanban workers to stable Honcho peer names. NO modifications to the Hermes repo required — the plugin monkey-patches Honcho's session initialization at plugin load time. Config: /opt/data/identity-config.json (persistent volume) """ from __future__ import annotations import contextvars import json import logging import os import re import sqlite3 from functools import wraps from pathlib import Path from typing import Any, Callable logger = logging.getLogger(__name__) CONFIG_PATH = Path("/opt/data/identity-config.json") # Context var to carry resolved peer name from pre_gateway_dispatch # to Honcho's _do_session_init without modifying event.user_id. _resolved_peer_var: contextvars.ContextVar[str | None] = contextvars.ContextVar( "identity_resolved_peer", default=None ) # ── Config ────────────────────────────────────────────────────────────────── def _default_config() -> dict[str, Any]: return { "mappings": [], "boards": {}, "fallback_peer": "user", "enforce_context_peer": True, } def load_config() -> dict[str, Any]: if not CONFIG_PATH.exists(): return _default_config() try: return json.loads(CONFIG_PATH.read_text()) except (json.JSONDecodeError, OSError) as exc: logger.warning("identity: config parse error: %s", exc) return _default_config() def save_config(cfg: dict[str, Any]) -> None: CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) CONFIG_PATH.write_text(json.dumps(cfg, indent=2)) logger.info("identity: config saved to %s", CONFIG_PATH) # ── Peer resolution ────────────────────────────────────────────────────────── def resolve_peer(platform: str, platform_id: str) -> str | None: """Resolve a platform+ID to a canonical peer name from config.""" cfg = load_config() for m in cfg.get("mappings", []): if m.get("platform") == platform and str(m.get("id", "")) == str(platform_id): return m["peer"] return cfg.get("fallback_peer") def resolve_board_peer(board_slug: str | None) -> str | None: """Resolve a kanban board slug to a peer name.""" if not board_slug: return None cfg = load_config() return cfg.get("boards", {}).get(board_slug) def get_peer_name() -> str | None: """Resolve peer name for the current process context. Priority: 1. HERMES_HONCHO_PEER_NAME env var (explicit override or manual) 2. Kanban worker: task body context_peer → board config → fallback 3. None (Honcho behaves as before) """ explicit = os.environ.get("HERMES_HONCHO_PEER_NAME") if explicit: return explicit task_id = os.environ.get("HERMES_KANBAN_TASK") board = os.environ.get("HERMES_KANBAN_BOARD") db_path = os.environ.get("HERMES_KANBAN_DB") if task_id and db_path: return _resolve_kanban_peer(task_id, board, db_path) cfg = load_config() return cfg.get("fallback_peer") def _resolve_kanban_peer(task_id: str, board: str | None, db_path: str) -> str | None: """Read kanban task body for context_peer, fallback to board config.""" try: db = Path(db_path) if not db.exists(): return None conn = sqlite3.connect(str(db)) conn.row_factory = sqlite3.Row try: row = conn.execute( "SELECT body FROM tasks WHERE id = ?", (task_id,) ).fetchone() if not row: return resolve_board_peer(board) body = row["body"] or "" peer = _extract_context_peer(body) if peer: return peer return resolve_board_peer(board) finally: conn.close() except Exception as exc: logger.debug("identity: kanban peer resolution error: %s", exc) return None _CONTEXT_PEER_RE = re.compile(r"```metadata\s*\ncontext_peer:\s*(\S+)", re.MULTILINE) _INLINE_PEER_RE = re.compile(r"context_peer:\s*(\S+)") def _extract_context_peer(body: str) -> str | None: m = _CONTEXT_PEER_RE.search(body) if m: return m.group(1) m = _INLINE_PEER_RE.search(body) if m: return m.group(1) return None def enforce_context_peer(body: str | None) -> str | None: """Return error string if body lacks context_peer, None if OK.""" cfg = load_config() if not cfg.get("enforce_context_peer", True): return None if not body or "context_peer:" not in body: return ( "Missing required `context_peer: ` in task body. " "Add a metadata block:\n" "```metadata\ncontext_peer: thierry\n```\n" ) peer = _extract_context_peer(body) if not peer: return "context_peer: found in body but could not be parsed." return None # ── Runtime monkey-patch of Honcho session init ───────────────────────────── # This is the KEY mechanism: no files modified, no fork needed. # The plugin wraps HonchoMemoryProvider._do_session_init at plugin load time # to inject our resolved peer name before Honcho creates the session. _original_init: Callable | None = None def _patched_do_session_init(self, cfg, session_id: str, **kwargs): """Wrapper around Honcho's _do_session_init. Priority for user_id: 1. _resolved_peer_var from pre_gateway_dispatch (gateway multi-user) 2. get_peer_name() for kanban workers 3. kwargs.get("user_id") as-is (existing Discord snowflake fallback) """ # Check context var first (set by _pre_gateway_dispatch for # gateway-originated messages — peer name already resolved) resolved = _resolved_peer_var.get() if resolved: logger.info( "identity: Honcho peer '%s' from gateway dispatch (was %s)", resolved, kwargs.get("user_id", "(none)"), ) kwargs["user_id"] = resolved elif not kwargs.get("user_id"): # Kanban worker / CLI — no user_id set by gateway resolved = get_peer_name() if resolved: logger.info( "identity: injecting peer '%s' into Honcho session (task=%s, board=%s)", resolved, os.environ.get("HERMES_KANBAN_TASK", "-"), os.environ.get("HERMES_KANBAN_BOARD", "-"), ) kwargs["user_id"] = resolved return _original_init(self, cfg, session_id, **kwargs) # type: ignore[misc] def _apply_honcho_patch(): """Apply monkey-patch to Honcho's session initialization. Must be called after Honcho is loaded but before any session starts. Safe to call multiple times - only patches on first call. """ global _original_init if _original_init is not None: return # already patched try: import plugins.memory.honcho # type: ignore[import-untyped] # Use string-based patching to avoid import-time issues honcho_mod = __import__( "plugins.memory.honcho", fromlist=["HonchoMemoryProvider"], ) provider_cls = getattr(honcho_mod, "HonchoMemoryProvider", None) if provider_cls is None: logger.warning("identity: HonchoMemoryProvider not found, cannot patch") return orig = getattr(provider_cls, "_do_session_init", None) if orig is None: logger.warning("identity: HonchoMemoryProvider._do_session_init not found") return # Store original and replace with wrapper _original_init = orig @wraps(orig) def wrapper(self, cfg, session_id: str, **kwargs): return _patched_do_session_init(self, cfg, session_id, **kwargs) setattr(provider_cls, "_do_session_init", wrapper) logger.info("identity: patched HonchoMemoryProvider._do_session_init ✓") except Exception as exc: logger.warning("identity: failed to patch Honcho init: %s", exc) # ── SessionDB patch (version compat: user_id kwarg) ────────────────────── _session_db_patched = False def _patch_session_db() -> None: """Monkey-patch SessionDB methods to accept (and ignore) the ``user_id`` kwarg that the built-in ``session_search`` tool passes but the upstream backend no longer supports. Same zero-fork pattern as the Honcho patch.""" global _session_db_patched if _session_db_patched: return try: from hermes_state import SessionDB except ImportError as exc: logger.warning("identity: hermes_state not importable (%s), skipping SessionDB patch", exc) return except Exception as exc: logger.warning("identity: failed to import SessionDB: %s", exc) return # search_messages orig_search = SessionDB.search_messages if orig_search is None: logger.warning("identity: SessionDB.search_messages not found") return def _patched_search(self, *args, **kwargs): kwargs.pop("user_id", None) return orig_search(self, *args, **kwargs) SessionDB.search_messages = _patched_search # list_sessions_rich orig_list = SessionDB.list_sessions_rich if orig_list is None: logger.warning("identity: SessionDB.list_sessions_rich not found") return def _patched_list(self, *args, **kwargs): kwargs.pop("user_id", None) return orig_list(self, *args, **kwargs) SessionDB.list_sessions_rich = _patched_list _session_db_patched = True logger.info("identity: patched SessionDB methods ✓") # ── Hooks ─────────────────────────────────────────────────────────────────── def _pre_gateway_dispatch(event: Any, gateway: Any, session_store: Any, **kw) -> dict | None: """Resolve peer identity and store it for Honcho session init.""" platform = getattr(event, "platform", "unknown") user_id = getattr(event, "user_id", "unknown") resolved = resolve_peer(platform, user_id) if resolved: logger.debug( "identity: gateway platform=%s user_id=%s → peer=%s", platform, user_id, resolved, ) _resolved_peer_var.set(resolved) else: logger.info("identity: unmapped gateway user platform=%s user_id=%s", platform, user_id) return None def _on_session_start(**kw: Any) -> None: """Log resolved identity at session start.""" peer = get_peer_name() task_id = os.environ.get("HERMES_KANBAN_TASK") if task_id: if peer: logger.info("identity: kanban worker task=%s peer=%s", task_id, peer) else: logger.info("identity: kanban worker task=%s → user-default (no mapping)", task_id) elif peer: logger.debug("identity: session peer=%s", peer) # ── Slash command ─────────────────────────────────────────────────────────── def _cmd_identity(raw_args: str) -> str: """Handle ``/identity`` slash command.""" args = raw_args.strip().split() if not args: return _show_status() cmd = args[0] if cmd == "status": return _show_status() elif cmd == "list": return _list_mappings() elif cmd in ("add", "set") and len(args) >= 4: platform, pid, peer_name = args[1], args[2], args[3] return _add_mapping(platform, pid, peer_name) elif cmd == "rm" and len(args) >= 2: idx = int(args[1]) if args[1].isdigit() else -1 return _remove_mapping(idx) elif cmd == "board" and len(args) >= 3: return _set_board(args[1], args[2]) elif cmd == "help": return _help_text() return _help_text() def _show_status() -> str: cfg = load_config() lines = [ "**Identity Plugin**", f"Config: `{CONFIG_PATH}`", f"Mappings: {len(cfg.get('mappings', []))}", f"Boards: {len(cfg.get('boards', {}))}", f"Enforce context_peer: {cfg.get('enforce_context_peer', True)}", f"Fallback: {cfg.get('fallback_peer', 'user')}", f"Honcho patch: {'✓' if _original_init else '✗ not applied'}", "", "Current context:", ] peer = get_peer_name() lines.append(f" Resolved peer: **{peer or '(none — will use user-default)'}**") task = os.environ.get("HERMES_KANBAN_TASK") if task: lines.append(f" Kanban task: `{task}`") board = os.environ.get("HERMES_KANBAN_BOARD") if board: lines.append(f" Board: `{board}`") return "\n".join(lines) def _list_mappings() -> str: cfg = load_config() mappings = cfg.get("mappings", []) if not mappings: return "No mappings configured." lines = ["**Identity mappings:**", ""] for i, m in enumerate(mappings): lines.append(f" [{i}] `{m.get('platform')}` `{m.get('id')}` → **{m.get('peer')}**") boards = cfg.get("boards", {}) if boards: lines.append("") lines.append("**Board peers:**") for slug, peer in boards.items(): lines.append(f" `{slug}` → **{peer}**") return "\n".join(lines) def _add_mapping(platform: str, pid: str, peer_name: str) -> str: cfg = load_config() cfg.setdefault("mappings", []).append({"platform": platform, "id": pid, "peer": peer_name}) save_config(cfg) return f"Added: `{platform}` `{pid}` → **{peer_name}**" def _remove_mapping(idx: int) -> str: cfg = load_config() mappings = cfg.get("mappings", []) if idx < 0 or idx >= len(mappings): return f"Invalid index {idx}. Use `list` to see indices." removed = mappings.pop(idx) save_config(cfg) return f"Removed: `{removed.get('platform')}` `{removed.get('id')}` → **{removed.get('peer')}**" def _set_board(slug: str, peer_name: str) -> str: cfg = load_config() cfg.setdefault("boards", {})[slug] = peer_name save_config(cfg) return f"Board `{slug}` → **{peer_name}**" def _help_text() -> str: return """**/identity** — manage identity mappings Commands: `status` Show config status and current context `list` List all mappings and board peers `add discord ` Add a platform→peer mapping `rm ` Remove mapping by index `board ` Set a kanban board's default peer `help` Show this help""" # ── Plugin entry point ────────────────────────────────────────────────────── def register(ctx: Any) -> None: """Register identity plugin hooks and apply Honcho session patch.""" # Apply the runtime monkey-patch to Honcho's session init # NO files modified, NO fork needed — pure Python at import time _apply_honcho_patch() _patch_session_db() # Register hooks ctx.register_hook("pre_gateway_dispatch", _pre_gateway_dispatch) ctx.register_hook("on_session_start", _on_session_start) # Register slash command ctx.register_command( name="identity", handler=_cmd_identity, description="Manage identity peer mappings", args_hint="status|list|add|rm|board|help", ) logger.info("identity-plugin: registered ✓ (honcho patch=%s)", "applied" if _original_init else "not-applied")