If you've built an agent as a Python or TypeScript script and want to add a UI to it, you need to solve two problems: streaming updates to the frontend and integrating with the UI framework. Assistant Transport handles both.
Assistant Transport streams your agent's complete state to the frontend in real-time. Unlike traditional approaches that only stream predefined message types (like text or tool calls), it streams your entire agent state—whatever structure your agent uses internally.
It consists of:
- State streaming: Efficiently streams updates to your agent state (supports any JSON object)
- UI integration: Converts your agent's state into assistant-ui components that render in the browser
- Command handling: Sends user actions (messages, tool executions, custom commands) back to your agent
When to Use Assistant Transport#
Use Assistant Transport when:
- You don't have a streaming protocol yet and need one
- You want your agent's native state to be directly accessible in the frontend
- You're building a custom agent framework or one without a streaming protocol (e.g. OSS LangGraph)
Mental Model#
The frontend receives state snapshots and converts them to React components. The goal is for the UI to be a stateless view on top of the agent framework's state.
The agent server receives commands from the frontend. When a user interacts with the UI (sends a message, clicks a button, etc.), the frontend queues a command and sends it to the backend. Assistant Transport defines standard commands like add-message and add-tool-result, and you can define custom commands.
Command Lifecycle#
Commands go through the following lifecycle:
The runtime alternates between idle (no active backend request) and sending (request in flight). When a new command is created while idle, it's immediately sent. Otherwise, it's queued until the current request completes.
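The idle/sending alternation described above can be sketched as a tiny state machine. This is illustrative pseudocode only; the runtime implements this queueing internally, and the class and method names here are made up:

```python
class CommandQueue:
    """Sketch of the idle/sending command lifecycle (not a real runtime class)."""

    def __init__(self):
        self.status = "idle"  # "idle" (no active request) or "sending" (request in flight)
        self.queued = []      # commands waiting for the next request
        self.sent = []        # batches of commands dispatched to the backend

    def add(self, command):
        if self.status == "idle":
            # No request in flight: send the command immediately
            self.status = "sending"
            self.sent.append([command])
        else:
            # Request in flight: queue until the current request completes
            self.queued.append(command)

    def on_request_complete(self):
        if self.queued:
            # Flush all queued commands in a single follow-up request
            self.sent.append(self.queued)
            self.queued = []
        else:
            self.status = "idle"
```

The key property is that commands issued while a request is in flight are batched into one follow-up request rather than dropped or sent concurrently.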
To implement this architecture, you need to build two pieces:
- Backend endpoint on the agent server that accepts commands and returns a stream of state snapshots
- Frontend-side state converter that converts state snapshots to assistant-ui's data format so that the UI primitives work
Building a Backend Endpoint#
Let's build the backend endpoint step by step. You'll need to handle incoming commands, update your agent state, and stream the updates back to the frontend.
The backend endpoint receives POST requests with the following payload:
{
  state: T, // The previous state that the frontend has access to
  commands: AssistantTransportCommand[],
  system?: string,
  tools?: Record<string, ToolJSONSchema>, // Tool definitions keyed by tool name
  threadId: string | null, // The current thread/conversation identifier (null for new threads)
  parentId?: string | null, // The parent message ID (included when editing or branching)
  callSettings?: { maxTokens, temperature, topP, presencePenalty, frequencyPenalty, seed },
  config?: { apiKey, baseUrl, modelName },
}
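The Python examples in this guide receive this payload as a `ChatRequest` model. A minimal Pydantic sketch might look like the following; the field names mirror the payload above, but the exact model shape is an assumption, not a type shipped by the library:

```python
from typing import Any, Optional

from pydantic import BaseModel


class Command(BaseModel):
    """One entry of the commands array, discriminated by its type field (sketch)."""

    type: str
    message: Optional[Any] = None   # present for add-message commands
    parentId: Optional[str] = None  # present when editing or branching


class ChatRequest(BaseModel):
    """Request body for the assistant endpoint (subset of the fields above; sketch)."""

    state: Optional[Any] = None
    commands: list[Command] = []
    system: Optional[str] = None
    tools: Optional[dict[str, Any]] = None
    threadId: Optional[str] = None
```

With a model like this, FastAPI can parse the POST body directly into `request: ChatRequest`.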
The backend endpoint returns a stream of state snapshots using the assistant-stream library (npm / PyPI).
Handling Commands#
The backend endpoint processes commands from the commands array:
for command in request.commands:
    if command.type == "add-message":
        ...  # Handle adding a user message
    elif command.type == "add-tool-result":
        ...  # Handle tool execution result
    elif command.type == "my-custom-command":
        ...  # Handle your custom command
Streaming Updates#
To stream state updates, modify controller.state within your run callback:
from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    async def run_callback(controller: RunController):
        # Emits "set" at path ["message"] with value "Hello"
        controller.state["message"] = "Hello"

        # Emits "append-text" at path ["message"] with value " World"
        controller.state["message"] += " World"

    # Create and return the stream
    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
The state snapshots are automatically streamed to the frontend using the operations described in Streaming Protocol.
Cancellation:
create_run exposes controller.is_cancelled and controller.cancelled_event.
If the response stream is closed early (for example, a user cancel or client disconnect),
these are set so your backend loop can exit cooperatively.
controller.cancelled_event is a read-only signal object with wait() and is_set().
create_run gives callbacks a ~50ms cooperative shutdown window before forced task cancellation.
Callback exceptions that happen during early-close cleanup are not re-raised to the stream consumer,
but are logged with traceback at warning level for debugging.
Put critical cleanup in finally blocks, since forced cancellation may happen after the grace window.

async def run_callback(controller: RunController):
    while not controller.is_cancelled:
        # Long-running work / model loop
        await asyncio.sleep(0.05)

async def run_callback(controller: RunController):
    # Cancellation-aware shutdown path
    await controller.cancelled_event.wait()
Backend Reference Implementation#
<Tabs items={["Minimal", "Example", "LangGraph"]}>
from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse

async def run_callback(controller: RunController):
    # Initialize state
    if controller.state is None:
        controller.state = {}

    # Process commands
    for command in request.commands:
        ...  # Handle commands...

    # Run your agent and stream updates
    async for event in agent.stream():
        # update controller.state
        pass

# Create and return the stream
stream = create_run(run_callback, state=request.state)
return DataStreamResponse(stream)
from assistant_stream.serialization import DataStreamResponse
from assistant_stream import RunController, create_run

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint with custom agent streaming."""

    async def run_callback(controller: RunController):
        # Initialize controller state
        if controller.state is None:
            controller.state = {"messages": []}

        # Process commands
        for command in request.commands:
            if command.type == "add-message":
                # Add message to messages array
                controller.state["messages"].append(command.message)

        # Run your custom agent and stream updates
        async for message in your_agent.stream():
            # Push message to messages array
            controller.state["messages"].append(message)

    # Create streaming response
    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
from assistant_stream.serialization import DataStreamResponse
from assistant_stream import RunController, create_run
from assistant_stream.modules.langgraph import append_langgraph_event
from langchain_core.messages import HumanMessage

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint using LangGraph with streaming."""

    async def run_callback(controller: RunController):
        # Initialize controller state
        if controller.state is None:
            controller.state = {}
        if "messages" not in controller.state:
            controller.state["messages"] = []

        input_messages = []

        # Process commands
        for command in request.commands:
            if command.type == "add-message":
                text_parts = [
                    part.text
                    for part in command.message.parts
                    if part.type == "text" and part.text
                ]
                if text_parts:
                    input_messages.append(HumanMessage(content=" ".join(text_parts)))

        # Create initial state for LangGraph
        input_state = {"messages": input_messages}

        # Stream events from LangGraph
        async for namespace, event_type, chunk in graph.astream(
            input_state,
            stream_mode=["messages", "updates"],
            subgraphs=True,
        ):
            append_langgraph_event(controller.state, namespace, event_type, chunk)

    # Create streaming response
    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
Full example: python/assistant-transport-backend-langgraph
Streaming Protocol#
The assistant-stream state replication protocol allows for streaming updates to an arbitrary JSON object.
Operations#
The protocol supports two operations:
Note: We've found that these two operations are enough to handle all sorts of complex state operations efficiently.
set handles value updates and nested structures, while append-text enables efficient streaming of text content.
set#
Sets a value at a specific path in the JSON object.
// Operation
{ "type": "set", "path": ["status"], "value": "completed" }
// Before
{ "status": "pending" }
// After
{ "status": "completed" }
append-text#
Appends text to an existing string value at a path.
// Operation
{ "type": "append-text", "path": ["message"], "value": " World" }
// Before
{ "message": "Hello" }
// After
{ "message": "Hello World" }
Wire Format#
The wire format will be migrated to Server-Sent Events (SSE) in a future release. The current wire format is inspired by the AI SDK's data stream protocol.
State Update:
aui-state:ObjectStreamOperation[]
aui-state:[{"type":"set","path":["status"],"value":"completed"}]
Error:
3:string
3:"error message"
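As an illustration, one line of this wire format can be decoded by splitting on the first colon and JSON-parsing the remainder. This is a hypothetical helper under the assumption that each line has the shape `<prefix>:<JSON>`, not an official parser:

```python
import json


def parse_line(line: str):
    """Split a wire-format line into a (kind, payload) pair (illustrative sketch)."""
    prefix, _, payload = line.partition(":")
    value = json.loads(payload)
    if prefix == "aui-state":
        return ("state-update", value)  # list of set / append-text operations
    if prefix == "3":
        return ("error", value)         # error message string
    return ("unknown", value)
```

A client would apply each decoded `state-update` batch to its local state snapshot in order.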
Building a Frontend#
Now let's set up the frontend. The state converter is the heart of the integration—it transforms your agent's state into the format assistant-ui expects.
The useAssistantTransportRuntime hook is used to configure the runtime. It accepts the following config:
{
  initialState: T,
  api: string,
  resumeApi?: string,
  protocol?: "data-stream" | "assistant-transport",
  converter: (state: T, connectionMetadata: ConnectionMetadata) => AssistantTransportState,
  headers: Record<string, string> | Headers | (() => Promise<Record<string, string> | Headers>),
  body?: object | (() => Promise<object | undefined>),
  prepareSendCommandsRequest?: (body: SendCommandsRequestBody) => Record<string, unknown> | Promise<Record<string, unknown>>,
  capabilities?: { edit?: boolean },
  adapters?: { attachments?: AttachmentAdapter; history?: ThreadHistoryAdapter },
  onResponse?: (response: Response) => void,
  onFinish?: () => void,
  onError?: (error: Error, params: { commands: AssistantTransportCommand[]; updateState: (updater: (state: T) => T) => void }) => void | Promise<void>,
  onCancel?: (params: { commands: AssistantTransportCommand[]; updateState: (updater: (state: T) => T) => void; error?: Error }) => void
}
State Converter#
The state converter is the core of your frontend integration. It transforms your agent's state into assistant-ui's message format.
(
  state: T, // Your agent's state
  connectionMetadata: {
    pendingCommands: Command[], // Commands not yet sent to backend
    isSending: boolean, // Whether a request is in flight
    toolStatuses: Record<string, ToolExecutionStatus> // Tool execution status tracking
  }
) => {
  messages: ThreadMessage[], // Messages to display
  isRunning: boolean, // Whether the agent is running
  state?: ReadonlyJSONValue // Optional custom agent state
}
Converting Messages#
Use the createMessageConverter API to transform your agent's messages to assistant-ui format:
<Tabs items={["Example", "LangChain"]}>
import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";

// Define your message type
type YourMessageType = {
  id: string;
  role: "user" | "assistant";
  content: string;
  timestamp: number;
};

// Define a converter function for a single message
const exampleMessageConverter = (message: YourMessageType) => {
  // Transform a single message to assistant-ui format
  return {
    role: message.role,
    content: [{ type: "text", text: message.content }],
  };
};

const messageConverter = createMessageConverter(exampleMessageConverter);

const converter = (state: YourAgentState) => {
  return {
    messages: messageConverter.toThreadMessages(state.messages),
    isRunning: false,
  };
};
import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
import { convertLangChainMessages } from "@assistant-ui/react-langgraph";

const messageConverter = createMessageConverter(convertLangChainMessages);

const converter = (state: YourAgentState) => {
  return {
    messages: messageConverter.toThreadMessages(state.messages),
    isRunning: false,
  };
};
Reverse mapping:
The message converter allows you to retrieve the original message format anywhere inside assistant-ui. This lets you access your agent's native message structure from any assistant-ui component:
// Get original message(s) from a ThreadMessage anywhere in assistant-ui
const originalMessage = messageConverter.toOriginalMessage(threadMessage);
Optimistic Updates from Commands#
The converter also receives connectionMetadata which contains pending commands. Use this to show optimistic updates:
const converter = (state: State, connectionMetadata: ConnectionMetadata) => {
  // Extract pending messages from commands
  const optimisticMessages = connectionMetadata.pendingCommands
    .filter((c) => c.type === "add-message")
    .map((c) => c.message);

  return {
    messages: [...state.messages, ...optimisticMessages],
    isRunning: connectionMetadata.isSending || false,
  };
};
Handling Errors and Cancellations#
The onError and onCancel callbacks receive an updateState function that allows you to update the agent state on the client side without making a server request:
const runtime = useAssistantTransportRuntime({
  // ... other options
  onError: (error, { commands, updateState }) => {
    console.error("Error occurred:", error);
    console.log("Commands in transit:", commands);

    // Update state to reflect the error
    updateState((currentState) => ({
      ...currentState,
      lastError: error.message,
    }));
  },
  onCancel: ({ commands, updateState }) => {
    console.log("Request cancelled");
    console.log("Commands (in-transit + queued, or queued-only if called after error):", commands);

    // Update state to reflect cancellation
    updateState((currentState) => ({
      ...currentState,
      status: "cancelled",
    }));
  },
});
Note:
onError receives the commands that were in transit. onCancel receives both in-transit and queued commands when the user cancels directly; when called after an error, it receives only the queued commands (in-transit commands are passed to onError instead).
Custom Headers and Body#
You can pass custom headers and body to the backend endpoint:
const runtime = useAssistantTransportRuntime({
  // ... other options
  headers: {
    "Authorization": "Bearer token",
    "X-Custom-Header": "value",
  },
  body: {
    customField: "value",
  },
});
Dynamic Headers and Body#
You can also evaluate the header and body payloads on every request by passing an async function:
const runtime = useAssistantTransportRuntime({
  // ... other options
  headers: async () => ({
    "Authorization": `Bearer ${await getAccessToken()}`,
    "X-Request-ID": crypto.randomUUID(),
  }),
  body: async () => ({
    customField: "value",
    requestId: crypto.randomUUID(),
    timestamp: Date.now(),
  }),
});
Transforming the Request Body#
Use prepareSendCommandsRequest to transform the entire request body before it is sent to the backend. This receives the fully assembled body object and returns the (potentially transformed) body.
const runtime = useAssistantTransportRuntime({
  // ... other options
  prepareSendCommandsRequest: (body) => ({
    ...body,
    trackingId: crypto.randomUUID(),
  }),
});
This is useful for adding tracking IDs, transforming commands, or injecting metadata that depends on the assembled request:
const runtime = useAssistantTransportRuntime({
  // ... other options
  prepareSendCommandsRequest: (body) => ({
    ...body,
    commands: body.commands.map((cmd) =>
      cmd.type === "add-message"
        ? { ...cmd, trackingId: crypto.randomUUID() }
        : cmd,
    ),
  }),
});
Editing Messages#
By default, editing messages is disabled. To enable it, set capabilities.edit to true:
const runtime = useAssistantTransportRuntime({
  // ... other options
  capabilities: {
    edit: true,
  },
});
add-message commands always include parentId and sourceId fields:
{
  type: "add-message",
  message: { role: "user", parts: [...] },
  parentId: "msg-3", // The message after which this message should be inserted
  sourceId: "msg-4", // The ID of the message being replaced (null for new messages)
}
Backend Handling#
When the backend receives an add-message command with a parentId, it should:
- Truncate all messages after the message with parentId
- Append the new message
- Stream the updated state back to the frontend
for command in request.commands:
    if command.type == "add-message":
        if getattr(command, "parentId", None) is not None:
            # Find the parent message index and truncate
            parent_idx = next(
                i for i, m in enumerate(messages) if m.id == command.parentId
            )
            messages = messages[: parent_idx + 1]

        # Append the new message
        messages.append(command.message)
Resuming from a Sync Server#
We currently provide a sync server only as part of the enterprise plan; please contact us for more information.

When a user refreshes the page, switches tabs, or reconnects after a network interruption, the backend may still be generating a response. resumeRun allows the frontend to reconnect to the active backend stream.
Setup#
Pass a resumeApi URL to useAssistantTransportRuntime that points to your sync server:
const runtime = useAssistantTransportRuntime({
  // ... other options
  api: "http://localhost:8010/assistant",
  resumeApi: "http://localhost:8010/resume", // Sync server endpoint
});
Resuming on thread switch or page load#
When switching to a thread or mounting a component, check if the backend is still running and call resumeRun:
import { useAui } from "@assistant-ui/react";
import { useEffect, useRef } from "react";

function useResumeOnMount(threadId: string) {
  const aui = useAui();
  const hasCheckedRef = useRef(false);

  useEffect(() => {
    if (hasCheckedRef.current) return;
    hasCheckedRef.current = true;

    const checkAndResume = async () => {
      const status = await fetch(
        `/api/sync-server/status/${threadId}`,
      ).then((r) => r.json());

      if (status.isRunning) {
        const parentId =
          aui.thread().getState().messages.at(-1)?.id ?? null;
        aui.thread().resumeRun({ parentId });
      }
    };

    checkAndResume();
  }, [aui, threadId]);
}
For the AssistantTransport runtime, you do not need to pass a stream parameter — the runtime uses the configured resumeApi endpoint to reconnect.
Accessing Runtime State#
Use the useAssistantTransportState hook to access the current agent state from any component:
import { useAssistantTransportState } from "@assistant-ui/react";

function MyComponent() {
  const state = useAssistantTransportState();
  return <div>{JSON.stringify(state)}</div>;
}
You can also pass a selector function to extract specific values:
function MyComponent() {
  const messages = useAssistantTransportState((state) => state.messages);
  return <div>Message count: {messages.length}</div>;
}
Type Safety#
Use module augmentation to add types for your agent state:
import "@assistant-ui/react";

declare module "@assistant-ui/react" {
  namespace Assistant {
    interface ExternalState {
      myState: {
        messages: Message[];
        customField: string;
      };
    }
  }
}
Note: Place this file anywhere in your project (e.g., src/assistant.config.ts or at the project root). TypeScript will automatically pick up the type augmentation through module resolution; you don't need to import this file anywhere.
After adding the type augmentation, useAssistantTransportState will be fully typed:
function MyComponent() {
  // TypeScript knows about your custom fields
  const customField = useAssistantTransportState((state) => state.customField);
  return <div>{customField}</div>;
}
Accessing the Original Message#
If you're using createMessageConverter, you can access the original message format from any assistant-ui component using the converter's toOriginalMessage method:
import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
import { useAuiState } from "@assistant-ui/react";

const messageConverter = createMessageConverter(yourMessageConverter);

function MyMessageComponent() {
  const message = useAuiState((s) => s.message);

  // Get the original message(s) from the converted ThreadMessage
  const originalMessage = messageConverter.toOriginalMessage(message);

  // Access your agent's native message structure
  return <div>{originalMessage.yourCustomField}</div>;
}
You can also use toOriginalMessages to get all original messages when a ThreadMessage was created from multiple source messages:
const originalMessages = messageConverter.toOriginalMessages(message);
Frontend Reference Implementation#
<Tabs items={["Example", "LangGraph"]}>
"use client";

import {
  AssistantRuntimeProvider,
  AssistantTransportConnectionMetadata,
  useAssistantTransportRuntime,
} from "@assistant-ui/react";

type State = {
  messages: Message[];
};

// Converter function: transforms agent state to assistant-ui format
const converter = (
  state: State,
  connectionMetadata: AssistantTransportConnectionMetadata,
) => {
  // Add optimistic updates for pending commands
  const optimisticMessages = connectionMetadata.pendingCommands
    .filter((c) => c.type === "add-message")
    .map((c) => c.message);

  return {
    messages: [...state.messages, ...optimisticMessages],
    isRunning: connectionMetadata.isSending || false,
  };
};

export function MyRuntimeProvider({ children }) {
  const runtime = useAssistantTransportRuntime({
    initialState: {
      messages: [],
    },
    api: "http://localhost:8010/assistant",
    converter,
    headers: async () => ({
      "Authorization": "Bearer token",
    }),
    body: {
      "custom-field": "custom-value",
    },
    onResponse: (response) => {
      console.log("Response received from server");
    },
    onFinish: () => {
      console.log("Conversation completed");
    },
    onError: (error, { commands, updateState }) => {
      console.error("Assistant transport error:", error);
      console.log("Commands in transit:", commands);
    },
    onCancel: ({ commands, updateState }) => {
      console.log("Request cancelled");
      console.log("Commands (in-transit + queued, or queued-only if called after error):", commands);
    },
  });

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      {children}
    </AssistantRuntimeProvider>
  );
}
"use client";

import {
  AssistantRuntimeProvider,
  AssistantTransportConnectionMetadata,
  unstable_createMessageConverter as createMessageConverter,
  useAssistantTransportRuntime,
} from "@assistant-ui/react";
import {
  convertLangChainMessages,
  LangChainMessage,
} from "@assistant-ui/react-langgraph";

type State = {
  messages: LangChainMessage[];
};

const LangChainMessageConverter = createMessageConverter(
  convertLangChainMessages,
);

// Converter function: transforms agent state to assistant-ui format
const converter = (
  state: State,
  connectionMetadata: AssistantTransportConnectionMetadata,
) => {
  // Add optimistic updates for pending commands
  const optimisticStateMessages = connectionMetadata.pendingCommands.map(
    (c): LangChainMessage[] => {
      if (c.type === "add-message") {
        return [
          {
            type: "human" as const,
            content: [
              {
                type: "text" as const,
                text: c.message.parts
                  .map((p) => (p.type === "text" ? p.text : ""))
                  .join("\n"),
              },
            ],
          },
        ];
      }
      return [];
    },
  );

  const messages = [...state.messages, ...optimisticStateMessages.flat()];

  return {
    messages: LangChainMessageConverter.toThreadMessages(messages),
    isRunning: connectionMetadata.isSending || false,
  };
};

export function MyRuntimeProvider({ children }) {
  const runtime = useAssistantTransportRuntime({
    initialState: {
      messages: [],
    },
    api: "http://localhost:8010/assistant",
    converter,
    headers: async () => ({
      "Authorization": "Bearer token",
    }),
    body: {
      "custom-field": "custom-value",
    },
    onResponse: (response) => {
      console.log("Response received from server");
    },
    onFinish: () => {
      console.log("Conversation completed");
    },
    onError: (error, { commands, updateState }) => {
      console.error("Assistant transport error:", error);
      console.log("Commands in transit:", commands);
    },
    onCancel: ({ commands, updateState }) => {
      console.log("Request cancelled");
      console.log("Commands (in-transit + queued, or queued-only if called after error):", commands);
    },
  });

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      {children}
    </AssistantRuntimeProvider>
  );
}
Full example: examples/with-assistant-transport
Custom Commands#
Defining Custom Commands#
Use module augmentation to define a custom command:
import "@assistant-ui/react";

declare module "@assistant-ui/react" {
  namespace Assistant {
    interface Commands {
      myCustomCommand: {
        type: "my-custom-command";
        data: string;
      };
    }
  }
}
Issuing Commands#
Use the useAssistantTransportSendCommand hook to send custom commands:
import { useAssistantTransportSendCommand } from "@assistant-ui/react";

function MyComponent() {
  const sendCommand = useAssistantTransportSendCommand();

  const handleClick = () => {
    sendCommand({
      type: "my-custom-command",
      data: "Hello, world!",
    });
  };

  return <button onClick={handleClick}>Send Custom Command</button>;
}
Backend Integration#
The backend receives custom commands in the commands array, just like built-in commands:
for command in request.commands:
    if command.type == "add-message":
        ...  # Handle add-message command
    elif command.type == "add-tool-result":
        ...  # Handle add-tool-result command
    elif command.type == "my-custom-command":
        # Handle your custom command
        data = command.data
Optimistic Updates#
Update the state converter to optimistically handle the custom command:
const converter = (state: State, connectionMetadata: ConnectionMetadata) => {
  // Filter custom commands from pending commands
  const customCommands = connectionMetadata.pendingCommands.filter(
    (c) => c.type === "my-custom-command",
  );

  // Apply optimistic updates based on custom commands
  const optimisticState = {
    ...state,
    customData: customCommands.map((c) => c.data),
  };

  return {
    messages: state.messages,
    state: optimisticState,
    isRunning: connectionMetadata.isSending || false,
  };
};
Cancellation and Error Behavior#
Custom commands follow the same lifecycle as built-in commands. You can update your onError and onCancel handlers to take custom commands into account:
const runtime = useAssistantTransportRuntime({
  // ... other options
  onError: (error, { commands, updateState }) => {
    // Check if any custom commands were in transit
    const customCommands = commands.filter((c) => c.type === "my-custom-command");
    if (customCommands.length > 0) {
      // Handle custom command errors
      updateState((state) => ({
        ...state,
        customCommandFailed: true,
      }));
    }
  },
  onCancel: ({ commands, updateState }) => {
    // Check if any custom commands were queued or in transit
    const customCommands = commands.filter((c) => c.type === "my-custom-command");
    if (customCommands.length > 0) {
      // Handle custom command cancellation
      updateState((state) => ({
        ...state,
        customCommandCancelled: true,
      }));
    }
  },
});