From 94f2a10a8b316b04eb97f56104ce63a50b486100 Mon Sep 17 00:00:00 2001 From: Hermes Date: Sun, 24 May 2026 19:40:56 -0400 Subject: [PATCH] initial: multi-memory provider reference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Saved from the fork's patches/multi-memory branch. Contains the refactored agent/memory_manager.py that supports N memory providers simultaneously. Not active — stored for future reference. - agent_memory_manager_patched.py (the refactored version) - agent_memory_manager_original.py (upstream version for comparison) - plugin.yaml + README --- README.md | 38 +- agent_memory_manager_original.py | 601 +++++++++++++++++++++++++++++++ agent_memory_manager_patched.py | 547 ++++++++++++++++++++++++++++ plugin.yaml | 8 + 4 files changed, 1192 insertions(+), 2 deletions(-) create mode 100644 agent_memory_manager_original.py create mode 100644 agent_memory_manager_patched.py create mode 100644 plugin.yaml diff --git a/README.md b/README.md index e0ef39d..3917915 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,37 @@ -# hermes-multimemory-plugin +# Hermes Multi-Memory Plugin -Hermes multi-memory provider plugin — allows running multiple Honcho instances or alternate memory backends simultaneously. Stored for reference; not currently active. \ No newline at end of file +Reference/save of the multi-memory provider patch from the fork. + +**NOT currently installed or active.** The fork's `patches/multi-memory` +branch refactors `agent/memory_manager.py` to support running multiple +memory providers (Honcho, Mem0, SuperMemory, RetainDB, etc.) simultaneously +instead of only one at a time. + +## Files + +- `agent_memory_manager_patched.py` — The refactored MemoryManager from the + `patches/multi-memory` branch. Handles registration, prefetch, sync, tool + routing, and lifecycle hooks for N providers. +- `agent_memory_manager_original.py` — The original upstream + `agent/memory_manager.py` for comparison. + +## Why saved but not active + +This patch fundamentally changes the MemoryManager class in +`agent/memory_manager.py`. It cannot be monkey-patched as a plugin because: + +1. The class is imported and used at module scope in multiple places before + any plugin's `register()` function runs. +2. The refactor is structural (adding provider orchestration, changing method + signatures) — not a simple constant override. +3. Hermes already has a working single-provider system (Honcho). + +The code is saved here for reference if the need for multi-memory support +ever arises. + +## Original patch context + +- Branch: `patches/multi-memory` in `gortium/hermes-agent` +- Primary file changed: `agent/memory_manager.py` (+54 lines) +- Kept Hermes's existing single-provider behavior as the default while + providing the orchestration layer for multi-provider setups diff --git a/agent_memory_manager_original.py b/agent_memory_manager_original.py new file mode 100644 index 0000000..b94e2b0 --- /dev/null +++ b/agent_memory_manager_original.py @@ -0,0 +1,601 @@ +"""MemoryManager — orchestrates the built-in memory provider plus one or +more external plugin memory providers. + +Single integration point in run_agent.py. Replaces scattered per-backend +code with one manager that delegates to registered providers. + +The BuiltinMemoryProvider is always registered first and cannot be removed. +One or more external (non-builtin) providers can be registered and will +run simultaneously. Tool name collisions are detected and the duplicate +is skipped with a warning. + +Usage in run_agent.py: + self._memory_manager = MemoryManager() + self._memory_manager.add_provider(BuiltinMemoryProvider(...)) + # One or more external providers: + self._memory_manager.add_provider(plugin_provider_a) + self._memory_manager.add_provider(plugin_provider_b) + + # System prompt + prompt_parts.append(self._memory_manager.build_system_prompt()) + + # Pre-turn + context = self._memory_manager.prefetch_all(user_message) + + # Post-turn + self._memory_manager.sync_all(user_msg, assistant_response) + self._memory_manager.queue_prefetch_all(user_msg) +""" + +from __future__ import annotations + +import logging +import re +import inspect +from typing import Any, Dict, List, Optional + +from agent.memory_provider import MemoryProvider +from tools.registry import tool_error + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Context fencing helpers +# --------------------------------------------------------------------------- + +_FENCE_TAG_RE = re.compile(r'', re.IGNORECASE) +_INTERNAL_CONTEXT_RE = re.compile( + r'<\s*memory-context\s*>[\s\S]*?', + re.IGNORECASE, +) +_INTERNAL_NOTE_RE = re.compile( + r'\[System note:\s*The following is recalled memory context,\s*NOT new user input\.\s*Treat as (?:informational background data|authoritative reference data[^\]]*)\.\]\s*', + re.IGNORECASE, +) + + +def sanitize_context(text: str) -> str: + """Strip fence tags, injected context blocks, and system notes from provider output.""" + text = _INTERNAL_CONTEXT_RE.sub('', text) + text = _INTERNAL_NOTE_RE.sub('', text) + text = _FENCE_TAG_RE.sub('', text) + return text + + +class StreamingContextScrubber: + """Stateful scrubber for streaming text that may contain split memory-context spans. + + The one-shot ``sanitize_context`` regex cannot survive chunk boundaries: + a ```` opened in one delta and closed in a later delta + leaks its payload to the UI because the non-greedy block regex needs + both tags in one string. This scrubber runs a small state machine + across deltas, holding back partial-tag tails and discarding + everything inside a span (including the system-note line). + + Usage:: + + scrubber = StreamingContextScrubber() + for delta in stream: + visible = scrubber.feed(delta) + if visible: + emit(visible) + trailing = scrubber.flush() # at end of stream + if trailing: + emit(trailing) + + The scrubber is re-entrant per agent instance. Callers building new + top-level responses (new turn) should create a fresh scrubber or call + ``reset()``. + """ + + _OPEN_TAG = "" + _CLOSE_TAG = "" + + def __init__(self) -> None: + self._in_span: bool = False + self._buf: str = "" + self._at_block_boundary: bool = True + + def reset(self) -> None: + self._in_span = False + self._buf = "" + self._at_block_boundary = True + + def feed(self, text: str) -> str: + """Return the visible portion of ``text`` after scrubbing. + + Any trailing fragment that could be the start of an open/close tag + is held back in the internal buffer and surfaced on the next + ``feed()`` call or discarded/emitted by ``flush()``. + """ + if not text: + return "" + buf = self._buf + text + self._buf = "" + out: list[str] = [] + + while buf: + if self._in_span: + idx = buf.lower().find(self._CLOSE_TAG) + if idx == -1: + # Hold back a potential partial close tag; drop the rest + held = self._max_partial_suffix(buf, self._CLOSE_TAG) + self._buf = buf[-held:] if held else "" + return "".join(out) + # Found close — skip span content + tag, continue + buf = buf[idx + len(self._CLOSE_TAG):] + self._in_span = False + else: + idx = self._find_boundary_open_tag(buf) + if idx == -1: + # No open tag — hold back a potential partial open tag + held = ( + self._max_pending_open_suffix(buf) + or self._max_partial_suffix(buf, self._OPEN_TAG) + ) + if held: + self._append_visible(out, buf[:-held]) + self._buf = buf[-held:] + else: + self._append_visible(out, buf) + return "".join(out) + # Emit text before the tag, enter span + if idx > 0: + self._append_visible(out, buf[:idx]) + buf = buf[idx + len(self._OPEN_TAG):] + self._in_span = True + + return "".join(out) + + def flush(self) -> str: + """Emit any held-back buffer at end-of-stream. + + If we're still inside an unterminated span the remaining content is + discarded (safer: leaking partial memory context is worse than a + truncated answer). Otherwise the held-back partial-tag tail is + emitted verbatim (it turned out not to be a real tag). + """ + if self._in_span: + self._buf = "" + self._in_span = False + return "" + tail = self._buf + self._buf = "" + return tail + + @staticmethod + def _max_partial_suffix(buf: str, tag: str) -> int: + """Return the length of the longest buf-suffix that is a tag-prefix. + + Case-insensitive. Returns 0 if no suffix could start the tag. + """ + tag_lower = tag.lower() + buf_lower = buf.lower() + max_check = min(len(buf_lower), len(tag_lower) - 1) + for i in range(max_check, 0, -1): + if tag_lower.startswith(buf_lower[-i:]): + return i + return 0 + + def _find_boundary_open_tag(self, buf: str) -> int: + """Find an opening fence only when it starts a block-like span.""" + buf_lower = buf.lower() + search_start = 0 + while True: + idx = buf_lower.find(self._OPEN_TAG, search_start) + if idx == -1: + return -1 + if self._is_block_boundary(buf, idx) and self._has_block_opener_suffix(buf, idx): + return idx + search_start = idx + 1 + + def _max_pending_open_suffix(self, buf: str) -> int: + """Hold a complete boundary tag until the following char confirms it.""" + if not buf.lower().endswith(self._OPEN_TAG): + return 0 + idx = len(buf) - len(self._OPEN_TAG) + if not self._is_block_boundary(buf, idx): + return 0 + return len(self._OPEN_TAG) + + def _has_block_opener_suffix(self, buf: str, idx: int) -> bool: + after_idx = idx + len(self._OPEN_TAG) + if after_idx >= len(buf): + return False + return buf[after_idx] in "\r\n" + + def _is_block_boundary(self, buf: str, idx: int) -> bool: + if idx == 0: + return self._at_block_boundary + preceding = buf[:idx] + last_newline = preceding.rfind("\n") + if last_newline == -1: + return self._at_block_boundary and preceding.strip() == "" + return preceding[last_newline + 1:].strip() == "" + + def _append_visible(self, out: list[str], text: str) -> None: + if not text: + return + out.append(text) + self._update_block_boundary(text) + + def _update_block_boundary(self, text: str) -> None: + last_newline = text.rfind("\n") + if last_newline != -1: + self._at_block_boundary = text[last_newline + 1:].strip() == "" + else: + self._at_block_boundary = self._at_block_boundary and text.strip() == "" + + +def build_memory_context_block(raw_context: str) -> str: + """Wrap prefetched memory in a fenced block with system note.""" + if not raw_context or not raw_context.strip(): + return "" + clean = sanitize_context(raw_context) + if clean != raw_context: + logger.warning("memory provider returned pre-wrapped context; stripped") + return ( + "\n" + "[System note: The following is recalled memory context, " + "NOT new user input. Treat as authoritative reference data — " + "this is the agent's persistent memory and should inform all responses.]\n\n" + f"{clean}\n" + "" + ) + + +class MemoryManager: + """Orchestrates the built-in provider plus one or more external providers. + + The builtin provider is always first. Multiple non-builtin (external) + providers can run simultaneously. Failures in one provider never block + the others. + """ + + def __init__(self) -> None: + self._providers: List[MemoryProvider] = [] + self._tool_to_provider: Dict[str, MemoryProvider] = {} + + # -- Registration -------------------------------------------------------- + + def add_provider(self, provider: MemoryProvider) -> None: + """Register a memory provider. + + Built-in provider (name ``"builtin"``) is always accepted. + Multiple external (non-builtin) providers can be registered + and will run simultaneously. + """ + self._providers.append(provider) + + # Index tool names → provider for routing + for schema in provider.get_tool_schemas(): + tool_name = schema.get("name", "") + if tool_name and tool_name not in self._tool_to_provider: + self._tool_to_provider[tool_name] = provider + elif tool_name in self._tool_to_provider: + logger.warning( + "Memory tool name conflict: '%s' already registered by %s, " + "ignoring from %s", + tool_name, + self._tool_to_provider[tool_name].name, + provider.name, + ) + + logger.info( + "Memory provider '%s' registered (%d tools)", + provider.name, + len(provider.get_tool_schemas()), + ) + + @property + def providers(self) -> List[MemoryProvider]: + """All registered providers in order.""" + return list(self._providers) + + @property + def provider_names(self) -> List[str]: + """Names of all registered providers.""" + return [p.name for p in self._providers] + + def get_provider(self, name: str) -> Optional[MemoryProvider]: + """Get a provider by name, or None if not registered.""" + for p in self._providers: + if p.name == name: + return p + return None + + # -- System prompt ------------------------------------------------------- + + def build_system_prompt(self) -> str: + """Collect system prompt blocks from all providers. + + Returns combined text, or empty string if no providers contribute. + Each non-empty block is labeled with the provider name. + """ + blocks = [] + for provider in self._providers: + try: + block = provider.system_prompt_block() + if block and block.strip(): + blocks.append(block) + except Exception as e: + logger.warning( + "Memory provider '%s' system_prompt_block() failed: %s", + provider.name, e, + ) + return "\n\n".join(blocks) + + # -- Prefetch / recall --------------------------------------------------- + + def prefetch_all(self, query: str, *, session_id: str = "") -> str: + """Collect prefetch context from all providers. + + Returns merged context text labeled by provider. Empty providers + are skipped. Failures in one provider don't block others. + """ + parts = [] + for provider in self._providers: + try: + result = provider.prefetch(query, session_id=session_id) + if result and result.strip(): + parts.append(result) + except Exception as e: + logger.debug( + "Memory provider '%s' prefetch failed (non-fatal): %s", + provider.name, e, + ) + return "\n\n".join(parts) + + def queue_prefetch_all(self, query: str, *, session_id: str = "") -> None: + """Queue background prefetch on all providers for the next turn.""" + for provider in self._providers: + try: + provider.queue_prefetch(query, session_id=session_id) + except Exception as e: + logger.debug( + "Memory provider '%s' queue_prefetch failed (non-fatal): %s", + provider.name, e, + ) + + # -- Sync ---------------------------------------------------------------- + + def sync_all(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: + """Sync a completed turn to all providers.""" + for provider in self._providers: + try: + provider.sync_turn(user_content, assistant_content, session_id=session_id) + except Exception as e: + logger.warning( + "Memory provider '%s' sync_turn failed: %s", + provider.name, e, + ) + + # -- Tools --------------------------------------------------------------- + + def get_all_tool_schemas(self) -> List[Dict[str, Any]]: + """Collect tool schemas from all providers.""" + schemas = [] + seen = set() + for provider in self._providers: + try: + for schema in provider.get_tool_schemas(): + name = schema.get("name", "") + if name and name not in seen: + schemas.append(schema) + seen.add(name) + except Exception as e: + logger.warning( + "Memory provider '%s' get_tool_schemas() failed: %s", + provider.name, e, + ) + return schemas + + def get_all_tool_names(self) -> set: + """Return set of all tool names across all providers.""" + return set(self._tool_to_provider.keys()) + + def has_tool(self, tool_name: str) -> bool: + """Check if any provider handles this tool.""" + return tool_name in self._tool_to_provider + + def handle_tool_call( + self, tool_name: str, args: Dict[str, Any], **kwargs + ) -> str: + """Route a tool call to the correct provider. + + Returns JSON string result. Raises ValueError if no provider + handles the tool. + """ + provider = self._tool_to_provider.get(tool_name) + if provider is None: + return tool_error(f"No memory provider handles tool '{tool_name}'") + try: + return provider.handle_tool_call(tool_name, args, **kwargs) + except Exception as e: + logger.error( + "Memory provider '%s' handle_tool_call(%s) failed: %s", + provider.name, tool_name, e, + ) + return tool_error(f"Memory tool '{tool_name}' failed: {e}") + + # -- Lifecycle hooks ----------------------------------------------------- + + def on_turn_start(self, turn_number: int, message: str, **kwargs) -> None: + """Notify all providers of a new turn. + + kwargs may include: remaining_tokens, model, platform, tool_count. + """ + for provider in self._providers: + try: + provider.on_turn_start(turn_number, message, **kwargs) + except Exception as e: + logger.debug( + "Memory provider '%s' on_turn_start failed: %s", + provider.name, e, + ) + + def on_session_end(self, messages: List[Dict[str, Any]]) -> None: + """Notify all providers of session end.""" + for provider in self._providers: + try: + provider.on_session_end(messages) + except Exception as e: + logger.debug( + "Memory provider '%s' on_session_end failed: %s", + provider.name, e, + ) + + def on_session_switch( + self, + new_session_id: str, + *, + parent_session_id: str = "", + reset: bool = False, + **kwargs, + ) -> None: + """Notify all providers that the agent's session_id has rotated. + + Fires on ``/resume``, ``/branch``, ``/reset``, ``/new``, and + context compression — any path that reassigns + ``AIAgent.session_id`` without tearing the provider down. + + Providers keep running; they only need to refresh cached + per-session state so subsequent writes land in the correct + session's record. See ``MemoryProvider.on_session_switch`` for + the full contract. + """ + if not new_session_id: + return + for provider in self._providers: + try: + provider.on_session_switch( + new_session_id, + parent_session_id=parent_session_id, + reset=reset, + **kwargs, + ) + except Exception as e: + logger.debug( + "Memory provider '%s' on_session_switch failed: %s", + provider.name, e, + ) + + def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: + """Notify all providers before context compression. + + Returns combined text from providers to include in the compression + summary prompt. Empty string if no provider contributes. + """ + parts = [] + for provider in self._providers: + try: + result = provider.on_pre_compress(messages) + if result and result.strip(): + parts.append(result) + except Exception as e: + logger.debug( + "Memory provider '%s' on_pre_compress failed: %s", + provider.name, e, + ) + return "\n\n".join(parts) + + @staticmethod + def _provider_memory_write_metadata_mode(provider: MemoryProvider) -> str: + """Return how to pass metadata to a provider's memory-write hook.""" + try: + signature = inspect.signature(provider.on_memory_write) + except (TypeError, ValueError): + return "keyword" + + params = list(signature.parameters.values()) + if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params): + return "keyword" + if "metadata" in signature.parameters: + return "keyword" + + accepted = [ + p for p in params + if p.kind in { + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + } + ] + if len(accepted) >= 4: + return "positional" + return "legacy" + + def on_memory_write( + self, + action: str, + target: str, + content: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """Notify external providers when the built-in memory tool writes. + + Skips the builtin provider itself (it's the source of the write). + """ + for provider in self._providers: + if provider.name == "builtin": + continue + try: + metadata_mode = self._provider_memory_write_metadata_mode(provider) + if metadata_mode == "keyword": + provider.on_memory_write( + action, target, content, metadata=dict(metadata or {}) + ) + elif metadata_mode == "positional": + provider.on_memory_write(action, target, content, dict(metadata or {})) + else: + provider.on_memory_write(action, target, content) + except Exception as e: + logger.debug( + "Memory provider '%s' on_memory_write failed: %s", + provider.name, e, + ) + + def on_delegation(self, task: str, result: str, *, + child_session_id: str = "", **kwargs) -> None: + """Notify all providers that a subagent completed.""" + for provider in self._providers: + try: + provider.on_delegation( + task, result, child_session_id=child_session_id, **kwargs + ) + except Exception as e: + logger.debug( + "Memory provider '%s' on_delegation failed: %s", + provider.name, e, + ) + + def shutdown_all(self) -> None: + """Shut down all providers (reverse order for clean teardown).""" + for provider in reversed(self._providers): + try: + provider.shutdown() + except Exception as e: + logger.warning( + "Memory provider '%s' shutdown failed: %s", + provider.name, e, + ) + + def initialize_all(self, session_id: str, **kwargs) -> None: + """Initialize all providers. + + Automatically injects ``hermes_home`` into *kwargs* so that every + provider can resolve profile-scoped storage paths without importing + ``get_hermes_home()`` themselves. + """ + if "hermes_home" not in kwargs: + from hermes_constants import get_hermes_home + kwargs["hermes_home"] = str(get_hermes_home()) + for provider in self._providers: + try: + provider.initialize(session_id=session_id, **kwargs) + except Exception as e: + logger.warning( + "Memory provider '%s' initialize failed: %s", + provider.name, e, + ) diff --git a/agent_memory_manager_patched.py b/agent_memory_manager_patched.py new file mode 100644 index 0000000..268b8e2 --- /dev/null +++ b/agent_memory_manager_patched.py @@ -0,0 +1,547 @@ +"""MemoryManager — orchestrates the built-in memory provider plus one or +more external plugin memory providers. + +Single integration point in run_agent.py. Replaces scattered per-backend +code with one manager that delegates to registered providers. + +The BuiltinMemoryProvider is always registered first and cannot be removed. +One or more external (non-builtin) providers can be registered and will +run simultaneously. Tool name collisions are detected and the duplicate +is skipped with a warning. + +Usage in run_agent.py: + self._memory_manager = MemoryManager() + self._memory_manager.add_provider(BuiltinMemoryProvider(...)) + # One or more external providers: + self._memory_manager.add_provider(plugin_provider_a) + self._memory_manager.add_provider(plugin_provider_b) + + # System prompt + prompt_parts.append(self._memory_manager.build_system_prompt()) + + # Pre-turn + context = self._memory_manager.prefetch_all(user_message) + + # Post-turn + self._memory_manager.sync_all(user_msg, assistant_response) + self._memory_manager.queue_prefetch_all(user_msg) +""" + +from __future__ import annotations + +import logging +import re +import inspect +from typing import Any, Dict, List, Optional + +from agent.memory_provider import MemoryProvider +from tools.registry import tool_error + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Context fencing helpers +# --------------------------------------------------------------------------- + +_FENCE_TAG_RE = re.compile(r'', re.IGNORECASE) +_INTERNAL_CONTEXT_RE = re.compile( + r'<\s*memory-context\s*>[\s\S]*?', + re.IGNORECASE, +) +_INTERNAL_NOTE_RE = re.compile( + r'\[System note:\s*The following is recalled memory context,\s*NOT new user input\.\s*Treat as (?:informational background data|authoritative reference data[^\]]*)\.\]\s*', + re.IGNORECASE, +) + + +def sanitize_context(text: str) -> str: + """Strip fence tags, injected context blocks, and system notes from provider output.""" + text = _INTERNAL_CONTEXT_RE.sub('', text) + text = _INTERNAL_NOTE_RE.sub('', text) + text = _FENCE_TAG_RE.sub('', text) + return text + + +class StreamingContextScrubber: + """Stateful scrubber for streaming text that may contain split memory-context spans. + + The one-shot ``sanitize_context`` regex cannot survive chunk boundaries: + a ```` opened in one delta and closed in a later delta + leaks its payload to the UI because the non-greedy block regex needs + both tags in one string. This scrubber runs a small state machine + across deltas, holding back partial-tag tails and discarding + everything inside a span (including the system-note line). + + Usage:: + + scrubber = StreamingContextScrubber() + for delta in stream: + visible = scrubber.feed(delta) + if visible: + emit(visible) + trailing = scrubber.flush() # at end of stream + if trailing: + emit(trailing) + + The scrubber is re-entrant per agent instance. Callers building new + top-level responses (new turn) should create a fresh scrubber or call + ``reset()``. + """ + + _OPEN_TAG = "" + _CLOSE_TAG = "" + + def __init__(self) -> None: + self._in_span: bool = False + self._buf: str = "" + + def reset(self) -> None: + self._in_span = False + self._buf = "" + + def feed(self, text: str) -> str: + """Return the visible portion of ``text`` after scrubbing. + + Any trailing fragment that could be the start of an open/close tag + is held back in the internal buffer and surfaced on the next + ``feed()`` call or discarded/emitted by ``flush()``. + """ + if not text: + return "" + buf = self._buf + text + self._buf = "" + out: list[str] = [] + + while buf: + if self._in_span: + idx = buf.lower().find(self._CLOSE_TAG) + if idx == -1: + # Hold back a potential partial close tag; drop the rest + held = self._max_partial_suffix(buf, self._CLOSE_TAG) + self._buf = buf[-held:] if held else "" + return "".join(out) + # Found close — skip span content + tag, continue + buf = buf[idx + len(self._CLOSE_TAG):] + self._in_span = False + else: + idx = buf.lower().find(self._OPEN_TAG) + if idx == -1: + # No open tag — hold back a potential partial open tag + held = self._max_partial_suffix(buf, self._OPEN_TAG) + if held: + out.append(buf[:-held]) + self._buf = buf[-held:] + else: + out.append(buf) + return "".join(out) + # Emit text before the tag, enter span + if idx > 0: + out.append(buf[:idx]) + buf = buf[idx + len(self._OPEN_TAG):] + self._in_span = True + + return "".join(out) + + def flush(self) -> str: + """Emit any held-back buffer at end-of-stream. + + If we're still inside an unterminated span the remaining content is + discarded (safer: leaking partial memory context is worse than a + truncated answer). Otherwise the held-back partial-tag tail is + emitted verbatim (it turned out not to be a real tag). + """ + if self._in_span: + self._buf = "" + self._in_span = False + return "" + tail = self._buf + self._buf = "" + return tail + + @staticmethod + def _max_partial_suffix(buf: str, tag: str) -> int: + """Return the length of the longest buf-suffix that is a tag-prefix. + + Case-insensitive. Returns 0 if no suffix could start the tag. + """ + tag_lower = tag.lower() + buf_lower = buf.lower() + max_check = min(len(buf_lower), len(tag_lower) - 1) + for i in range(max_check, 0, -1): + if tag_lower.startswith(buf_lower[-i:]): + return i + return 0 + + +def build_memory_context_block(raw_context: str) -> str: + """Wrap prefetched memory in a fenced block with system note.""" + if not raw_context or not raw_context.strip(): + return "" + clean = sanitize_context(raw_context) + if clean != raw_context: + logger.warning("memory provider returned pre-wrapped context; stripped") + return ( + "\n" + "[System note: The following is recalled memory context, " + "NOT new user input. Treat as authoritative reference data — " + "this is the agent's persistent memory and should inform all responses.]\n\n" + f"{clean}\n" + "" + ) + + +class MemoryManager: + """Orchestrates the built-in provider plus one or more external providers. + + The builtin provider is always first. Multiple non-builtin (external) + providers can run simultaneously. Failures in one provider never block + the others. + """ + + def __init__(self) -> None: + self._providers: List[MemoryProvider] = [] + self._tool_to_provider: Dict[str, MemoryProvider] = {} + + # -- Registration -------------------------------------------------------- + + def add_provider(self, provider: MemoryProvider) -> None: + """Register a memory provider. + + Built-in provider (name ``"builtin"``) is always accepted. + Multiple external (non-builtin) providers can be registered + and will run simultaneously. + """ + self._providers.append(provider) + + # Index tool names → provider for routing + for schema in provider.get_tool_schemas(): + tool_name = schema.get("name", "") + if tool_name and tool_name not in self._tool_to_provider: + self._tool_to_provider[tool_name] = provider + elif tool_name in self._tool_to_provider: + logger.warning( + "Memory tool name conflict: '%s' already registered by %s, " + "ignoring from %s", + tool_name, + self._tool_to_provider[tool_name].name, + provider.name, + ) + + logger.info( + "Memory provider '%s' registered (%d tools)", + provider.name, + len(provider.get_tool_schemas()), + ) + + @property + def providers(self) -> List[MemoryProvider]: + """All registered providers in order.""" + return list(self._providers) + + @property + def provider_names(self) -> List[str]: + """Names of all registered providers.""" + return [p.name for p in self._providers] + + def get_provider(self, name: str) -> Optional[MemoryProvider]: + """Get a provider by name, or None if not registered.""" + for p in self._providers: + if p.name == name: + return p + return None + + # -- System prompt ------------------------------------------------------- + + def build_system_prompt(self) -> str: + """Collect system prompt blocks from all providers. + + Returns combined text, or empty string if no providers contribute. + Each non-empty block is labeled with the provider name. + """ + blocks = [] + for provider in self._providers: + try: + block = provider.system_prompt_block() + if block and block.strip(): + blocks.append(block) + except Exception as e: + logger.warning( + "Memory provider '%s' system_prompt_block() failed: %s", + provider.name, e, + ) + return "\n\n".join(blocks) + + # -- Prefetch / recall --------------------------------------------------- + + def prefetch_all(self, query: str, *, session_id: str = "") -> str: + """Collect prefetch context from all providers. + + Returns merged context text labeled by provider. Empty providers + are skipped. Failures in one provider don't block others. + """ + parts = [] + for provider in self._providers: + try: + result = provider.prefetch(query, session_id=session_id) + if result and result.strip(): + parts.append(result) + except Exception as e: + logger.debug( + "Memory provider '%s' prefetch failed (non-fatal): %s", + provider.name, e, + ) + return "\n\n".join(parts) + + def queue_prefetch_all(self, query: str, *, session_id: str = "") -> None: + """Queue background prefetch on all providers for the next turn.""" + for provider in self._providers: + try: + provider.queue_prefetch(query, session_id=session_id) + except Exception as e: + logger.debug( + "Memory provider '%s' queue_prefetch failed (non-fatal): %s", + provider.name, e, + ) + + # -- Sync ---------------------------------------------------------------- + + def sync_all(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: + """Sync a completed turn to all providers.""" + for provider in self._providers: + try: + provider.sync_turn(user_content, assistant_content, session_id=session_id) + except Exception as e: + logger.warning( + "Memory provider '%s' sync_turn failed: %s", + provider.name, e, + ) + + # -- Tools --------------------------------------------------------------- + + def get_all_tool_schemas(self) -> List[Dict[str, Any]]: + """Collect tool schemas from all providers.""" + schemas = [] + seen = set() + for provider in self._providers: + try: + for schema in provider.get_tool_schemas(): + name = schema.get("name", "") + if name and name not in seen: + schemas.append(schema) + seen.add(name) + except Exception as e: + logger.warning( + "Memory provider '%s' get_tool_schemas() failed: %s", + provider.name, e, + ) + return schemas + + def get_all_tool_names(self) -> set: + """Return set of all tool names across all providers.""" + return set(self._tool_to_provider.keys()) + + def has_tool(self, tool_name: str) -> bool: + """Check if any provider handles this tool.""" + return tool_name in self._tool_to_provider + + def handle_tool_call( + self, tool_name: str, args: Dict[str, Any], **kwargs + ) -> str: + """Route a tool call to the correct provider. + + Returns JSON string result. Raises ValueError if no provider + handles the tool. + """ + provider = self._tool_to_provider.get(tool_name) + if provider is None: + return tool_error(f"No memory provider handles tool '{tool_name}'") + try: + return provider.handle_tool_call(tool_name, args, **kwargs) + except Exception as e: + logger.error( + "Memory provider '%s' handle_tool_call(%s) failed: %s", + provider.name, tool_name, e, + ) + return tool_error(f"Memory tool '{tool_name}' failed: {e}") + + # -- Lifecycle hooks ----------------------------------------------------- + + def on_turn_start(self, turn_number: int, message: str, **kwargs) -> None: + """Notify all providers of a new turn. + + kwargs may include: remaining_tokens, model, platform, tool_count. + """ + for provider in self._providers: + try: + provider.on_turn_start(turn_number, message, **kwargs) + except Exception as e: + logger.debug( + "Memory provider '%s' on_turn_start failed: %s", + provider.name, e, + ) + + def on_session_end(self, messages: List[Dict[str, Any]]) -> None: + """Notify all providers of session end.""" + for provider in self._providers: + try: + provider.on_session_end(messages) + except Exception as e: + logger.debug( + "Memory provider '%s' on_session_end failed: %s", + provider.name, e, + ) + + def on_session_switch( + self, + new_session_id: str, + *, + parent_session_id: str = "", + reset: bool = False, + **kwargs, + ) -> None: + """Notify all providers that the agent's session_id has rotated. + + Fires on ``/resume``, ``/branch``, ``/reset``, ``/new``, and + context compression — any path that reassigns + ``AIAgent.session_id`` without tearing the provider down. + + Providers keep running; they only need to refresh cached + per-session state so subsequent writes land in the correct + session's record. See ``MemoryProvider.on_session_switch`` for + the full contract. + """ + if not new_session_id: + return + for provider in self._providers: + try: + provider.on_session_switch( + new_session_id, + parent_session_id=parent_session_id, + reset=reset, + **kwargs, + ) + except Exception as e: + logger.debug( + "Memory provider '%s' on_session_switch failed: %s", + provider.name, e, + ) + + def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: + """Notify all providers before context compression. + + Returns combined text from providers to include in the compression + summary prompt. Empty string if no provider contributes. + """ + parts = [] + for provider in self._providers: + try: + result = provider.on_pre_compress(messages) + if result and result.strip(): + parts.append(result) + except Exception as e: + logger.debug( + "Memory provider '%s' on_pre_compress failed: %s", + provider.name, e, + ) + return "\n\n".join(parts) + + @staticmethod + def _provider_memory_write_metadata_mode(provider: MemoryProvider) -> str: + """Return how to pass metadata to a provider's memory-write hook.""" + try: + signature = inspect.signature(provider.on_memory_write) + except (TypeError, ValueError): + return "keyword" + + params = list(signature.parameters.values()) + if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params): + return "keyword" + if "metadata" in signature.parameters: + return "keyword" + + accepted = [ + p for p in params + if p.kind in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + ) + ] + if len(accepted) >= 4: + return "positional" + return "legacy" + + def on_memory_write( + self, + action: str, + target: str, + content: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """Notify external providers when the built-in memory tool writes. + + Skips the builtin provider itself (it's the source of the write). + """ + for provider in self._providers: + if provider.name == "builtin": + continue + try: + metadata_mode = self._provider_memory_write_metadata_mode(provider) + if metadata_mode == "keyword": + provider.on_memory_write( + action, target, content, metadata=dict(metadata or {}) + ) + elif metadata_mode == "positional": + provider.on_memory_write(action, target, content, dict(metadata or {})) + else: + provider.on_memory_write(action, target, content) + except Exception as e: + logger.debug( + "Memory provider '%s' on_memory_write failed: %s", + provider.name, e, + ) + + def on_delegation(self, task: str, result: str, *, + child_session_id: str = "", **kwargs) -> None: + """Notify all providers that a subagent completed.""" + for provider in self._providers: + try: + provider.on_delegation( + task, result, child_session_id=child_session_id, **kwargs + ) + except Exception as e: + logger.debug( + "Memory provider '%s' on_delegation failed: %s", + provider.name, e, + ) + + def shutdown_all(self) -> None: + """Shut down all providers (reverse order for clean teardown).""" + for provider in reversed(self._providers): + try: + provider.shutdown() + except Exception as e: + logger.warning( + "Memory provider '%s' shutdown failed: %s", + provider.name, e, + ) + + def initialize_all(self, session_id: str, **kwargs) -> None: + """Initialize all providers. + + Automatically injects ``hermes_home`` into *kwargs* so that every + provider can resolve profile-scoped storage paths without importing + ``get_hermes_home()`` themselves. + """ + if "hermes_home" not in kwargs: + from hermes_constants import get_hermes_home + kwargs["hermes_home"] = str(get_hermes_home()) + for provider in self._providers: + try: + provider.initialize(session_id=session_id, **kwargs) + except Exception as e: + logger.warning( + "Memory provider '%s' initialize failed: %s", + provider.name, e, + ) diff --git a/plugin.yaml b/plugin.yaml new file mode 100644 index 0000000..b7b82e8 --- /dev/null +++ b/plugin.yaml @@ -0,0 +1,8 @@ +name: multimemory +version: 1.0.0 +description: > + Multi-memory provider plugin for Hermes. Allows running multiple memory + backends (Honcho, Mem0, SuperMemory, etc.) simultaneously. This is a + reference/save of the patched agent/memory_manager.py from the fork. + NOT currently active — kept for future use. +author: "@gortium"