LangGraph Integration for assistant-stream#
This document describes the LangGraph integration for the assistant-stream package.
Installation#
To use the LangGraph integration, install the assistant-stream package with the langgraph extra:
pip install assistant-stream[langgraph]
This will install the required dependencies including langchain-core.
Usage#
The main function provided by this integration is append_langgraph_event, which allows you to append LangGraph events to the state managed by a RunController.
Function Signature#
def append_langgraph_event(
controller: Any,
namespace: str,
type: str,
payload: Any
) -> None
Parameters#
- controller: A RunController instance from LangGraph that contains a
stateattribute - namespace: The namespace for the event (currently not used but required for future compatibility)
- type: The type of event, either
"message"or"updates" - payload: The payload for the event, format depends on the event type
Event Types#
Message Events (type="message")#
For message events, the payload must be a tuple of (messages, metadata):
- messages: Can be either a single message or a list of messages (BaseMessage instances from LangChain)
- metadata: A dictionary of metadata (currently not used but required)
The function will:
- Create a
messagesarray in the state if it doesn't exist - Convert messages to plain JSON using LangChain's
message_to_dict - Merge messages with the same ID:
- For AIMessageChunk messages, content is appended
- For other message types, the entire message is replaced
- Append new messages that don't have matching IDs
Example:
from langchain_core.messages import HumanMessage, AIMessage
from assistant_stream import append_langgraph_event
# Single message
message = HumanMessage(content="Hello", id="msg1")
append_langgraph_event(controller, "default", "message", ([message], {}))
# Multiple messages
messages = [
HumanMessage(content="Hello", id="msg1"),
AIMessage(content="Hi there!", id="msg2")
]
append_langgraph_event(controller, "default", "message", (messages, {}))
Updates Events (type="updates")#
For updates events, the payload must be a dictionary with the structure:
{
"node_name": {
"channel": new_value,
...
},
...
}
The function will:
- Update channel values for each node
- Create nodes if they don't exist
- Skip updates to the "messages" channel (as these are handled by message events)
Example:
updates = {
"agent": {
"status": "thinking",
"confidence": 0.95
},
"retriever": {
"documents_found": 5
}
}
append_langgraph_event(controller, "default", "updates", updates)
Important Notes#
-
State Format: The state must only contain plain JSON objects (lists, dicts, str, int, bool, None). All LangChain messages are automatically converted to JSON format.
-
Message Merging: When a message with an existing ID is appended:
- If both messages are AI messages (type="ai"), the content is concatenated
- For all other cases, the entire message is replaced
-
Error Handling:
- Raises
ValueErrorif the controller doesn't have astateattribute - Raises
TypeErrorif the payload format is invalid - Skips
Nonemessages silently - Ignores unknown event types
- Raises
-
Thread Safety: This function modifies the state directly and is not thread-safe. Ensure proper synchronization if using from multiple threads.
Example Integration#
Here's a complete example of using append_langgraph_event with a LangGraph RunController:
from assistant_stream import append_langgraph_event
from langchain_core.messages import HumanMessage, AIMessageChunk
class MyRunController:
def __init__(self):
self.state = {}
# Initialize controller
controller = MyRunController()
# Add initial message
user_message = HumanMessage(content="What is the weather?", id="user1")
append_langgraph_event(controller, "main", "message", ([user_message], {}))
# Add AI response chunks
ai_chunk1 = AIMessageChunk(content="The weather", id="ai1")
append_langgraph_event(controller, "main", "message", ([ai_chunk1], {}))
ai_chunk2 = AIMessageChunk(content=" is sunny today.", id="ai1")
append_langgraph_event(controller, "main", "message", ([ai_chunk2], {}))
# Update node states
updates = {
"weather_agent": {
"status": "completed",
"temperature": 72
}
}
append_langgraph_event(controller, "main", "updates", updates)
# Final state
print(controller.state)
# {
# "messages": [
# {"type": "human", "content": "What is the weather?", "id": "user1"},
# {"type": "ai", "content": "The weather is sunny today.", "id": "ai1"}
# ],
# "weather_agent": {
# "status": "completed",
# "temperature": 72
# }
# }
Testing#
The integration includes comprehensive unit tests. To run them:
python -m pytest tests/test_langgraph.py
The tests cover:
- Message appending and merging
- Updates handling
- Error cases and edge conditions
- Payload validation
- Message normalization