Documents
assistant-transport
assistant-transport
Type
External
Status
Published
Created
Mar 17, 2026
Updated
Apr 2, 2026
Updated by
Dosu Bot

If you've built an agent as a Python or TypeScript script and want to add a UI to it, you need to solve two problems: streaming updates to the frontend and integrating with the UI framework. Assistant Transport handles both.

Assistant Transport streams your agent's complete state to the frontend in real-time. Unlike traditional approaches that only stream predefined message types (like text or tool calls), it streams your entire agent state—whatever structure your agent uses internally.

It consists of:

  • State streaming: Efficiently streams updates to your agent state (supports any JSON object)
  • UI integration: Converts your agent's state into assistant-ui components that render in the browser
  • Command handling: Sends user actions (messages, tool executions, custom commands) back to your agent

When to Use Assistant Transport#

Use Assistant Transport when:

  • You don't have a streaming protocol yet and need one
  • You want your agent's native state to be directly accessible in the frontend
  • You're building a custom agent framework or one without a streaming protocol (e.g. OSS LangGraph)

Mental Model#

Loading diagram...

The frontend receives state snapshots and converts them to React components. The goal is to have the UI be a stateless view on top of the agent framework state.

The agent server receives commands from the frontend. When a user interacts with the UI (sends a message, clicks a button, etc.), the frontend queues a command and sends it to the backend. Assistant Transport defines standard commands like add-message and add-tool-result, and you can define custom commands.

Command Lifecycle#

Commands go through the following lifecycle:

Loading diagram...

The runtime alternates between idle (no active backend request) and sending (request in flight). When a new command is created while idle, it's immediately sent. Otherwise, it's queued until the current request completes.

Loading diagram...

To implement this architecture, you need to build 2 pieces:

  1. Backend endpoint on the agent server that accepts commands and returns a stream of state snapshots
  2. Frontend-side state converter that converts state snapshots to assistant-ui's data format so that the UI primitives work

Building a Backend Endpoint#

Let's build the backend endpoint step by step. You'll need to handle incoming commands, update your agent state, and stream the updates back to the frontend.

The backend endpoint receives POST requests with the following payload:

{
  state: T, // The previous state that the frontend has access to
  commands: AssistantTransportCommand[],
  system?: string,
  tools?: Record<string, ToolJSONSchema>, // Tool definitions keyed by tool name
  threadId: string | null, // The current thread/conversation identifier (null for new threads)
  parentId?: string | null, // The parent message ID (included when editing or branching)
  callSettings?: { maxTokens, temperature, topP, presencePenalty, frequencyPenalty, seed },
  config?: { apiKey, baseUrl, modelName },
}
**Migrating from top-level fields:** `callSettings` and `config` fields were previously spread at the top level of the request body (e.g. `body.modelName` instead of `body.config.modelName`). Both formats are currently sent for backward compatibility, but the top-level fields are deprecated and will be removed in a future version. Update your backend to read from the nested objects.

The backend endpoint returns a stream of state snapshots using the assistant-stream library (npm / PyPI).

Handling Commands#

The backend endpoint processes commands from the commands array:

for command in request.commands:
    if command.type == "add-message":
        # Handle adding a user message
    elif command.type == "add-tool-result":
        # Handle tool execution result
    elif command.type == "my-custom-command":
        # Handle your custom command

Streaming Updates#

To stream state updates, modify controller.state within your run callback:

from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    async def run_callback(controller: RunController):
        # Emits "set" at path ["message"] with value "Hello"
        controller.state["message"] = "Hello"

        # Emits "append-text" at path ["message"] with value " World"
        controller.state["message"] += " World"

    # Create and return the stream
    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)

The state snapshots are automatically streamed to the frontend using the operations described in Streaming Protocol.

Cancellation: create_run exposes controller.is_cancelled and controller.cancelled_event.
If the response stream is closed early (for example user cancel or client disconnect),
these are set so your backend loop can exit cooperatively.
controller.cancelled_event is a read-only signal object with wait() and is_set().
create_run gives callbacks a ~50ms cooperative shutdown window before forced task cancellation.
Callback exceptions that happen during early-close cleanup are not re-raised to the stream consumer,
but are logged with traceback at warning level for debugging.
Put critical cleanup in finally blocks, since forced cancellation may happen after the grace window.

