world.streams

Read, write, and manage real-time data streams for workflow runs.

world.streams, which implements the Streamer interface, provides low-level access to workflow data streams. Use it to write chunks, read streams, and manage the stream lifecycle outside of the standard getWritable() pattern.

For most streaming use cases, use getWritable() inside steps. The world.streams interface is for advanced scenarios like building custom stream consumers or managing streams from outside a workflow.

Import

import { getWorld } from "workflow/runtime";

const world = getWorld();
const streams = world.streams; 

Methods

writeToStream()

Write data chunks to a named stream for a workflow run.

await world.streams.writeToStream("default", runId, chunk); 

Parameters:

Parameter  Type                 Description
name       string               The stream name
runId      string               The workflow run ID
chunk      string | Uint8Array  Data to write to the stream
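
Because writeToStream() accepts string chunks, one simple way to send structured data is to serialize each record as a line of JSON. The sketch below is only an illustration under stated assumptions: writeJsonLines and the ChunkWriter type are hypothetical names, and newline-delimited JSON is a framing chosen for this example, not something the API requires.

```typescript
// Hypothetical structural type mirroring the writeToStream() signature above.
type ChunkWriter = {
  writeToStream(
    name: string,
    runId: string,
    chunk: string | Uint8Array,
  ): Promise<void>;
};

// Hypothetical helper: writes each record as one newline-terminated JSON chunk
// and returns the number of chunks written.
async function writeJsonLines(
  streams: ChunkWriter,
  name: string,
  runId: string,
  records: unknown[],
): Promise<number> {
  let written = 0;
  for (const record of records) {
    // Await each write before issuing the next so chunks are submitted in order.
    await streams.writeToStream(name, runId, JSON.stringify(record) + "\n");
    written += 1;
  }
  return written;
}
```

With a real world object this could be called as writeJsonLines(getWorld().streams, "default", runId, records). Binary payloads can be passed to writeToStream() directly as a Uint8Array instead.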

readFromStream()

Read data from a named stream as a ReadableStream.

const readable = await world.streams.readFromStream("default"); 

Parameters:

Parameter   Type    Description
name        string  The stream name
startIndex  number  Optional starting index for partial reads

Returns: ReadableStream<Uint8Array>
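
Since the result is a standard ReadableStream<Uint8Array>, it can be consumed with the usual web streams reader. The following is a minimal sketch that collects a stream into a single string, assuming the chunks are UTF-8 text; readStreamAsText is a hypothetical helper name, not part of the API.

```typescript
// Hypothetical helper: drains a byte stream and decodes it as UTF-8 text.
async function readStreamAsText(
  readable: ReadableStream<Uint8Array>,
): Promise<string> {
  const reader = readable.getReader();
  const decoder = new TextDecoder();
  let text = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters intact across chunk boundaries.
      text += decoder.decode(value, { stream: true });
    }
    // Flush any bytes the decoder is still buffering.
    text += decoder.decode();
  } finally {
    reader.releaseLock();
  }
  return text;
}
```

A call might look like readStreamAsText(await world.streams.readFromStream("default")). The helper resolves only once the stream ends, so it is a poor fit for streams that are still being written; for those, process each chunk inside the loop instead.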

closeStream()

Close a stream when done writing.

await world.streams.closeStream("default", runId); 

Parameters:

Parameter  Type    Description
name       string  The stream name
runId      string  The workflow run ID
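
A common pattern is to pair writes with a guaranteed close so that the stream is not left open if a write fails. The sketch below shows one way to do that with try/finally; writeAndClose and the StreamWriter type are hypothetical names, and closing on failure is a design choice of this example rather than documented behavior.

```typescript
// Hypothetical structural type mirroring the documented method signatures.
type StreamWriter = {
  writeToStream(
    name: string,
    runId: string,
    chunk: string | Uint8Array,
  ): Promise<void>;
  closeStream(name: string, runId: string): Promise<void>;
};

// Hypothetical helper: writes every chunk in order, then closes the stream.
// The stream is closed even if one of the writes throws.
async function writeAndClose(
  streams: StreamWriter,
  name: string,
  runId: string,
  chunks: Iterable<string | Uint8Array>,
): Promise<void> {
  try {
    for (const chunk of chunks) {
      await streams.writeToStream(name, runId, chunk);
    }
  } finally {
    await streams.closeStream(name, runId);
  }
}
```

For example, writeAndClose(getWorld().streams, "default", runId, ["hello", "world"]) writes two chunks and then closes the stream. Any write error still propagates to the caller after the close has been attempted.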

listStreamsByRunId()

List all stream names associated with a workflow run.

const streamNames = await world.streams.listStreamsByRunId(runId); 

Parameters:

Parameter  Type    Description
runId      string  The workflow run ID

Returns: string[] — Array of stream names
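
The returned list can be used to check that a stream exists before trying to read it. The sketch below is a hypothetical helper (findMissingStreams, with a local StreamLister type) and assumes that the names returned by listStreamsByRunId() are the same names that were used when writing, which this page does not confirm.

```typescript
// Hypothetical structural type mirroring the listStreamsByRunId() signature.
type StreamLister = {
  listStreamsByRunId(runId: string): Promise<string[]>;
};

// Hypothetical helper: returns the expected stream names that the run does
// not have, preserving the order of the expected list.
async function findMissingStreams(
  streams: StreamLister,
  runId: string,
  expected: string[],
): Promise<string[]> {
  const existing = new Set(await streams.listStreamsByRunId(runId));
  return expected.filter((name) => !existing.has(name));
}
```

An API route could call findMissingStreams(getWorld().streams, runId, ["default"]) and respond with a 404 when the result is non-empty, rather than attempting to read a stream that was never created.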

Examples

List All Streams for a Workflow Run

// app/api/workflow-streams/route.ts
import { getWorld } from "workflow/runtime";

export async function GET(req: Request) {
  const url = new URL(req.url);
  const runId = url.searchParams.get("runId");

  if (!runId) {
    return Response.json({ error: "runId required" }, { status: 400 });
  }

  const world = getWorld();
  const streamNames = await world.streams.listStreamsByRunId(runId); 

  return Response.json({ streams: streamNames });
}

Read a Stream as a Response

// app/api/workflow-streams/read/route.ts
import { getWorld } from "workflow/runtime";

export async function GET(req: Request) {
  const url = new URL(req.url);
  const streamName = url.searchParams.get("name");

  if (!streamName) {
    return Response.json({ error: "name required" }, { status: 400 });
  }

  const world = getWorld();
  const readable = await world.streams.readFromStream(streamName); 

  return new Response(readable, {
    headers: { "Content-Type": "application/octet-stream" },
  });
}

Related

• Streaming: Core concepts for streaming data from workflows
• getWritable(): The standard way to write to streams from within steps
• world.runs: List runs that may have associated streams