🔄 LangGraph Base
State Machine Template | Domain: Core
Abstract base class for LangGraph-powered agents with state persistence, checkpointing, and human-in-the-loop (HITL) support.
Overview​
KOSMOS V2.0 LangGraph Base Template
Enhanced base class for LangGraph-powered agents with:
- State persistence (checkpointing)
- Human-in-the-loop (HITL) support
- Semantic router integration
- SDUI response generation
- Tool selection via Global Tool Registry
- Streaming support
This documentation is automatically extracted from source code.
Source: implementation/backend/agents/langgraph_base.py (1108 lines)
Enumerations​
WorkflowPhase​
Workflow execution phases.
| Member | Value |
|---|---|
HumanInputType​
Types of human input requests.
| Member | Value |
|---|---|
Data Models​
HumanInputRequest​
Request for human input during workflow.
@dataclass
class HumanInputRequest:
HumanInputResponse​
Response from human input.
@dataclass
class HumanInputResponse:
ToolSelection​
Selected tool for execution.
@dataclass
class ToolSelection:
ExecutionStep​
A single execution step in the workflow.
@dataclass
class ExecutionStep:
AgentGraphState​
Base state for LangGraph agent workflows.
This state is passed through all nodes and checkpointed. Agents should extend this with domain-specific fields.
Inherits from: BaseModel
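A minimal sketch of extending the base state with domain-specific fields. The real field list of AgentGraphState is not shown on this page, so every field below (task, thread_id, context, invoice_ids, total_amount) and the InvoiceState name are hypothetical. Standard-library dataclasses stand in for the BaseModel parent so the sketch runs without extra dependencies.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class AgentGraphState:
    """Stand-in for the real base state; these fields are assumptions."""

    task: str = ""
    thread_id: Optional[str] = None
    context: Dict[str, Any] = field(default_factory=dict)


@dataclass
class InvoiceState(AgentGraphState):
    """Hypothetical domain state that adds invoice-specific fields."""

    invoice_ids: List[str] = field(default_factory=list)
    total_amount: float = 0.0


# The subclass carries both the shared and the domain-specific fields.
state = InvoiceState(task="reconcile invoices", invoice_ids=["inv-1", "inv-2"])
```

Because the subclass is still an AgentGraphState, code written against the base state (checkpointing, routing) can handle it unchanged.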
PostgresCheckpointSaver​
PostgreSQL-backed checkpoint saver for state persistence.
Uses the workflow_checkpoints table for durable storage.
Inherits from: BaseCheckpointSaver
Methods​
__init__(pool)​
async aget(config: Dict) → Optional[Dict]​
Get checkpoint from database.
async aput(config: Dict, checkpoint: Dict) → Dict​
Save checkpoint to database.
get(config: Dict) → Optional[Dict]
Synchronous version; wraps the async aget().
put(config: Dict, checkpoint: Dict) → Dict
Synchronous version; wraps the async aput().
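The async/sync method pairing above can be sketched with an in-memory stand-in. This is not the PostgreSQL implementation: a dict replaces the workflow_checkpoints table, the class name InMemoryCheckpointSaver is hypothetical, and the config layout ({"configurable": {"thread_id": ...}}) is an assumption about how threads are keyed.

```python
import asyncio
import copy
from typing import Dict, Optional


class InMemoryCheckpointSaver:
    """Toy saver mirroring the aget/aput/get/put surface described above."""

    def __init__(self) -> None:
        # Stands in for the workflow_checkpoints table.
        self._rows: Dict[str, Dict] = {}

    @staticmethod
    def _thread_id(config: Dict) -> str:
        # Assumed config layout: {"configurable": {"thread_id": "..."}}
        return config["configurable"]["thread_id"]

    async def aget(self, config: Dict) -> Optional[Dict]:
        row = self._rows.get(self._thread_id(config))
        # Return a copy so callers cannot mutate stored state.
        return copy.deepcopy(row) if row is not None else None

    async def aput(self, config: Dict, checkpoint: Dict) -> Dict:
        self._rows[self._thread_id(config)] = copy.deepcopy(checkpoint)
        return config

    def get(self, config: Dict) -> Optional[Dict]:
        # Sync wrapper; asyncio.run raises if an event loop is already running.
        return asyncio.run(self.aget(config))

    def put(self, config: Dict, checkpoint: Dict) -> Dict:
        return asyncio.run(self.aput(config, checkpoint))
```

Wrapping with asyncio.run keeps the sync methods trivial, at the cost that they cannot be called from code that is already inside an event loop.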
LangGraphAgent​
Enhanced base class for LangGraph-powered agents.
Features:
- Automatic state checkpointing
- Human-in-the-loop support with async callbacks
- Semantic router integration for tool selection
- SDUI component generation
- Streaming execution with progress updates
Subclasses must implement:
- create_state_class(): Return the custom state class
- define_nodes(): Define workflow nodes
- define_edges(): Define workflow edges and conditionals
Inherits from: ABC, plus a subscripted (parameterized) base class whose name was lost in extraction
Methods​
__init__(agent_id: str, name: str, domain: str, description: str, tool_categories: Optional[List[ToolCategory]], mcp_servers: Optional[List[str]], pentarchy_voter: bool, security_veto: bool, max_iterations: int, timeout_seconds: int)​
async initialize() → None​
Initialize the agent and build the graph.
create_state_class() → type (abstract)​
Return the state class for this agent's workflow.
define_nodes() → Dict[str, Callable]​
Define custom nodes for this agent.
Override in a subclass to add domain-specific nodes. Returns a dict of {node_name: node_function}.
define_edges() → List[Tuple]​
Define custom edges for this agent.
Override in a subclass to add domain-specific edges. Returns a list of edges as tuples:
- Simple edge: (source, target)
- Conditional: (source, condition_fn, {result: target})
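The two return shapes can be illustrated with a small sketch. Everything here is hypothetical: the node names, the state keys, the "END" sentinel string, and the run() helper, which is a tiny interpreter written only to show how the tuples could be consumed. It is not how LangGraph or the base class actually compiles the graph.

```python
from typing import Any, Callable, Dict, List, Tuple

State = Dict[str, Any]


def plan(state: State) -> State:
    return {**state, "plan": f"steps for {state['task']}"}


def review(state: State) -> State:
    return {**state, "approved": len(state["plan"]) > 0}


def needs_review(state: State) -> str:
    # A condition function returns one of the keys of the routing dict.
    return "review" if state.get("risky") else "done"


def define_nodes() -> Dict[str, Callable[[State], State]]:
    return {"plan": plan, "review": review}


def define_edges() -> List[Tuple]:
    return [
        # Conditional edge: (source, condition_fn, {result: target})
        ("plan", needs_review, {"review": "review", "done": "END"}),
        # Simple edge: (source, target)
        ("review", "END"),
    ]


def run(start: str, state: State) -> State:
    """Walk the graph from `start` until the "END" sentinel is reached."""
    nodes = define_nodes()
    routes = {edge[0]: edge[1:] for edge in define_edges()}
    current = start
    while current != "END":
        state = nodes[current](state)
        rest = routes[current]
        if len(rest) == 1:  # simple edge
            current = rest[0]
        else:  # conditional edge
            condition, mapping = rest
            current = mapping[condition(state)]
    return state
```

Calling run("plan", {"task": "deploy", "risky": True}) routes through the review node, while omitting "risky" goes straight to the end.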
async process(task: str, context: Optional[Dict[str, Any]], thread_id: Optional[str]) → Dict[str, Any]​
Process a task through the workflow.
Args:
- task: The task description
- context: Optional context (user_id, tenant_id, etc.)
- thread_id: Optional thread ID for resuming checkpointed state
Returns: Final state as dictionary
async resume(thread_id: str, human_input: Optional[Dict[str, Any]]) → Dict[str, Any]​
Resume a checkpointed workflow.
Args:
- thread_id: The thread ID to resume
- human_input: Optional human input to inject
Returns: Final state as dictionary
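How process() and resume() might cooperate through a shared thread_id can be sketched with a toy stand-in. ToyAgent is not the real LangGraphAgent: the state keys, the "awaiting_human" and "completed" status values, the None defaults for the optional parameters, and the in-memory checkpoint dict are all assumptions made for illustration.

```python
import asyncio
import uuid
from typing import Any, Dict, Optional


class ToyAgent:
    """Stand-in illustrating process()/resume(); not the real base class."""

    def __init__(self) -> None:
        self._checkpoints: Dict[str, Dict[str, Any]] = {}

    async def process(
        self,
        task: str,
        context: Optional[Dict[str, Any]] = None,
        thread_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        # Generate a thread ID when the caller does not supply one.
        thread_id = thread_id or str(uuid.uuid4())
        state = {
            "task": task,
            "context": context or {},
            "thread_id": thread_id,
            "status": "awaiting_human",  # hypothetical status value
        }
        self._checkpoints[thread_id] = state
        return dict(state)

    async def resume(
        self, thread_id: str, human_input: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        state = self._checkpoints[thread_id]  # KeyError for an unknown thread
        state["human_input"] = human_input
        state["status"] = "completed"
        return dict(state)


async def main() -> Dict[str, Any]:
    agent = ToyAgent()
    paused = await agent.process("delete stale backups", {"user_id": "u-1"}, "t-42")
    assert paused["status"] == "awaiting_human"
    # The same thread_id picks up the checkpointed state.
    return await agent.resume("t-42", {"approved": True})
```

Passing an explicit thread_id to process() is what makes a later resume() possible from a different request or process, provided the checkpoint store is durable.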
async stream(task: str, context: Optional[Dict[str, Any]])​
Stream workflow execution with progress updates.
Yields progress events as the workflow executes.
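A sketch of consuming a streaming method of this shape, assuming it is an async generator. The event structure is not documented on this page, so the keys (task, phase, progress) and the phase names are hypothetical, and the free function stream() below is a toy replacement for the agent method.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


async def stream(
    task: str, context: Optional[Dict[str, Any]] = None
) -> AsyncIterator[Dict[str, Any]]:
    """Toy async generator; the event keys here are hypothetical."""
    phases = ["planning", "executing", "completed"]
    for index, phase in enumerate(phases, start=1):
        await asyncio.sleep(0)  # hand control back to the loop between steps
        yield {"task": task, "phase": phase, "progress": index / len(phases)}


async def collect(task: str) -> List[Dict[str, Any]]:
    # `async for` consumes events one at a time as they are produced.
    return [event async for event in stream(task)]
```

A caller that wants live updates would iterate with `async for` and forward each event to the client instead of collecting them into a list.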
set_human_input_callback(callback: Callable) → None​
Set callback for human-in-the-loop requests.
set_progress_callback(callback: Callable) → None​
Set callback for progress updates.
set_sdui_callback(callback: Callable) → None​
Set callback for SDUI component updates.
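The callback setters can be pictured with a small stand-in. The real callback signatures are not given on this page, so the ones below (an async human-input callback that receives a request dict, and a sync progress callback that receives an event dict) are assumptions, as are CallbackHost, run_step(), and the payload keys. The SDUI callback is omitted for brevity and would be registered the same way.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

HumanCallback = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
ProgressCallback = Callable[[Dict[str, Any]], None]


class CallbackHost:
    """Stand-in for the callback plumbing; not the real LangGraphAgent."""

    def __init__(self) -> None:
        self._human_cb: Optional[HumanCallback] = None
        self._progress_cb: Optional[ProgressCallback] = None

    def set_human_input_callback(self, callback: HumanCallback) -> None:
        self._human_cb = callback

    def set_progress_callback(self, callback: ProgressCallback) -> None:
        self._progress_cb = callback

    async def run_step(self, question: str) -> Dict[str, Any]:
        # Report progress first, then wait for the human decision.
        if self._progress_cb:
            self._progress_cb({"event": "awaiting_human"})
        if self._human_cb is None:
            return {"approved": False, "reason": "no callback registered"}
        return await self._human_cb({"prompt": question})


progress_log: List[Dict[str, Any]] = []


async def auto_approve(request: Dict[str, Any]) -> Dict[str, Any]:
    # A real callback would prompt a person; this one approves everything.
    return {"approved": True, "prompt": request["prompt"]}


host = CallbackHost()
host.set_progress_callback(progress_log.append)
host.set_human_input_callback(auto_approve)
```

Falling back to a rejection when no human-input callback is registered is one conservative design choice; the actual base class may behave differently.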
get_available_tools() → List[Dict[str, Any]]​
Get list of available tools for this agent.
get_status() → Dict[str, Any]​
Get agent status.
async shutdown() → None​
Shutdown the agent.