Agent Graph

Every message follows the same pipeline: safety check first, then memory retrieval, then routing, then response, then post-response memory evaluation. Some memory commits happen immediately; others wait for the shared session-end seam.

The pipeline diagram shows the v0.9+ safety-first topology with parallel post-response memory evaluation.


Topology

The parent graph has 8 nodes. The therapeutic subgraph (compiled as a single parent-level node) hides 7 internal nodes (dispatcher + 6 mode nodes). The crisis gate routes between branches via Command(goto=...). The two extractors fan out in parallel after finalize, but they now feed deterministic write policy instead of blindly writing every candidate to long-term memory.

The graph, built by build_agent_workflow() (RetryPolicy on I/O nodes; operator.add and _merge_dicts reducers):

- START → build_initial_state
- crisis_gate_node (safety first), routing via Command(goto=...)
  - Crisis branch: crisis_response (PFA overlay) → crisis_log (audit record)
  - Therapeutic branch: load_memory (hybrid RRF retrieval) → therapeutic_subgraph (dispatcher + 6 modes)
- finalize_turn_node (operator.add reducer)
- Parallel fan-out after finalize: extract_facts (semantic LLM) and extract_procedural (style rules LLM), both writing diagnostics via the _merge_dicts reducer
- END → AgentOutput
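The turn order above can be sketched in plain Python (no LangGraph dependency). Node bodies here are hypothetical stand-ins that record which stage ran; the real nodes call LLMs and the memory store via the injected runtime context.

```python
def crisis_gate(state: dict) -> str:
    # Safety first: pick the branch before any memory retrieval happens.
    return "crisis" if state.get("crisis_detected") else "therapeutic"

def run_turn(state: dict) -> dict:
    # Control-flow sketch of one turn through the parent graph.
    if crisis_gate(state) == "crisis":
        state["response"] = "crisis_response"    # PFA overlay
        state["audit"] = "crisis_log"            # audit record
    else:
        state["memory"] = "load_memory"          # hybrid RRF retrieval
        state["response"] = "therapeutic_subgraph"
    state["finalized"] = True                    # finalize_turn_node
    # In the real graph these two lanes fan out in parallel after finalize.
    state["extractors"] = ["extract_facts", "extract_procedural"]
    return state
```

Note that the crisis branch never touches load_memory: there is no path that loads context before the safety check.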

Node responsibilities

Each node has its own datasheet on the Nodes page — filterable by category (safety, memory, routing, extraction, terminal) or capability (LLM, retry, reducer, parallel). Every card shows the node's inputs, outputs, skip conditions, and source file.

Therapeutic subgraph

The dispatcher picks exactly one mode per turn:

| Mode | When it fires |
| --- | --- |
| supportive | Default — user sharing feelings, seeking support, or greeting |
| reflective | User describing a recurring pattern they've already named |
| clarifying | Ambiguous message — agent needs context before responding |
| psychoeducation | User describes a reaction AND seeks understanding |
| guided_exercise | User requests a structured technique (12 exercises) |
| closing | User signals wind-down ("I should go", "thanks, this helped") |

The dispatcher uses hybrid classification: high-precision regex fast paths for obvious cases (confusion markers -> clarifying, active exercise re-entry -> guided_exercise), LLM classifier for the ambiguous middle, regex fallback when no LLM is available.
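A minimal sketch of that dispatch order. The patterns and the llm hook are illustrative assumptions; the real fast-path regexes and LLM prompt live in agent/therapeutic/dispatcher.py.

```python
import re

# Hypothetical fast-path patterns (assumptions, not the shipped rules).
CONFUSION = re.compile(r"\b(what do you mean|i don't understand|confused)\b", re.I)
EXERCISE = re.compile(r"\b(breathing|grounding|exercise)\b", re.I)

def dispatch(message: str, llm=None) -> str:
    # 1. High-precision regex fast paths for obvious cases.
    if CONFUSION.search(message):
        return "clarifying"
    if EXERCISE.search(message):
        return "guided_exercise"
    # 2. LLM classifier for the ambiguous middle.
    if llm is not None:
        return llm(message)
    # 3. Fallback default when no LLM is available.
    return "supportive"
```

Because the fast paths run first, obvious turns resolve without an LLM call at all.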

Design principles

Safety is sequential. The crisis gate runs before any memory retrieval or response generation. There is no path through the graph that loads context without first passing the safety check.

Routing is deterministic-first. Regex patterns fire before the LLM. Most turns never need an LLM classification call.

Parallel post-response memory evaluation. The two extractor nodes fan out in parallel from finalize_turn_node. Each lane does LLM candidate extraction plus deterministic write policy. Some candidates commit immediately; others are buffered for session end or dropped. Both still write to the diagnostics dict via a _merge_dicts reducer — no manual dict spreading needed, no race conditions.

Reducer-backed accumulation. history and transcript use operator.add reducers. Each turn emits only the new entries (user turn at init, assistant turn at finalize); the checkpointer accumulates prior turns automatically. No get_history() calls on the hot path.
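The delta pattern looks like this (entry shapes are illustrative): each node emits only its new entries, and operator.add folds them into the accumulated channel.

```python
import operator

history: list[dict] = []  # checkpointer-backed channel

# Turn start: build_initial_state emits only the user entry.
history = operator.add(history, [{"role": "user", "content": "hi"}])
# Turn end: finalize_turn emits only the assistant entry.
history = operator.add(history, [{"role": "assistant", "content": "hello"}])
# history now holds both turns without any get_history() call on the hot path.
```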

Structured working memory. load_memory_node returns raw WorkingMemoryEntry dicts (semantic and episodic types). Formatting happens on demand at prompt-build time and CLI render time — not in the graph.

Defense-in-depth retry. All I/O nodes have RetryPolicy(max_attempts=2). Nodes catch their own expected exceptions internally; the retry policy covers unexpected transient failures outside the node's error handling.
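RetryPolicy itself is LangGraph's; a generic stand-in with the same behavior, to show what max_attempts=2 buys, would be:

```python
import time

def with_retry(node, max_attempts: int = 2, backoff_s: float = 0.0):
    # Re-invoke the node on any exception that escapes its own handling,
    # up to max_attempts total attempts.
    def wrapped(state):
        for attempt in range(1, max_attempts + 1):
            try:
                return node(state)
            except Exception:
                if attempt == max_attempts:
                    raise
                time.sleep(backoff_s)
    return wrapped
```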

Small-talk gate saves cost. Both extractors check whether the message is obviously small talk before making the LLM call. Greetings and acknowledgments skip extraction entirely.
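An illustrative gate; the real heuristic lives in agent/memory/small_talk_gate.py, and these patterns are assumptions.

```python
import re

# Cheap whole-message check so greetings and acknowledgments never
# trigger an LLM extraction call.
_SMALL_TALK = re.compile(
    r"^\s*(hi|hello|hey|thanks( a lot)?|thank you|ok(ay)?|bye)[.!]?\s*$",
    re.I,
)

def should_extract(message: str) -> bool:
    return not _SMALL_TALK.match(message)
```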

Per-turn diagnostics. Every node stamps timing and decision metadata into state["diagnostics"] via the _merge_dicts reducer. Keys include load_memory_ms, crisis_gate_ms, extract_facts_ms, extract_procedural_ms, and turn_total_ms.
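A hypothetical helper showing how a node could stamp its timing key into the diagnostics partial update (the real nodes stamp inline; the wrapper is an illustration):

```python
import time

def timed(name: str, node):
    # Wrap a node so its wall time lands in diagnostics as "<name>_ms".
    def wrapped(state):
        start = time.perf_counter()
        update = node(state) or {}
        diag = dict(update.get("diagnostics", {}))
        diag[f"{name}_ms"] = (time.perf_counter() - start) * 1000
        update["diagnostics"] = diag
        return update
    return wrapped
```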

Runtime context

Dependencies are injected via WorkflowContext, a frozen dataclass with attribute access (runtime.context.llm_client, not dict access). Fields: llm_client, memory_store, crisis_log_backend, memory_mode, embedding_provider.
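A sketch of that dataclass; the field names come from the source, while the field types (Any) are assumptions.

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class WorkflowContext:
    # Frozen: nodes read dependencies via attribute access and cannot mutate them.
    llm_client: Any
    memory_store: Any
    crisis_log_backend: Any
    memory_mode: str
    embedding_provider: Any
```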

Key files

| File | What it does |
| --- | --- |
| agent/graph.py | build_agent_workflow(), build_initial_state(), state_to_output() |
| agent/state.py | AgentState TypedDict with reducer annotations |
| agent/working_memory.py | WorkingMemoryEntry types, factory functions, formatters |
| agent/runtime_context.py | WorkflowContext frozen dataclass |
| agent/therapeutic/graph.py | build_therapeutic_subgraph() with TherapeuticSubgraphOutput |
| agent/therapeutic/dispatcher.py | Hybrid regex + LLM mode classification |
| agent/therapeutic/prompts.py | Knowledge composition + on-demand working memory formatting |
| agent/nodes/load_memory.py | Memory retrieval + query embedding |
| agent/nodes/crisis_gate.py | Crisis classification + routing |
| agent/nodes/crisis_response.py | Crisis response generation |
| agent/nodes/crisis_log.py | Audit log writer |
| agent/nodes/extract_facts.py | Semantic fact extraction |
| agent/nodes/extract_procedural_rules.py | Procedural rule extraction |
| agent/nodes/commit_session_memory.py | Session-end promotion for held semantic / procedural candidates |
| agent/nodes/summarize_session.py | Session-end episodic summarization |
| agent/nodes/finalize_turn.py | Transcript finalization (single-element delta) |
| agent/memory/small_talk_gate.py | Pre-extractor small-talk heuristic |
| agent/persistence.py | PersistentAgentRuntime — SQLite-backed thread state |

Adding a new therapeutic mode

  1. Create knowledge/response_modes/your_mode.md with the mode's knowledge content
  2. Create agent/therapeutic/your_mode.py with the response node function
  3. Add a knowledge tuple and instructions block in agent/therapeutic/prompts.py
  4. Register the node in agent/therapeutic/graph.py (subgraph.add_node(...))
  5. Add dispatch patterns in agent/therapeutic/dispatcher.py (regex fast paths + LLM prompt update)
  6. Add cases to eval/datasets/therapeutic_routing_v0.json
  7. Add a _STAGE_LABELS entry in opencouch_cli/app.py if the node name differs from the CLI's stage label vocabulary
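Step 2's node function might look like this skeleton. The state shape and return contract are assumptions inferred from the delta-emission pattern described above; the real nodes build a prompt from the mode's knowledge file and call the injected LLM client.

```python
def your_mode_node(state: dict) -> dict:
    # Hypothetical mode node: read the latest user message, return only the
    # new assistant entry as a partial update (the reducer accumulates it).
    user_msg = state["messages"][-1]["content"]
    reply = f"[your_mode] responding to: {user_msg}"  # stub for the LLM call
    return {"messages": [{"role": "assistant", "content": reply}]}
```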