AI Agents

Resumable Streams

When building chat interfaces, network interruptions or page refreshes can break the connection to an in-progress agent. Workflow DevKit provides WorkflowChatTransport, a drop-in transport for the AI SDK that enables automatic stream reconnection.

The Problem

A typical chat application loses the in-progress response when:

  • The user refreshes the page during a response
  • Network connectivity drops temporarily
  • Serverless function timeouts occur mid-stream

With a standard chat implementation, users must resend their message and wait for the entire response again.

The Solution

WorkflowChatTransport solves this by:

  1. Tracking the workflow run ID returned from your API
  2. Automatically reconnecting to the stream from the last received chunk
  3. Storing the run ID for session resumption across page loads

Implementation

Step 1: Return the Run ID from Your API

Modify your chat endpoint to include the workflow run ID in a response header:

app/api/chat/route.ts
import { createUIMessageStreamResponse, convertToModelMessages } from 'ai';
import { start } from 'workflow/api';
import { chatWorkflow } from './workflow';

export async function POST(req: Request) {
  const { messages, modelId } = await req.json();
  const modelMessages = convertToModelMessages(messages);

  const run = await start(chatWorkflow, [{ messages: modelMessages, modelId }]);

  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      'x-workflow-run-id': run.runId, 
    },
  });
}
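
The workflow keeps running on the server even if this response is interrupted, so the run ID is all a client needs to reattach later. As a quick sanity check, you can read the header directly with fetch (illustrative only; the transport in Step 3 handles this for you, and messages and modelId are assumed to be in scope):

const res = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages, modelId }),
});

// The run ID travels alongside the stream in a response header
console.log(res.headers.get('x-workflow-run-id'));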

Step 2: Add a Stream Reconnection Endpoint

Create an endpoint that returns the stream for an existing run:

app/api/chat/[id]/stream/route.ts
import { createUIMessageStreamResponse } from 'ai';
import { getRun } from 'workflow/api'; 

export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get('startIndex'); 
  const startIndex = startIndexParam
    ? parseInt(startIndexParam, 10)
    : undefined;

  const run = getRun(id); 
  const stream = run.getReadable({ startIndex }); 

  return createUIMessageStreamResponse({ stream });
}

The startIndex parameter ensures the client only receives chunks it missed, avoiding duplicate data.
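
Under the hood, a reconnection attempt is just a GET to this route carrying the index of the last chunk the client received. A rough equivalent of the request WorkflowChatTransport issues (runId and lastChunkIndex are placeholders here):

const res = await fetch(
  `/api/chat/${encodeURIComponent(runId)}/stream?startIndex=${lastChunkIndex}`,
);
// The response streams only the chunks the client has not yet received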

Step 3: Use WorkflowChatTransport in the Client

Replace the default transport in useChat with WorkflowChatTransport:

app/chat.tsx
'use client';

import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai'; 
import { useMemo, useState } from 'react';

export function Chat() {
  const [input, setInput] = useState('');

  // Check for an active workflow run on mount
  const activeRunId = useMemo(() => { 
    if (typeof window === 'undefined') return; 
    return localStorage.getItem('active-workflow-run-id') ?? undefined; 
  }, []); 

  const { messages, sendMessage, status } = useChat({
    resume: Boolean(activeRunId), 
    transport: new WorkflowChatTransport({ 
      api: '/api/chat',

      // Store the run ID each time a new run starts
      onChatSendMessage: (response, options) => { 
        const workflowRunId = response.headers.get('x-workflow-run-id'); 
        if (workflowRunId) { 
          localStorage.setItem('active-workflow-run-id', workflowRunId); 
        } 
      }, 

      // Clear the run ID when the chat completes
      onChatEnd: () => { 
        localStorage.removeItem('active-workflow-run-id'); 
      }, 

      // Use the stored run ID for reconnection
      prepareReconnectToStreamRequest: ({ api, ...rest }) => { 
        const runId = localStorage.getItem('active-workflow-run-id'); 
        if (!runId) throw new Error('No active workflow run ID found'); 
        return { 
          ...rest, 
          api: `/api/chat/${encodeURIComponent(runId)}/stream`, 
        }; 
      }, 

      maxConsecutiveErrors: 5,
    }), 
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong>{' '}
          {m.parts.map((part, i) =>
            part.type === 'text' ? <span key={i}>{part.text}</span> : null,
          )}
        </div>
      ))}
      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput('');
        }}
      >
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Type a message..."
        />
      </form>
    </div>
  );
}

How It Works

  1. When the user sends a message, WorkflowChatTransport makes a POST to /api/chat
  2. The API starts a workflow and returns the run ID in the x-workflow-run-id header
  3. onChatSendMessage stores this run ID in localStorage
  4. If the stream is interrupted before receiving a "finish" chunk, the transport automatically reconnects
  5. prepareReconnectToStreamRequest builds the reconnection URL using the stored run ID
  6. The reconnection endpoint returns the stream from where the client left off
  7. When the stream completes, onChatEnd clears the stored run ID

Handling Page Refreshes

The resume option tells useChat to attempt reconnection on mount. Combined with localStorage persistence, this enables seamless recovery:

const activeRunId = useMemo(() => {
  if (typeof window === 'undefined') return;
  return localStorage.getItem('active-workflow-run-id') ?? undefined;
}, []);

const { messages, sendMessage } = useChat({
  resume: Boolean(activeRunId), // Attempt to resume if a run ID exists
  transport: new WorkflowChatTransport({ /* ... */ }),
});

When the user refreshes the page:

  1. The component checks localStorage for an active run ID
  2. If found, resume: true triggers a reconnection attempt
  3. The transport fetches the stream from the last known position
  4. Messages continue appearing where they left off

Error Handling

Configure retry behavior with maxConsecutiveErrors:

new WorkflowChatTransport({
  maxConsecutiveErrors: 5, // Give up after 5 consecutive failures
  // ...
})

The transport automatically retries reconnection after network errors. Once it reaches the limit, it stops retrying and surfaces the error.
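
How you surface that error in the UI is up to you. One option, sketched here under the assumption that you want to discard the stored run ID once the transport gives up and render the error object that useChat returns, is to pair it with useChat's onError callback:

const { messages, sendMessage, error } = useChat({
  transport: new WorkflowChatTransport({
    maxConsecutiveErrors: 5,
    // ...
  }),
  onError: () => {
    // Drop the stale run ID so the next mount doesn't try to resume a failed run
    localStorage.removeItem('active-workflow-run-id');
  },
});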