Files
hermes-identity-plugin/__init__.py

450 lines
16 KiB
Python

"""
hermes-identity-plugin (identity)
Resolves platform-specific user IDs and kanban workers to stable Honcho
peer names. NO modifications to the Hermes repo required — the plugin
monkey-patches Honcho's session initialization at plugin load time.
Config: /opt/data/identity-config.json (persistent volume)
"""
from __future__ import annotations
import contextvars
import json
import logging
import os
import re
import sqlite3
from functools import wraps
from pathlib import Path
from typing import Any, Callable
logger = logging.getLogger(__name__)
CONFIG_PATH = Path("/opt/data/identity-config.json")
# Context var to carry resolved peer name from pre_gateway_dispatch
# to Honcho's _do_session_init without modifying event.user_id.
_resolved_peer_var: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"identity_resolved_peer", default=None
)
# ── Config ──────────────────────────────────────────────────────────────────
def _default_config() -> dict[str, Any]:
return {
"mappings": [],
"boards": {},
"fallback_peer": "user",
"enforce_context_peer": True,
}
def load_config() -> dict[str, Any]:
if not CONFIG_PATH.exists():
return _default_config()
try:
return json.loads(CONFIG_PATH.read_text())
except (json.JSONDecodeError, OSError) as exc:
logger.warning("identity: config parse error: %s", exc)
return _default_config()
def save_config(cfg: dict[str, Any]) -> None:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(cfg, indent=2))
logger.info("identity: config saved to %s", CONFIG_PATH)
# ── Peer resolution ──────────────────────────────────────────────────────────
def resolve_peer(platform: str, platform_id: str) -> str | None:
"""Resolve a platform+ID to a canonical peer name from config."""
cfg = load_config()
for m in cfg.get("mappings", []):
if m.get("platform") == platform and str(m.get("id", "")) == str(platform_id):
return m["peer"]
return cfg.get("fallback_peer")
def resolve_board_peer(board_slug: str | None) -> str | None:
"""Resolve a kanban board slug to a peer name."""
if not board_slug:
return None
cfg = load_config()
return cfg.get("boards", {}).get(board_slug)
def get_peer_name() -> str | None:
"""Resolve peer name for the current process context.
Priority:
1. HERMES_HONCHO_PEER_NAME env var (explicit override or manual)
2. Kanban worker: task body context_peer → board config → fallback
3. None (Honcho behaves as before)
"""
explicit = os.environ.get("HERMES_HONCHO_PEER_NAME")
if explicit:
return explicit
task_id = os.environ.get("HERMES_KANBAN_TASK")
board = os.environ.get("HERMES_KANBAN_BOARD")
db_path = os.environ.get("HERMES_KANBAN_DB")
if task_id and db_path:
return _resolve_kanban_peer(task_id, board, db_path)
cfg = load_config()
return cfg.get("fallback_peer")
def _resolve_kanban_peer(task_id: str, board: str | None, db_path: str) -> str | None:
"""Read kanban task body for context_peer, fallback to board config."""
try:
db = Path(db_path)
if not db.exists():
return None
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
try:
row = conn.execute(
"SELECT body FROM tasks WHERE id = ?", (task_id,)
).fetchone()
if not row:
return resolve_board_peer(board)
body = row["body"] or ""
peer = _extract_context_peer(body)
if peer:
return peer
return resolve_board_peer(board)
finally:
conn.close()
except Exception as exc:
logger.debug("identity: kanban peer resolution error: %s", exc)
return None
_CONTEXT_PEER_RE = re.compile(r"```metadata\s*\ncontext_peer:\s*(\S+)", re.MULTILINE)
_INLINE_PEER_RE = re.compile(r"context_peer:\s*(\S+)")
def _extract_context_peer(body: str) -> str | None:
m = _CONTEXT_PEER_RE.search(body)
if m:
return m.group(1)
m = _INLINE_PEER_RE.search(body)
if m:
return m.group(1)
return None
def enforce_context_peer(body: str | None) -> str | None:
"""Return error string if body lacks context_peer, None if OK."""
cfg = load_config()
if not cfg.get("enforce_context_peer", True):
return None
if not body or "context_peer:" not in body:
return (
"Missing required `context_peer: <name>` in task body. "
"Add a metadata block:\n"
"```metadata\ncontext_peer: thierry\n```\n"
)
peer = _extract_context_peer(body)
if not peer:
return "context_peer: found in body but could not be parsed."
return None
# ── Runtime monkey-patch of Honcho session init ─────────────────────────────
# This is the KEY mechanism: no files modified, no fork needed.
# The plugin wraps HonchoMemoryProvider._do_session_init at plugin load time
# to inject our resolved peer name before Honcho creates the session.
_original_init: Callable | None = None
def _patched_do_session_init(self, cfg, session_id: str, **kwargs):
"""Wrapper around Honcho's _do_session_init.
Priority for user_id:
1. _resolved_peer_var from pre_gateway_dispatch (gateway multi-user)
2. get_peer_name() for kanban workers
3. kwargs.get("user_id") as-is (existing Discord snowflake fallback)
"""
# Check context var first (set by _pre_gateway_dispatch for
# gateway-originated messages — peer name already resolved)
resolved = _resolved_peer_var.get()
if resolved:
logger.info(
"identity: Honcho peer '%s' from gateway dispatch (was %s)",
resolved, kwargs.get("user_id", "(none)"),
)
kwargs["user_id"] = resolved
elif not kwargs.get("user_id"):
# Kanban worker / CLI — no user_id set by gateway
resolved = get_peer_name()
if resolved:
logger.info(
"identity: injecting peer '%s' into Honcho session (task=%s, board=%s)",
resolved,
os.environ.get("HERMES_KANBAN_TASK", "-"),
os.environ.get("HERMES_KANBAN_BOARD", "-"),
)
kwargs["user_id"] = resolved
return _original_init(self, cfg, session_id, **kwargs) # type: ignore[misc]
def _apply_honcho_patch():
"""Apply monkey-patch to Honcho's session initialization.
Must be called after Honcho is loaded but before any session starts.
Safe to call multiple times - only patches on first call.
"""
global _original_init
if _original_init is not None:
return # already patched
try:
import plugins.memory.honcho # type: ignore[import-untyped]
# Use string-based patching to avoid import-time issues
honcho_mod = __import__(
"plugins.memory.honcho",
fromlist=["HonchoMemoryProvider"],
)
provider_cls = getattr(honcho_mod, "HonchoMemoryProvider", None)
if provider_cls is None:
logger.warning("identity: HonchoMemoryProvider not found, cannot patch")
return
orig = getattr(provider_cls, "_do_session_init", None)
if orig is None:
logger.warning("identity: HonchoMemoryProvider._do_session_init not found")
return
# Store original and replace with wrapper
_original_init = orig
@wraps(orig)
def wrapper(self, cfg, session_id: str, **kwargs):
return _patched_do_session_init(self, cfg, session_id, **kwargs)
setattr(provider_cls, "_do_session_init", wrapper)
logger.info("identity: patched HonchoMemoryProvider._do_session_init ✓")
except Exception as exc:
logger.warning("identity: failed to patch Honcho init: %s", exc)
# ── SessionDB patch (version compat: user_id kwarg) ──────────────────────
_session_db_patched = False
def _patch_session_db() -> None:
"""Monkey-patch SessionDB methods to accept (and ignore) the ``user_id``
kwarg that the built-in ``session_search`` tool passes but the upstream
backend no longer supports. Same zero-fork pattern as the Honcho patch."""
global _session_db_patched
if _session_db_patched:
return
try:
from hermes_state import SessionDB
except ImportError as exc:
logger.warning("identity: hermes_state not importable (%s), skipping SessionDB patch", exc)
return
except Exception as exc:
logger.warning("identity: failed to import SessionDB: %s", exc)
return
# search_messages
orig_search = SessionDB.search_messages
if orig_search is None:
logger.warning("identity: SessionDB.search_messages not found")
return
def _patched_search(self, *args, **kwargs):
kwargs.pop("user_id", None)
return orig_search(self, *args, **kwargs)
SessionDB.search_messages = _patched_search
# list_sessions_rich
orig_list = SessionDB.list_sessions_rich
if orig_list is None:
logger.warning("identity: SessionDB.list_sessions_rich not found")
return
def _patched_list(self, *args, **kwargs):
kwargs.pop("user_id", None)
return orig_list(self, *args, **kwargs)
SessionDB.list_sessions_rich = _patched_list
_session_db_patched = True
logger.info("identity: patched SessionDB methods ✓")
# ── Hooks ───────────────────────────────────────────────────────────────────
def _pre_gateway_dispatch(event: Any, gateway: Any, session_store: Any, **kw) -> dict | None:
"""Resolve peer identity and store it for Honcho session init."""
platform = getattr(event, "platform", "unknown")
user_id = getattr(event, "user_id", "unknown")
resolved = resolve_peer(platform, user_id)
if resolved:
logger.debug(
"identity: gateway platform=%s user_id=%s → peer=%s",
platform, user_id, resolved,
)
_resolved_peer_var.set(resolved)
else:
logger.info("identity: unmapped gateway user platform=%s user_id=%s", platform, user_id)
return None
def _on_session_start(**kw: Any) -> None:
"""Log resolved identity at session start."""
peer = get_peer_name()
task_id = os.environ.get("HERMES_KANBAN_TASK")
if task_id:
if peer:
logger.info("identity: kanban worker task=%s peer=%s", task_id, peer)
else:
logger.info("identity: kanban worker task=%s → user-default (no mapping)", task_id)
elif peer:
logger.debug("identity: session peer=%s", peer)
# ── Slash command ───────────────────────────────────────────────────────────
def _cmd_identity(raw_args: str) -> str:
"""Handle ``/identity`` slash command."""
args = raw_args.strip().split()
if not args:
return _show_status()
cmd = args[0]
if cmd == "status":
return _show_status()
elif cmd == "list":
return _list_mappings()
elif cmd in ("add", "set") and len(args) >= 4:
platform, pid, peer_name = args[1], args[2], args[3]
return _add_mapping(platform, pid, peer_name)
elif cmd == "rm" and len(args) >= 2:
idx = int(args[1]) if args[1].isdigit() else -1
return _remove_mapping(idx)
elif cmd == "board" and len(args) >= 3:
return _set_board(args[1], args[2])
elif cmd == "help":
return _help_text()
return _help_text()
def _show_status() -> str:
cfg = load_config()
lines = [
"**Identity Plugin**",
f"Config: `{CONFIG_PATH}`",
f"Mappings: {len(cfg.get('mappings', []))}",
f"Boards: {len(cfg.get('boards', {}))}",
f"Enforce context_peer: {cfg.get('enforce_context_peer', True)}",
f"Fallback: {cfg.get('fallback_peer', 'user')}",
f"Honcho patch: {'' if _original_init else '✗ not applied'}",
"",
"Current context:",
]
peer = get_peer_name()
lines.append(f" Resolved peer: **{peer or '(none — will use user-default)'}**")
task = os.environ.get("HERMES_KANBAN_TASK")
if task:
lines.append(f" Kanban task: `{task}`")
board = os.environ.get("HERMES_KANBAN_BOARD")
if board:
lines.append(f" Board: `{board}`")
return "\n".join(lines)
def _list_mappings() -> str:
cfg = load_config()
mappings = cfg.get("mappings", [])
if not mappings:
return "No mappings configured."
lines = ["**Identity mappings:**", ""]
for i, m in enumerate(mappings):
lines.append(f" [{i}] `{m.get('platform')}` `{m.get('id')}` → **{m.get('peer')}**")
boards = cfg.get("boards", {})
if boards:
lines.append("")
lines.append("**Board peers:**")
for slug, peer in boards.items():
lines.append(f" `{slug}` → **{peer}**")
return "\n".join(lines)
def _add_mapping(platform: str, pid: str, peer_name: str) -> str:
cfg = load_config()
cfg.setdefault("mappings", []).append({"platform": platform, "id": pid, "peer": peer_name})
save_config(cfg)
return f"Added: `{platform}` `{pid}` → **{peer_name}**"
def _remove_mapping(idx: int) -> str:
cfg = load_config()
mappings = cfg.get("mappings", [])
if idx < 0 or idx >= len(mappings):
return f"Invalid index {idx}. Use `list` to see indices."
removed = mappings.pop(idx)
save_config(cfg)
return f"Removed: `{removed.get('platform')}` `{removed.get('id')}` → **{removed.get('peer')}**"
def _set_board(slug: str, peer_name: str) -> str:
cfg = load_config()
cfg.setdefault("boards", {})[slug] = peer_name
save_config(cfg)
return f"Board `{slug}` → **{peer_name}**"
def _help_text() -> str:
return """**/identity** — manage identity mappings
Commands:
`status` Show config status and current context
`list` List all mappings and board peers
`add discord <id> <peer>` Add a platform→peer mapping
`rm <index>` Remove mapping by index
`board <slug> <peer>` Set a kanban board's default peer
`help` Show this help"""
# ── Plugin entry point ──────────────────────────────────────────────────────
def register(ctx: Any) -> None:
"""Register identity plugin hooks and apply Honcho session patch."""
# Apply the runtime monkey-patch to Honcho's session init
# NO files modified, NO fork needed — pure Python at import time
_apply_honcho_patch()
_patch_session_db()
# Register hooks
ctx.register_hook("pre_gateway_dispatch", _pre_gateway_dispatch)
ctx.register_hook("on_session_start", _on_session_start)
# Register slash command
ctx.register_command(
name="identity",
handler=_cmd_identity,
description="Manage identity peer mappings",
args_hint="status|list|add|rm|board|help",
)
logger.info("identity-plugin: registered ✓ (honcho patch=%s)", "applied" if _original_init else "not-applied")