async def run_callback(controller: RunController):
while not controller.is_cancelled:
# Long-running work / model loop
await asyncio.sleep(0.05)
async def run_callback(controller: RunController):
await controller.cancelled_event.wait()
# cancellation-aware shutdown path

Backend Reference Implementation#

<Tabs items={["Minimal", "Example", "LangGraph"]}>

from assistant_stream import RunController, create_run
from assistant_stream.serialization import DataStreamResponse

async def run_callback(controller: RunController):
    # Initialize state
    if controller.state is None:
        controller.state = {}

    # Process commands
    for command in request.commands:
        # Handle commands...

    # Run your agent and stream updates
    async for event in agent.stream():
        # update controller.state
        pass

# Create and return the stream
stream = create_run(run_callback, state=request.state)
return DataStreamResponse(stream)
from assistant_stream.serialization import DataStreamResponse
from assistant_stream import RunController, create_run

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint with custom agent streaming."""

    async def run_callback(controller: RunController):
        # Initialize controller state
        if controller.state is None:
            controller.state = {"messages": []}

        # Process commands
        for command in request.commands:
            if command.type == "add-message":
                # Add message to messages array
                controller.state["messages"].append(command.message)

        # Run your custom agent and stream updates
        async for message in your_agent.stream():
            # Push message to messages array
            controller.state["messages"].append(message)

    # Create streaming response
    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)
from assistant_stream.serialization import DataStreamResponse
from assistant_stream import RunController, create_run
from assistant_stream.modules.langgraph import append_langgraph_event

@app.post("/assistant")
async def chat_endpoint(request: ChatRequest):
    """Chat endpoint using LangGraph with streaming."""

    async def run_callback(controller: RunController):
        # Initialize controller state
        if controller.state is None:
            controller.state = {}
        if "messages" not in controller.state:
            controller.state["messages"] = []

        input_messages = []

        # Process commands
        for command in request.commands:
            if command.type == "add-message":
                text_parts = [
                    part.text for part in command.message.parts
                    if part.type == "text" and part.text
                ]
                if text_parts:
                    input_messages.append(HumanMessage(content=" ".join(text_parts)))

        # Create initial state for LangGraph
        input_state = {"messages": input_messages}

        # Stream events from LangGraph
        async for namespace, event_type, chunk in graph.astream(
            input_state,
            stream_mode=["messages", "updates"],
            subgraphs=True
        ):
            append_langgraph_event(
                controller.state,
                namespace,
                event_type,
                chunk
            )

    # Create streaming response
    stream = create_run(run_callback, state=request.state)
    return DataStreamResponse(stream)

Full example: python/assistant-transport-backend-langgraph

Streaming Protocol#

The assistant-stream state replication protocol allows for streaming updates to an arbitrary JSON object.

Operations#

The protocol supports two operations:

Note: We've found that these two operations are enough to handle all sorts of complex state operations efficiently. set handles value updates and nested structures, while append-text enables efficient streaming of text content.

set#

Sets a value at a specific path in the JSON object.

// Operation
{ "type": "set", "path": ["status"], "value": "completed" }

// Before
{ "status": "pending" }

// After
{ "status": "completed" }

append-text#

Appends text to an existing string value at a path.

// Operation
{ "type": "append-text", "path": ["message"], "value": " World" }

// Before
{ "message": "Hello" }

// After
{ "message": "Hello World" }

Wire Format#

The wire format will be migrated to Server-Sent Events (SSE) in a future release.

The wire format is inspired by AI SDK's data stream protocol.

State Update:

aui-state:ObjectStreamOperation[]
aui-state:[{"type":"set","path":["status"],"value":"completed"}]

Error:

3:string
3:"error message"

Building a Frontend#

Now let's set up the frontend. The state converter is the heart of the integration—it transforms your agent's state into the format assistant-ui expects.

The useAssistantTransportRuntime hook is used to configure the runtime. It accepts the following config:

{
  initialState: T,
  api: string,
  resumeApi?: string,
  protocol?: "data-stream" | "assistant-transport",
  converter: (state: T, connectionMetadata: ConnectionMetadata) => AssistantTransportState,
  headers: Record<string, string> | Headers | (() => Promise<Record<string, string> | Headers>),
  body?: object | (() => Promise<object | undefined>),
  prepareSendCommandsRequest?: (body: SendCommandsRequestBody) => Record<string, unknown> | Promise<Record<string, unknown>>,
  capabilities?: { edit?: boolean },
  adapters?: { attachments?: AttachmentAdapter; history?: ThreadHistoryAdapter },
  onResponse?: (response: Response) => void,
  onFinish?: () => void,
  onError?: (error: Error, params: { commands: AssistantTransportCommand[]; updateState: (updater: (state: T) => T) => void }) => void | Promise<void>,
  onCancel?: (params: { commands: AssistantTransportCommand[]; updateState: (updater: (state: T) => T) => void; error?: Error }) => void
}

State Converter#

The state converter is the core of your frontend integration. It transforms your agent's state into assistant-ui's message format.

(
  state: T, // Your agent's state
  connectionMetadata: {
    pendingCommands: Command[], // Commands not yet sent to backend
    isSending: boolean, // Whether a request is in flight
    toolStatuses: Record<string, ToolExecutionStatus> // Tool execution status tracking
  }
) => {
  messages: ThreadMessage[], // Messages to display
  isRunning: boolean, // Whether the agent is running
  state?: ReadonlyJSONValue // Optional custom agent state
}

Converting Messages#

Use the createMessageConverter API to transform your agent's messages to assistant-ui format:

<Tabs items={["Example", "LangChain"]}>

import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";

// Define your message type
type YourMessageType = {
  id: string;
  role: "user" | "assistant";
  content: string;
  timestamp: number;
};

// Define a converter function for a single message
const exampleMessageConverter = (message: YourMessageType) => {
  // Transform a single message to assistant-ui format
  return {
    role: message.role,
    content: [{ type: "text", text: message.content }]
  };
};

const messageConverter = createMessageConverter(exampleMessageConverter);

const converter = (state: YourAgentState) => {
  return {
    messages: messageConverter.toThreadMessages(state.messages),
    isRunning: false
  };
};
import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
import { convertLangChainMessages } from "@assistant-ui/react-langgraph";

const messageConverter = createMessageConverter(convertLangChainMessages);

const converter = (state: YourAgentState) => {
  return {
    messages: messageConverter.toThreadMessages(state.messages),
    isRunning: false
  };
};

Reverse mapping:

The message converter allows you to retrieve the original message format anywhere inside assistant-ui. This lets you access your agent's native message structure from any assistant-ui component:

// Get original message(s) from a ThreadMessage anywhere in assistant-ui
const originalMessage = messageConverter.toOriginalMessage(threadMessage);

Optimistic Updates from Commands#

The converter also receives connectionMetadata which contains pending commands. Use this to show optimistic updates:

const converter = (state: State, connectionMetadata: ConnectionMetadata) => {
  // Extract pending messages from commands
  const optimisticMessages = connectionMetadata.pendingCommands
    .filter((c) => c.type === "add-message")
    .map((c) => c.message);

  return {
    messages: [...state.messages, ...optimisticMessages],
    isRunning: connectionMetadata.isSending || false
  };
};

Handling Errors and Cancellations#

The onError and onCancel callbacks receive an updateState function that allows you to update the agent state on the client side without making a server request:

const runtime = useAssistantTransportRuntime({
  // ... other options
  onError: (error, { commands, updateState }) => {
    console.error("Error occurred:", error);
    console.log("Commands in transit:", commands);

    // Update state to reflect the error
    updateState((currentState) => ({
      ...currentState,
      lastError: error.message,
    }));
  },
  onCancel: ({ commands, updateState }) => {
    console.log("Request cancelled");
    console.log("Commands (in-transit + queued, or queued-only if called after error):", commands);

    // Update state to reflect cancellation
    updateState((currentState) => ({
      ...currentState,
      status: "cancelled",
    }));
  },
});

Note: onError receives commands that were in transit. onCancel receives both in-transit and queued commands when the user cancels directly; when called after an error, it only receives queued commands (in-transit commands are passed to onError instead).

Custom Headers and Body#

You can pass custom headers and body to the backend endpoint:

const runtime = useAssistantTransportRuntime({
  // ... other options
  headers: {
    "Authorization": "Bearer token",
    "X-Custom-Header": "value",
  },
  body: {
    customField: "value",
  },
});

Dynamic Headers and Body#

You can also evaluate the header and body payloads on every request by passing an async function:

const runtime = useAssistantTransportRuntime({
  // ... other options
  headers: async () => ({
    "Authorization": `Bearer ${await getAccessToken()}`,
    "X-Request-ID": crypto.randomUUID(),
  }),
  body: async () => ({
    customField: "value",
    requestId: crypto.randomUUID(),
    timestamp: Date.now(),
  }),
});

Transforming the Request Body#

Use prepareSendCommandsRequest to transform the entire request body before it is sent to the backend. This receives the fully assembled body object and returns the (potentially transformed) body.

const runtime = useAssistantTransportRuntime({
  // ... other options
  prepareSendCommandsRequest: (body) => ({
    ...body,
    trackingId: crypto.randomUUID(),
  }),
});

This is useful for adding tracking IDs, transforming commands, or injecting metadata that depends on the assembled request:

const runtime = useAssistantTransportRuntime({
  // ... other options
  prepareSendCommandsRequest: (body) => ({
    ...body,
    commands: body.commands.map((cmd) =>
      cmd.type === "add-message"
        ? { ...cmd, trackingId: crypto.randomUUID() }
        : cmd,
    ),
  }),
});

Editing Messages#

By default, editing messages is disabled. To enable it, set capabilities.edit to true:

const runtime = useAssistantTransportRuntime({
  // ... other options
  capabilities: {
    edit: true,
  },
});

add-message commands always include parentId and sourceId fields:

{
  type: "add-message",
  message: { role: "user", parts: [...] },
  parentId: "msg-3", // The message after which this message should be inserted
  sourceId: "msg-4", // The ID of the message being replaced (null for new messages)
}

Backend Handling#

When the backend receives an add-message command with a parentId, it should:

  1. Truncate all messages after the message with parentId
  2. Append the new message
  3. Stream the updated state back to the frontend
for command in request.commands:
    if command.type == "add-message":
        if hasattr(command, "parentId") and command.parentId is not None:
            # Find the parent message index and truncate
            parent_idx = next(
                i for i, m in enumerate(messages) if m.id == command.parentId
            )
            messages = messages[:parent_idx + 1]
        # Append the new message
        messages.append(command.message)
`parentId` and `sourceId` are always included on `add-message` commands. For new messages, `sourceId` will be `null`.

Resuming from a Sync Server#

We provide a sync server currently only as part of the enterprise plan. Please contact us for more information.

When a user refreshes the page, switches tabs, or reconnects after a network interruption, the backend may still be generating a response. resumeRun allows the frontend to reconnect to the active backend stream.

Setup#

Pass a resumeApi URL to useAssistantTransportRuntime that points to your sync server:

const runtime = useAssistantTransportRuntime({
  // ... other options
  api: "http://localhost:8010/assistant",
  resumeApi: "http://localhost:8010/resume", // Sync server endpoint
});

Resuming on thread switch or page load#

When switching to a thread or mounting a component, check if the backend is still running and call resumeRun:

import { useAui } from "@assistant-ui/react";
import { useEffect, useRef } from "react";

function useResumeOnMount(threadId: string) {
  const aui = useAui();
  const hasCheckedRef = useRef(false);

  useEffect(() => {
    if (hasCheckedRef.current) return;
    hasCheckedRef.current = true;

    const checkAndResume = async () => {
      const status = await fetch(
        `/api/sync-server/status/${threadId}`,
      ).then((r) => r.json());

      if (status.isRunning) {
        const parentId =
          aui.thread().getState().messages.at(-1)?.id ?? null;
        aui.thread().resumeRun({ parentId });
      }
    };

    checkAndResume();
  }, [aui, threadId]);
}

For the AssistantTransport runtime, you do not need to pass a stream parameter — the runtime uses the configured resumeApi endpoint to reconnect.

Accessing Runtime State#

Use the useAssistantTransportState hook to access the current agent state from any component:

import { useAssistantTransportState } from "@assistant-ui/react";

function MyComponent() {
  const state = useAssistantTransportState();

  return <div>{JSON.stringify(state)}</div>;
}

You can also pass a selector function to extract specific values:

function MyComponent() {
  const messages = useAssistantTransportState((state) => state.messages);

  return <div>Message count: {messages.length}</div>;
}

Type Safety#

Use module augmentation to add types for your agent state:

import "@assistant-ui/react";

declare module "@assistant-ui/react" {
  namespace Assistant {
    interface ExternalState {
      myState: {
        messages: Message[];
        customField: string;
      };
    }
  }
}

Note: Place this file anywhere in your project (e.g., src/assistant.config.ts or at the project root). TypeScript will automatically pick up the type augmentation through module resolution—you don't need to import this file anywhere.

After adding the type augmentation, useAssistantTransportState will be fully typed:

function MyComponent() {
  // TypeScript knows about your custom fields
  const customField = useAssistantTransportState((state) => state.customField);

  return <div>{customField}</div>;
}

Accessing the Original Message#

If you're using createMessageConverter, you can access the original message format from any assistant-ui component using the converter's toOriginalMessage method:

import { unstable_createMessageConverter as createMessageConverter } from "@assistant-ui/react";
import { useAuiState } from "@assistant-ui/react";

const messageConverter = createMessageConverter(yourMessageConverter);

function MyMessageComponent() {
  const message = useAuiState((s) => s.message);

  // Get the original message(s) from the converted ThreadMessage
  const originalMessage = messageConverter.toOriginalMessage(message);

  // Access your agent's native message structure
  return <div>{originalMessage.yourCustomField}</div>;
}

You can also use toOriginalMessages to get all original messages when a ThreadMessage was created from multiple source messages:

const originalMessages = messageConverter.toOriginalMessages(message);

Frontend Reference Implementation#

<Tabs items={["Example", "LangGraph"]}>

"use client";

import {
  AssistantRuntimeProvider,
  AssistantTransportConnectionMetadata,
  useAssistantTransportRuntime,
} from "@assistant-ui/react";

type State = {
  messages: Message[];
};

// Converter function: transforms agent state to assistant-ui format
const converter = (
  state: State,
  connectionMetadata: AssistantTransportConnectionMetadata,
) => {
  // Add optimistic updates for pending commands
  const optimisticMessages = connectionMetadata.pendingCommands
    .filter((c) => c.type === "add-message")
    .map((c) => c.message);

  return {
    messages: [...state.messages, ...optimisticMessages],
    isRunning: connectionMetadata.isSending || false,
  };
};

export function MyRuntimeProvider({ children }) {
  const runtime = useAssistantTransportRuntime({
    initialState: {
      messages: [],
    },
    api: "http://localhost:8010/assistant",
    converter,
    headers: async () => ({
      "Authorization": "Bearer token",
    }),
    body: {
      "custom-field": "custom-value",
    },
    onResponse: (response) => {
      console.log("Response received from server");
    },
    onFinish: () => {
      console.log("Conversation completed");
    },
    onError: (error, { commands, updateState }) => {
      console.error("Assistant transport error:", error);
      console.log("Commands in transit:", commands);
    },
    onCancel: ({ commands, updateState }) => {
      console.log("Request cancelled");
      console.log("Commands (in-transit + queued, or queued-only if called after error):", commands);
    },
  });

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      {children}
    </AssistantRuntimeProvider>
  );
}
"use client";

import {
  AssistantRuntimeProvider,
  AssistantTransportConnectionMetadata,
  unstable_createMessageConverter as createMessageConverter,
  useAssistantTransportRuntime,
} from "@assistant-ui/react";
import {
  convertLangChainMessages,
  LangChainMessage,
} from "@assistant-ui/react-langgraph";

type State = {
  messages: LangChainMessage[];
};

const LangChainMessageConverter = createMessageConverter(
  convertLangChainMessages,
);

// Converter function: transforms agent state to assistant-ui format
const converter = (
  state: State,
  connectionMetadata: AssistantTransportConnectionMetadata,
) => {
  // Add optimistic updates for pending commands
  const optimisticStateMessages = connectionMetadata.pendingCommands.map(
    (c): LangChainMessage[] => {
      if (c.type === "add-message") {
        return [
          {
            type: "human" as const,
            content: [
              {
                type: "text" as const,
                text: c.message.parts
                  .map((p) => (p.type === "text" ? p.text : ""))
                  .join("\n"),
              },
            ],
          },
        ];
      }
      return [];
    },
  );

  const messages = [...state.messages, ...optimisticStateMessages.flat()];

  return {
    messages: LangChainMessageConverter.toThreadMessages(messages),
    isRunning: connectionMetadata.isSending || false,
  };
};

export function MyRuntimeProvider({ children }) {
  const runtime = useAssistantTransportRuntime({
    initialState: {
      messages: [],
    },
    api: "http://localhost:8010/assistant",
    converter,
    headers: async () => ({
      "Authorization": "Bearer token",
    }),
    body: {
      "custom-field": "custom-value",
    },
    onResponse: (response) => {
      console.log("Response received from server");
    },
    onFinish: () => {
      console.log("Conversation completed");
    },
    onError: (error, { commands, updateState }) => {
      console.error("Assistant transport error:", error);
      console.log("Commands in transit:", commands);
    },
    onCancel: ({ commands, updateState }) => {
      console.log("Request cancelled");
      console.log("Commands (in-transit + queued, or queued-only if called after error):", commands);
    },
  });

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      {children}
    </AssistantRuntimeProvider>
  );
}

Full example: examples/with-assistant-transport

Custom Commands#

Defining Custom Commands#

Use module augmentation to define a custom command:

import "@assistant-ui/react";

declare module "@assistant-ui/react" {
  namespace Assistant {
    interface Commands {
      myCustomCommand: {
        type: "my-custom-command";
        data: string;
      };
    }
  }
}

Issuing Commands#

Use the useAssistantTransportSendCommand hook to send custom commands:

import { useAssistantTransportSendCommand } from "@assistant-ui/react";

function MyComponent() {
  const sendCommand = useAssistantTransportSendCommand();

  const handleClick = () => {
    sendCommand({
      type: "my-custom-command",
      data: "Hello, world!",
    });
  };

  return <button onClick={handleClick}>Send Custom Command</button>;
}

Backend Integration#

The backend receives custom commands in the commands array, just like built-in commands:

for command in request.commands:
    if command.type == "add-message":
        # Handle add-message command
    elif command.type == "add-tool-result":
        # Handle add-tool-result command
    elif command.type == "my-custom-command":
        # Handle your custom command
        data = command.data

Optimistic Updates#

Update the state converter to optimistically handle the custom command:

const converter = (state: State, connectionMetadata: ConnectionMetadata) => {
  // Filter custom commands from pending commands
  const customCommands = connectionMetadata.pendingCommands.filter(
    (c) => c.type === "my-custom-command"
  );

  // Apply optimistic updates based on custom commands
  const optimisticState = {
    ...state,
    customData: customCommands.map((c) => c.data),
  };

  return {
    messages: state.messages,
    state: optimisticState,
    isRunning: connectionMetadata.isSending || false,
  };
};

Cancellation and Error Behavior#

Custom commands follow the same lifecycle as built-in commands. You can update your onError and onCancel handlers to take custom commands into account:

const runtime = useAssistantTransportRuntime({
  // ... other options
  onError: (error, { commands, updateState }) => {
    // Check if any custom commands were in transit
    const customCommands = commands.filter((c) => c.type === "my-custom-command");

    if (customCommands.length > 0) {
      // Handle custom command errors
      updateState((state) => ({
        ...state,
        customCommandFailed: true,
      }));
    }
  },
  onCancel: ({ commands, updateState }) => {
    // Check if any custom commands were queued or in transit
    const customCommands = commands.filter((c) => c.type === "my-custom-command");

    if (customCommands.length > 0) {
      // Handle custom command cancellation
      updateState((state) => ({
        ...state,
        customCommandCancelled: true,
      }));
    }
  },
});