MessageEvent wraps source in nested SessionSource — fields at the top level don't exist, causing resolve_peer() to always fall through to the fallback_peer. This meant every Discord user got a raw snowflake peer created instead of the mapped name.
454 lines
16 KiB
Python
454 lines
16 KiB
Python
"""
|
|
hermes-identity-plugin (identity)
|
|
|
|
Resolves platform-specific user IDs and kanban workers to stable Honcho
|
|
peer names. NO modifications to the Hermes repo required — the plugin
|
|
monkey-patches Honcho's session initialization at plugin load time.
|
|
|
|
Config: /opt/data/identity-config.json (persistent volume)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextvars
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
from typing import Any, Callable
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
CONFIG_PATH = Path("/opt/data/identity-config.json")
|
|
|
|
# Context var to carry resolved peer name from pre_gateway_dispatch
|
|
# to Honcho's _do_session_init without modifying event.user_id.
|
|
_resolved_peer_var: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
|
"identity_resolved_peer", default=None
|
|
)
|
|
|
|
# ── Config ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _default_config() -> dict[str, Any]:
|
|
return {
|
|
"mappings": [],
|
|
"boards": {},
|
|
"fallback_peer": "user",
|
|
"enforce_context_peer": True,
|
|
}
|
|
|
|
|
|
def load_config() -> dict[str, Any]:
|
|
if not CONFIG_PATH.exists():
|
|
return _default_config()
|
|
try:
|
|
return json.loads(CONFIG_PATH.read_text())
|
|
except (json.JSONDecodeError, OSError) as exc:
|
|
logger.warning("identity: config parse error: %s", exc)
|
|
return _default_config()
|
|
|
|
|
|
def save_config(cfg: dict[str, Any]) -> None:
|
|
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
CONFIG_PATH.write_text(json.dumps(cfg, indent=2))
|
|
logger.info("identity: config saved to %s", CONFIG_PATH)
|
|
|
|
|
|
# ── Peer resolution ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def resolve_peer(platform: str, platform_id: str) -> str | None:
|
|
"""Resolve a platform+ID to a canonical peer name from config."""
|
|
cfg = load_config()
|
|
for m in cfg.get("mappings", []):
|
|
if m.get("platform") == platform and str(m.get("id", "")) == str(platform_id):
|
|
return m["peer"]
|
|
return cfg.get("fallback_peer")
|
|
|
|
|
|
def resolve_board_peer(board_slug: str | None) -> str | None:
|
|
"""Resolve a kanban board slug to a peer name."""
|
|
if not board_slug:
|
|
return None
|
|
cfg = load_config()
|
|
return cfg.get("boards", {}).get(board_slug)
|
|
|
|
|
|
def get_peer_name() -> str | None:
|
|
"""Resolve peer name for the current process context.
|
|
|
|
Priority:
|
|
1. HERMES_HONCHO_PEER_NAME env var (explicit override or manual)
|
|
2. Kanban worker: task body context_peer → board config → fallback
|
|
3. None (Honcho behaves as before)
|
|
"""
|
|
explicit = os.environ.get("HERMES_HONCHO_PEER_NAME")
|
|
if explicit:
|
|
return explicit
|
|
|
|
task_id = os.environ.get("HERMES_KANBAN_TASK")
|
|
board = os.environ.get("HERMES_KANBAN_BOARD")
|
|
db_path = os.environ.get("HERMES_KANBAN_DB")
|
|
if task_id and db_path:
|
|
return _resolve_kanban_peer(task_id, board, db_path)
|
|
|
|
cfg = load_config()
|
|
return cfg.get("fallback_peer")
|
|
|
|
|
|
def _resolve_kanban_peer(task_id: str, board: str | None, db_path: str) -> str | None:
|
|
"""Read kanban task body for context_peer, fallback to board config."""
|
|
try:
|
|
db = Path(db_path)
|
|
if not db.exists():
|
|
return None
|
|
conn = sqlite3.connect(str(db))
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT body FROM tasks WHERE id = ?", (task_id,)
|
|
).fetchone()
|
|
if not row:
|
|
return resolve_board_peer(board)
|
|
body = row["body"] or ""
|
|
peer = _extract_context_peer(body)
|
|
if peer:
|
|
return peer
|
|
return resolve_board_peer(board)
|
|
finally:
|
|
conn.close()
|
|
except Exception as exc:
|
|
logger.debug("identity: kanban peer resolution error: %s", exc)
|
|
return None
|
|
|
|
|
|
_CONTEXT_PEER_RE = re.compile(r"```metadata\s*\ncontext_peer:\s*(\S+)", re.MULTILINE)
|
|
_INLINE_PEER_RE = re.compile(r"context_peer:\s*(\S+)")
|
|
|
|
|
|
def _extract_context_peer(body: str) -> str | None:
|
|
m = _CONTEXT_PEER_RE.search(body)
|
|
if m:
|
|
return m.group(1)
|
|
m = _INLINE_PEER_RE.search(body)
|
|
if m:
|
|
return m.group(1)
|
|
return None
|
|
|
|
|
|
def enforce_context_peer(body: str | None) -> str | None:
|
|
"""Return error string if body lacks context_peer, None if OK."""
|
|
cfg = load_config()
|
|
if not cfg.get("enforce_context_peer", True):
|
|
return None
|
|
if not body or "context_peer:" not in body:
|
|
return (
|
|
"Missing required `context_peer: <name>` in task body. "
|
|
"Add a metadata block:\n"
|
|
"```metadata\ncontext_peer: thierry\n```\n"
|
|
)
|
|
peer = _extract_context_peer(body)
|
|
if not peer:
|
|
return "context_peer: found in body but could not be parsed."
|
|
return None
|
|
|
|
|
|
# ── Runtime monkey-patch of Honcho session init ─────────────────────────────
|
|
# This is the KEY mechanism: no files modified, no fork needed.
|
|
# The plugin wraps HonchoMemoryProvider._do_session_init at plugin load time
|
|
# to inject our resolved peer name before Honcho creates the session.
|
|
|
|
_original_init: Callable | None = None
|
|
|
|
|
|
def _patched_do_session_init(self, cfg, session_id: str, **kwargs):
|
|
"""Wrapper around Honcho's _do_session_init.
|
|
|
|
Priority for user_id:
|
|
1. _resolved_peer_var from pre_gateway_dispatch (gateway multi-user)
|
|
2. get_peer_name() for kanban workers
|
|
3. kwargs.get("user_id") as-is (existing Discord snowflake fallback)
|
|
"""
|
|
# Check context var first (set by _pre_gateway_dispatch for
|
|
# gateway-originated messages — peer name already resolved)
|
|
resolved = _resolved_peer_var.get()
|
|
if resolved:
|
|
logger.info(
|
|
"identity: Honcho peer '%s' from gateway dispatch (was %s)",
|
|
resolved, kwargs.get("user_id", "(none)"),
|
|
)
|
|
kwargs["user_id"] = resolved
|
|
elif not kwargs.get("user_id"):
|
|
# Kanban worker / CLI — no user_id set by gateway
|
|
resolved = get_peer_name()
|
|
if resolved:
|
|
logger.info(
|
|
"identity: injecting peer '%s' into Honcho session (task=%s, board=%s)",
|
|
resolved,
|
|
os.environ.get("HERMES_KANBAN_TASK", "-"),
|
|
os.environ.get("HERMES_KANBAN_BOARD", "-"),
|
|
)
|
|
kwargs["user_id"] = resolved
|
|
return _original_init(self, cfg, session_id, **kwargs) # type: ignore[misc]
|
|
|
|
|
|
def _apply_honcho_patch():
|
|
"""Apply monkey-patch to Honcho's session initialization.
|
|
|
|
Must be called after Honcho is loaded but before any session starts.
|
|
Safe to call multiple times - only patches on first call.
|
|
"""
|
|
global _original_init
|
|
|
|
if _original_init is not None:
|
|
return # already patched
|
|
|
|
try:
|
|
import plugins.memory.honcho # type: ignore[import-untyped]
|
|
# Use string-based patching to avoid import-time issues
|
|
honcho_mod = __import__(
|
|
"plugins.memory.honcho",
|
|
fromlist=["HonchoMemoryProvider"],
|
|
)
|
|
provider_cls = getattr(honcho_mod, "HonchoMemoryProvider", None)
|
|
if provider_cls is None:
|
|
logger.warning("identity: HonchoMemoryProvider not found, cannot patch")
|
|
return
|
|
|
|
orig = getattr(provider_cls, "_do_session_init", None)
|
|
if orig is None:
|
|
logger.warning("identity: HonchoMemoryProvider._do_session_init not found")
|
|
return
|
|
|
|
# Store original and replace with wrapper
|
|
_original_init = orig
|
|
|
|
@wraps(orig)
|
|
def wrapper(self, cfg, session_id: str, **kwargs):
|
|
return _patched_do_session_init(self, cfg, session_id, **kwargs)
|
|
|
|
setattr(provider_cls, "_do_session_init", wrapper)
|
|
logger.info("identity: patched HonchoMemoryProvider._do_session_init ✓")
|
|
except Exception as exc:
|
|
logger.warning("identity: failed to patch Honcho init: %s", exc)
|
|
|
|
|
|
# ── SessionDB patch (version compat: user_id kwarg) ──────────────────────
|
|
|
|
_session_db_patched = False
|
|
|
|
|
|
def _patch_session_db() -> None:
|
|
"""Monkey-patch SessionDB methods to accept (and ignore) the ``user_id``
|
|
kwarg that the built-in ``session_search`` tool passes but the upstream
|
|
backend no longer supports. Same zero-fork pattern as the Honcho patch."""
|
|
global _session_db_patched
|
|
if _session_db_patched:
|
|
return
|
|
|
|
try:
|
|
from hermes_state import SessionDB
|
|
except ImportError as exc:
|
|
logger.warning("identity: hermes_state not importable (%s), skipping SessionDB patch", exc)
|
|
return
|
|
except Exception as exc:
|
|
logger.warning("identity: failed to import SessionDB: %s", exc)
|
|
return
|
|
|
|
# search_messages
|
|
orig_search = SessionDB.search_messages
|
|
if orig_search is None:
|
|
logger.warning("identity: SessionDB.search_messages not found")
|
|
return
|
|
|
|
def _patched_search(self, *args, **kwargs):
|
|
kwargs.pop("user_id", None)
|
|
return orig_search(self, *args, **kwargs)
|
|
|
|
SessionDB.search_messages = _patched_search
|
|
|
|
# list_sessions_rich
|
|
orig_list = SessionDB.list_sessions_rich
|
|
if orig_list is None:
|
|
logger.warning("identity: SessionDB.list_sessions_rich not found")
|
|
return
|
|
|
|
def _patched_list(self, *args, **kwargs):
|
|
kwargs.pop("user_id", None)
|
|
return orig_list(self, *args, **kwargs)
|
|
|
|
SessionDB.list_sessions_rich = _patched_list
|
|
|
|
_session_db_patched = True
|
|
logger.info("identity: patched SessionDB methods ✓")
|
|
|
|
|
|
# ── Hooks ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _pre_gateway_dispatch(event: Any, gateway: Any, session_store: Any, **kw) -> dict | None:
|
|
"""Resolve peer identity and store it for Honcho session init."""
|
|
# NOTE: MessageEvent wraps source in nested SessionSource object.
|
|
# event.source.{platform, user_id} are the correct paths — NOT
|
|
# event.{platform, user_id} which don't exist on MessageEvent.
|
|
source = event.source if hasattr(event, "source") else event
|
|
platform = getattr(source, "platform", "unknown")
|
|
user_id = getattr(source, "user_id", "unknown")
|
|
resolved = resolve_peer(platform, user_id)
|
|
|
|
if resolved:
|
|
logger.debug(
|
|
"identity: gateway platform=%s user_id=%s → peer=%s",
|
|
platform, user_id, resolved,
|
|
)
|
|
_resolved_peer_var.set(resolved)
|
|
else:
|
|
logger.info("identity: unmapped gateway user platform=%s user_id=%s", platform, user_id)
|
|
return None
|
|
|
|
|
|
def _on_session_start(**kw: Any) -> None:
|
|
"""Log resolved identity at session start."""
|
|
peer = get_peer_name()
|
|
task_id = os.environ.get("HERMES_KANBAN_TASK")
|
|
if task_id:
|
|
if peer:
|
|
logger.info("identity: kanban worker task=%s peer=%s", task_id, peer)
|
|
else:
|
|
logger.info("identity: kanban worker task=%s → user-default (no mapping)", task_id)
|
|
elif peer:
|
|
logger.debug("identity: session peer=%s", peer)
|
|
|
|
|
|
# ── Slash command ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def _cmd_identity(raw_args: str) -> str:
|
|
"""Handle ``/identity`` slash command."""
|
|
args = raw_args.strip().split()
|
|
if not args:
|
|
return _show_status()
|
|
|
|
cmd = args[0]
|
|
if cmd == "status":
|
|
return _show_status()
|
|
elif cmd == "list":
|
|
return _list_mappings()
|
|
elif cmd in ("add", "set") and len(args) >= 4:
|
|
platform, pid, peer_name = args[1], args[2], args[3]
|
|
return _add_mapping(platform, pid, peer_name)
|
|
elif cmd == "rm" and len(args) >= 2:
|
|
idx = int(args[1]) if args[1].isdigit() else -1
|
|
return _remove_mapping(idx)
|
|
elif cmd == "board" and len(args) >= 3:
|
|
return _set_board(args[1], args[2])
|
|
elif cmd == "help":
|
|
return _help_text()
|
|
return _help_text()
|
|
|
|
|
|
def _show_status() -> str:
|
|
cfg = load_config()
|
|
lines = [
|
|
"**Identity Plugin**",
|
|
f"Config: `{CONFIG_PATH}`",
|
|
f"Mappings: {len(cfg.get('mappings', []))}",
|
|
f"Boards: {len(cfg.get('boards', {}))}",
|
|
f"Enforce context_peer: {cfg.get('enforce_context_peer', True)}",
|
|
f"Fallback: {cfg.get('fallback_peer', 'user')}",
|
|
f"Honcho patch: {'✓' if _original_init else '✗ not applied'}",
|
|
"",
|
|
"Current context:",
|
|
]
|
|
peer = get_peer_name()
|
|
lines.append(f" Resolved peer: **{peer or '(none — will use user-default)'}**")
|
|
task = os.environ.get("HERMES_KANBAN_TASK")
|
|
if task:
|
|
lines.append(f" Kanban task: `{task}`")
|
|
board = os.environ.get("HERMES_KANBAN_BOARD")
|
|
if board:
|
|
lines.append(f" Board: `{board}`")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _list_mappings() -> str:
|
|
cfg = load_config()
|
|
mappings = cfg.get("mappings", [])
|
|
if not mappings:
|
|
return "No mappings configured."
|
|
lines = ["**Identity mappings:**", ""]
|
|
for i, m in enumerate(mappings):
|
|
lines.append(f" [{i}] `{m.get('platform')}` `{m.get('id')}` → **{m.get('peer')}**")
|
|
boards = cfg.get("boards", {})
|
|
if boards:
|
|
lines.append("")
|
|
lines.append("**Board peers:**")
|
|
for slug, peer in boards.items():
|
|
lines.append(f" `{slug}` → **{peer}**")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _add_mapping(platform: str, pid: str, peer_name: str) -> str:
|
|
cfg = load_config()
|
|
cfg.setdefault("mappings", []).append({"platform": platform, "id": pid, "peer": peer_name})
|
|
save_config(cfg)
|
|
return f"Added: `{platform}` `{pid}` → **{peer_name}**"
|
|
|
|
|
|
def _remove_mapping(idx: int) -> str:
|
|
cfg = load_config()
|
|
mappings = cfg.get("mappings", [])
|
|
if idx < 0 or idx >= len(mappings):
|
|
return f"Invalid index {idx}. Use `list` to see indices."
|
|
removed = mappings.pop(idx)
|
|
save_config(cfg)
|
|
return f"Removed: `{removed.get('platform')}` `{removed.get('id')}` → **{removed.get('peer')}**"
|
|
|
|
|
|
def _set_board(slug: str, peer_name: str) -> str:
|
|
cfg = load_config()
|
|
cfg.setdefault("boards", {})[slug] = peer_name
|
|
save_config(cfg)
|
|
return f"Board `{slug}` → **{peer_name}**"
|
|
|
|
|
|
def _help_text() -> str:
|
|
return """**/identity** — manage identity mappings
|
|
|
|
Commands:
|
|
`status` Show config status and current context
|
|
`list` List all mappings and board peers
|
|
`add discord <id> <peer>` Add a platform→peer mapping
|
|
`rm <index>` Remove mapping by index
|
|
`board <slug> <peer>` Set a kanban board's default peer
|
|
`help` Show this help"""
|
|
|
|
|
|
# ── Plugin entry point ──────────────────────────────────────────────────────
|
|
|
|
|
|
def register(ctx: Any) -> None:
|
|
"""Register identity plugin hooks and apply Honcho session patch."""
|
|
# Apply the runtime monkey-patch to Honcho's session init
|
|
# NO files modified, NO fork needed — pure Python at import time
|
|
_apply_honcho_patch()
|
|
_patch_session_db()
|
|
|
|
# Register hooks
|
|
ctx.register_hook("pre_gateway_dispatch", _pre_gateway_dispatch)
|
|
ctx.register_hook("on_session_start", _on_session_start)
|
|
|
|
# Register slash command
|
|
ctx.register_command(
|
|
name="identity",
|
|
handler=_cmd_identity,
|
|
description="Manage identity peer mappings",
|
|
args_hint="status|list|add|rm|board|help",
|
|
)
|
|
|
|
logger.info("identity-plugin: registered ✓ (honcho patch=%s)", "applied" if _original_init else "not-applied")
|