Resumable Streams
When building chat interfaces, a network interruption or page refresh can break the connection to an agent response that is still streaming. Workflow DevKit provides WorkflowChatTransport, a drop-in transport for the AI SDK that reconnects to the stream automatically.
The Problem
A typical chat application loses state when:
- The user refreshes the page during a response
- Network connectivity drops temporarily
- A serverless function times out mid-stream
With a standard chat implementation, users must resend their message and wait for the entire response again.
The Solution
WorkflowChatTransport solves this by:
- Tracking the workflow run ID returned from your API
- Automatically reconnecting to the stream from the last received chunk
- Storing the run ID for session resumption across page loads
Implementation
Step 1: Return the Run ID from Your API
Modify your chat endpoint to include the workflow run ID in a response header:
import { createUIMessageStreamResponse, convertToModelMessages } from 'ai';
import { start } from 'workflow/api';
import { chatWorkflow } from './workflow';

export async function POST(req: Request) {
  const { messages, modelId } = await req.json();
  const modelMessages = convertToModelMessages(messages);

  const run = await start(chatWorkflow, [{ messages: modelMessages, modelId }]);

  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      'x-workflow-run-id': run.runId,
    },
  });
}

Step 2: Add a Stream Reconnection Endpoint
Create an endpoint that returns the stream for an existing run:
import { createUIMessageStreamResponse } from 'ai';
import { getRun } from 'workflow/api';

export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get('startIndex');
  const startIndex = startIndexParam
    ? parseInt(startIndexParam, 10)
    : undefined;

  const run = getRun(id);
  const stream = run.getReadable({ startIndex });

  return createUIMessageStreamResponse({ stream });
}

The startIndex parameter ensures the client only receives chunks it missed, avoiding duplicate data.
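One detail worth guarding against: parseInt returns NaN for a non-numeric query value, and the route above would pass that NaN along as startIndex. The sketch below shows one way to validate the parameter first. parseStartIndex is a hypothetical helper, not part of the Workflow DevKit API, and the choice to treat malformed input as "no start index" is an assumption; returning a 400 response would be equally reasonable.

```typescript
// Hypothetical helper (not a Workflow DevKit API): turns the raw
// `startIndex` query value into a non-negative integer, or undefined
// when the value is missing or malformed.
function parseStartIndex(param: string | null): number | undefined {
  // Accept plain decimal digits only. Capping at 15 digits keeps the
  // parsed value inside JavaScript's safe integer range.
  if (param === null || !/^\d{1,15}$/.test(param)) return undefined;
  return parseInt(param, 10);
}
```

In the route handler, this would stand in for the inline parseInt call: const startIndex = parseStartIndex(searchParams.get('startIndex')).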
Step 3: Use WorkflowChatTransport in the Client
Replace the default transport in useChat with WorkflowChatTransport:
'use client';

import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai';
import { useMemo, useState } from 'react';

export function Chat() {
  const [input, setInput] = useState('');

  // Check for an active workflow run on mount
  const activeRunId = useMemo(() => {
    if (typeof window === 'undefined') return;
    return localStorage.getItem('active-workflow-run-id') ?? undefined;
  }, []);

  const { messages, sendMessage, status } = useChat({
    resume: Boolean(activeRunId),
    transport: new WorkflowChatTransport({
      api: '/api/chat',
      // Store the run ID when a new chat starts
      onChatSendMessage: (response, options) => {
        const workflowRunId = response.headers.get('x-workflow-run-id');
        if (workflowRunId) {
          localStorage.setItem('active-workflow-run-id', workflowRunId);
        }
      },
      // Clear the run ID when the chat completes
      onChatEnd: () => {
        localStorage.removeItem('active-workflow-run-id');
      },
      // Use the stored run ID for reconnection
      prepareReconnectToStreamRequest: ({ api, ...rest }) => {
        const runId = localStorage.getItem('active-workflow-run-id');
        if (!runId) throw new Error('No active workflow run ID found');
        return {
          ...rest,
          api: `/api/chat/${encodeURIComponent(runId)}/stream`,
        };
      },
      maxConsecutiveErrors: 5,
    }),
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong>{' '}
          {/* UI messages carry their content in `parts`; render the text parts */}
          {m.parts.map((part, i) =>
            part.type === 'text' ? <span key={i}>{part.text}</span> : null
          )}
        </div>
      ))}
      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput('');
        }}
      >
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Type a message..."
        />
      </form>
    </div>
  );
}

How It Works
- When the user sends a message, WorkflowChatTransport makes a POST to /api/chat
- The API starts a workflow and returns the run ID in the x-workflow-run-id header
- onChatSendMessage stores this run ID in localStorage
- If the stream is interrupted before receiving a "finish" chunk, the transport automatically reconnects
- prepareReconnectToStreamRequest builds the reconnection URL using the stored run ID
- The reconnection endpoint returns the stream from where the client left off
- When the stream completes, onChatEnd clears the stored run ID
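The resume-from-index idea in the steps above can be illustrated with a small in-memory simulation. This is only a sketch of the concept, not the transport's actual implementation: Chunk, connect, and readWithResume are hypothetical names, and the "connection drops" are simulated by truncating what each connection delivers.

```typescript
// Hypothetical chunk shape for illustration; real UI message chunks are richer.
type Chunk = { type: 'text'; text: string } | { type: 'finish' };

// Simulates one connection: serves chunks starting at `startIndex` and,
// when `dropAfter` is set, cuts the connection after that many chunks.
function connect(stream: Chunk[], startIndex: number, dropAfter?: number): Chunk[] {
  const remaining = stream.slice(startIndex);
  return dropAfter === undefined ? remaining : remaining.slice(0, dropAfter);
}

// Keeps reconnecting until a "finish" chunk arrives. `drops[n]` is the
// number of chunks connection n delivers before it is interrupted.
function readWithResume(
  stream: Chunk[],
  drops: number[]
): { received: Chunk[]; connections: number } {
  const received: Chunk[] = [];
  let connections = 0;
  const hasFinish = () => received.some((chunk) => chunk.type === 'finish');

  while (!hasFinish()) {
    const dropAfter = connections < drops.length ? drops[connections] : undefined;
    // The number of chunks already held doubles as the next start index,
    // so nothing is delivered twice.
    const delivered = connect(stream, received.length, dropAfter);
    connections++;
    for (const chunk of delivered) received.push(chunk);
    // An uninterrupted connection that still lacks "finish" means the
    // stream is exhausted; stop rather than loop forever.
    if (dropAfter === undefined && !hasFinish()) break;
  }
  return { received, connections };
}
```

For a four-chunk stream with drops of [1, 2], the reader needs three connections and ends up with each chunk exactly once, which is the property the startIndex parameter is meant to provide.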
Handling Page Refreshes
The resume option tells useChat to attempt reconnection on mount. Combined with localStorage persistence, this enables seamless recovery:
const activeRunId = useMemo(() => {
  if (typeof window === 'undefined') return;
  return localStorage.getItem('active-workflow-run-id') ?? undefined;
}, []);

const { messages, sendMessage } = useChat({
  resume: Boolean(activeRunId), // Attempt to resume if a run ID exists
  transport: new WorkflowChatTransport({ /* ... */ }),
});

When the user refreshes the page:
- The component checks localStorage for an active run ID
- If found, resume: true triggers a reconnection attempt
- The transport fetches the stream from the last known position
- Messages continue appearing where they left off
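Because the same localStorage key is read and written in several callbacks, it can help to centralize that access. The following is a minimal sketch under stated assumptions: createRunIdStore and KeyValueStore are hypothetical names, not part of Workflow DevKit or the AI SDK, and the storage object is injected so the helper also works where window is unavailable (for example during server rendering).

```typescript
// Minimal subset of the Web Storage API. In the browser,
// window.localStorage satisfies this interface.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// Same key the examples in this guide use.
const RUN_ID_KEY = 'active-workflow-run-id';

// Hypothetical helper: wraps run ID persistence behind three functions.
// Passing `undefined` (no storage available) turns every call into a no-op.
function createRunIdStore(storage: KeyValueStore | undefined) {
  return {
    get: (): string | undefined => storage?.getItem(RUN_ID_KEY) ?? undefined,
    set: (runId: string): void => {
      storage?.setItem(RUN_ID_KEY, runId);
    },
    clear: (): void => {
      storage?.removeItem(RUN_ID_KEY);
    },
  };
}
```

In a component, the store could be created with typeof window === 'undefined' ? undefined : window.localStorage, then used from onChatSendMessage (set), onChatEnd (clear), and prepareReconnectToStreamRequest (get).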
Error Handling
Configure retry behavior with maxConsecutiveErrors:
new WorkflowChatTransport({
  maxConsecutiveErrors: 5, // Give up after 5 consecutive failures
  // ...
})

The transport automatically retries reconnection on network errors. After reaching the limit, it stops attempting and surfaces the error.
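To make the "consecutive" part concrete, here is a sketch of how such a limit can be tracked. It is illustrative only: ConsecutiveErrorTracker is a hypothetical class, and the assumption that a successful reconnection resets the count is an inference from the option's name, not something confirmed by the transport's source.

```typescript
// Hypothetical illustration of a consecutive-error limit; the
// transport's real bookkeeping may differ.
class ConsecutiveErrorTracker {
  private count = 0;
  private readonly limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Assumed behavior: a successful reconnection resets the streak.
  recordSuccess(): void {
    this.count = 0;
  }

  // Records a failure and returns true while another retry is allowed.
  recordError(): boolean {
    this.count++;
    return this.count < this.limit;
  }
}
```

With a limit of 5, the fifth failure in a row returns false and the caller would stop retrying; four failures followed by a success start the count over.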
Related Documentation
- WorkflowChatTransport API Reference - Full configuration options
- Streaming - Understanding workflow streams
- getRun() API Reference - Retrieving existing runs