This example demonstrates how to create event-driven workflows for complex multi-step AI applications.
Overview
Workflows allow you to:
- Create event-driven architectures
- Manage state across multiple steps
- Build iterative refinement loops
- Compose complex AI pipelines
This example builds a joke refinement workflow that iteratively improves a joke based on LLM critique.
Complete Example
import { openai } from "@llamaindex/openai";
import {
  createStatefulMiddleware,
  createWorkflow,
  workflowEvent,
} from "@llamaindex/workflow";

// Create LLM instance
const llm = openai({ model: "gpt-4.1-mini" });

// Define workflow events
const startEvent = workflowEvent<string>(); // Input topic
const jokeEvent = workflowEvent<{ joke: string }>(); // Generated joke
const critiqueEvent = workflowEvent<{ joke: string; critique: string }>(); // Critique
const resultEvent = workflowEvent<{ joke: string; critique: string }>(); // Final result

// Create stateful workflow
const { withState } = createStatefulMiddleware(() => ({
  numIterations: 0,
  maxIterations: 3,
}));
const jokeFlow = withState(createWorkflow());
// Handler 1: Generate initial joke
jokeFlow.handle([startEvent], async (context, event) => {
  const prompt = `Write your best joke about ${event.data}. Write the joke between <joke> and </joke> tags.`;
  const response = await llm.complete({ prompt });
  const joke =
    response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ??
    response.text;
  return jokeEvent.with({ joke });
});
// Handler 2: Critique the joke
jokeFlow.handle([jokeEvent], async (context, event) => {
  const prompt = `Give a thorough critique of the following joke. If the joke needs improvement, put "IMPROVE" somewhere in the critique: ${event.data.joke}`;
  const response = await llm.complete({ prompt });
  // Check if improvement needed
  if (response.text.includes("IMPROVE")) {
    return critiqueEvent.with({
      joke: event.data.joke,
      critique: response.text,
    });
  }
  return resultEvent.with({ joke: event.data.joke, critique: response.text });
});
// Handler 3: Improve the joke based on critique
jokeFlow.handle([critiqueEvent], async (context, event) => {
  const state = context.state;
  state.numIterations++;
  const prompt = `Write a new joke based on the following critique and the original joke. Write the joke between <joke> and </joke> tags.\n\nJoke: ${event.data.joke}\n\nCritique: ${event.data.critique}`;
  const response = await llm.complete({ prompt });
  const joke =
    response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ??
    response.text;
  // Check iteration limit
  if (state.numIterations < state.maxIterations) {
    return jokeEvent.with({ joke });
  }
  return resultEvent.with({ joke, critique: event.data.critique });
});
// Usage
async function main() {
  const { stream, sendEvent } = jokeFlow.createContext();
  sendEvent(startEvent.with("pirates"));
  let result: { joke: string; critique: string } | undefined;
  for await (const event of stream) {
    if (resultEvent.include(event)) {
      result = event.data;
      break; // Stop when we get the final result
    }
  }
  console.log(result);
}
main().catch(console.error);
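Handlers 1 and 3 both pull the joke out of the model response with the same regular expression. A small helper could remove that duplication. The sketch below uses a hypothetical name, `extractJoke`, that is not part of `@llamaindex/workflow`, and it keeps the behavior of the original expression, including the fallback to the raw response text when no tags are found.

```typescript
// Hypothetical helper (not part of @llamaindex/workflow): returns the trimmed
// text between <joke> and </joke>, or the raw response if no tags are present.
function extractJoke(responseText: string): string {
  const match = responseText.match(/<joke>([\s\S]*?)<\/joke>/);
  return match?.[1]?.trim() ?? responseText;
}

console.log(extractJoke("Sure! <joke> Why did the pirate go to school? </joke>"));
console.log(extractJoke("no tags in this response"));
```

With such a helper, each handler would call `extractJoke(response.text)` instead of repeating the pattern.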
Step-by-Step Explanation
1. Define Events
Events represent data flowing through your workflow:
import { workflowEvent } from "@llamaindex/workflow";
// Define typed events
const startEvent = workflowEvent<string>();
const jokeEvent = workflowEvent<{ joke: string }>();
const critiqueEvent = workflowEvent<{ joke: string; critique: string }>();
const resultEvent = workflowEvent<{ joke: string; critique: string }>();
Each event carries typed data through the workflow.
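To make the idea of a typed event concrete, here is a simplified, library-independent model. The names `defineEvent`, `EventInstance`, and `EventFactory` are hypothetical, and this is not how `@llamaindex/workflow` implements `workflowEvent`. It only mirrors the two operations used on this page: `with` to create an event carrying data, and `include` to check whether an event came from a given factory.

```typescript
// Simplified model of a typed event factory. NOT the library's implementation.
interface EventInstance<T> {
  readonly type: symbol; // identifies which factory created the event
  readonly data: T;
}

interface EventFactory<T> {
  with(data: T): EventInstance<T>;
  include(event: EventInstance<unknown>): event is EventInstance<T>;
}

function defineEvent<T>(): EventFactory<T> {
  const type = Symbol("event"); // unique per factory
  return {
    with: (data) => ({ type, data }),
    include: (event): event is EventInstance<T> => event.type === type,
  };
}

const topicEvent = defineEvent<string>();
const punchlineEvent = defineEvent<{ joke: string }>();

const incoming: EventInstance<unknown> = punchlineEvent.with({ joke: "Arr!" });
if (punchlineEvent.include(incoming)) {
  // The type guard narrows `incoming.data` to { joke: string } here.
  console.log(incoming.data.joke);
}
console.log(topicEvent.include(incoming)); // false: created by a different factory
```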
2. Create Stateful Workflow
Add state management to track workflow progress:
import { createStatefulMiddleware, createWorkflow } from "@llamaindex/workflow";
const { withState } = createStatefulMiddleware(() => ({
  numIterations: 0,
  maxIterations: 3,
}));
const jokeFlow = withState(createWorkflow());
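The initial state is passed as a function rather than as a plain object. One plausible reason, which is an assumption here and not something this page states, is that a factory can hand out a fresh object each time it is called, so separate runs do not share counters. The plain TypeScript sketch below shows that property with a hypothetical `createInitialState`. It does not call the library.

```typescript
// Hypothetical state shape matching the example on this page.
interface JokeState {
  numIterations: number;
  maxIterations: number;
}

// A factory returns a new object on every call.
const createInitialState = (): JokeState => ({
  numIterations: 0,
  maxIterations: 3,
});

const runA = createInitialState();
const runB = createInitialState();
runA.numIterations++;

console.log(runA.numIterations, runB.numIterations); // 1 0
```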
3. Define Event Handlers
Handlers process events and emit new events:
// Handler: startEvent -> jokeEvent
jokeFlow.handle([startEvent], async (context, event) => {
  // Process input
  const topic = event.data;
  // Generate output (generateJoke stands in for the LLM call in the complete example)
  const joke = await generateJoke(topic);
  // Emit next event
  return jokeEvent.with({ joke });
});
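As a mental model for how returned events chain from one handler to the next, consider the toy dispatcher below. It is synchronous, uses string event types, and is not how `@llamaindex/workflow` works internally. `ToyEvent`, `ToyHandler`, and `runToCompletion` are hypothetical names introduced only for this illustration.

```typescript
// Toy model of event-driven dispatch. NOT the library's implementation.
interface ToyEvent {
  type: string;
  data: string;
}

// A handler receives one event and may return the next one.
type ToyHandler = (event: ToyEvent) => ToyEvent | undefined;

function runToCompletion(
  handlers: Map<string, ToyHandler>,
  start: ToyEvent,
  maxSteps = 100, // guard against handlers that loop forever
): ToyEvent[] {
  const seen: ToyEvent[] = [];
  let current: ToyEvent | undefined = start;
  while (current !== undefined && seen.length < maxSteps) {
    seen.push(current);
    const handler = handlers.get(current.type);
    // No handler registered for this type means the chain ends here.
    current = handler ? handler(current) : undefined;
  }
  return seen;
}

const toyHandlers = new Map<string, ToyHandler>([
  ["start", (e) => ({ type: "joke", data: `A joke about ${e.data}` })],
  ["joke", (e) => ({ type: "result", data: e.data.toUpperCase() })],
]);

const trace = runToCompletion(toyHandlers, { type: "start", data: "pirates" });
console.log(trace.map((e) => e.type).join(" -> ")); // start -> joke -> result
```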
4. Conditional Routing
Route to different events based on conditions:
jokeFlow.handle([jokeEvent], async (context, event) => {
  // critiqueJoke stands in for the LLM critique call in the complete example
  const critique = await critiqueJoke(event.data.joke);
  if (critique.needsImprovement) {
    // Continue refining
    return critiqueEvent.with({
      joke: event.data.joke,
      critique: critique.text,
    });
  }
  // Done, return result
  return resultEvent.with({
    joke: event.data.joke,
    critique: critique.text,
  });
});
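The routing decision in the complete example depends on whether the critique text contains the marker `IMPROVE`. A sketch of that check as a standalone function follows. `parseCritique` and `CritiqueResult` are hypothetical names, and the LLM call itself is left out so the snippet runs on its own.

```typescript
// Hypothetical result shape matching the `critique.text` and
// `critique.needsImprovement` fields used in the snippet above.
interface CritiqueResult {
  text: string;
  needsImprovement: boolean;
}

// Mirrors the check in the complete example: a case-sensitive substring test.
function parseCritique(responseText: string): CritiqueResult {
  return {
    text: responseText,
    needsImprovement: responseText.includes("IMPROVE"),
  };
}

console.log(parseCritique("The punchline is weak. IMPROVE").needsImprovement); // true
console.log(parseCritique("Hard to improve on this one.").needsImprovement); // false
```

Because this is a plain substring test, a critique containing a longer uppercase word such as `IMPROVEMENT` would also route to another refinement round. A stricter marker may be preferable if that matters for your prompts.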
5. Access State
Read and update workflow state:
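jokeFlow.handle([critiqueEvent], async (context, event) => {
  // Access state
  const state = context.state;
  state.numIterations++;
  // improveJoke stands in for the LLM call in the complete example
  const improvedJoke = await improveJoke(event.data.joke, event.data.critique);
  // Use state in logic
  if (state.numIterations < state.maxIterations) {
    return jokeEvent.with({ joke: improvedJoke });
  }
  return resultEvent.with({
    joke: improvedJoke,
    critique: event.data.critique,
  });
});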
6. Execute Workflow
const { stream, sendEvent } = jokeFlow.createContext();
// Start the workflow
sendEvent(startEvent.with("pirates"));
// Process events
for await (const event of stream) {
  if (resultEvent.include(event)) {
    console.log("Final result:", event.data);
    break;
  }
}
Workflow Patterns
Linear Pipeline
Sequential processing steps:
const step1Event = workflowEvent<Data1>();
const step2Event = workflowEvent<Data2>();
const step3Event = workflowEvent<Data3>();
workflow.handle([step1Event], async (ctx, event) => {
  const result = await processStep1(event.data);
  return step2Event.with(result);
});
workflow.handle([step2Event], async (ctx, event) => {
  const result = await processStep2(event.data);
  return step3Event.with(result);
});
Parallel Execution
Process multiple paths simultaneously:
const inputEvent = workflowEvent<Input>();
const pathAEvent = workflowEvent<ResultA>();
const pathBEvent = workflowEvent<ResultB>();
const mergeEvent = workflowEvent<Combined>();
workflow.handle([inputEvent], async (ctx, event) => {
  // Trigger both paths
  ctx.sendEvent(pathAEvent.with(event.data));
  ctx.sendEvent(pathBEvent.with(event.data));
});
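The snippet above declares `mergeEvent` but does not show how the two paths are joined. One library-independent way to think about the join is a small buffer that remembers whichever result arrives first and produces the combined value once both are present. `createJoin` and `Joined` below are hypothetical names. How such a buffer is wired into workflow handlers depends on the library's API and is not shown here.

```typescript
// Hypothetical fan-in helper in plain TypeScript. NOT a library API.
interface Joined<A, B> {
  a: A;
  b: B;
}

function createJoin<A, B>() {
  // Wrapping in an object lets a legitimately `undefined` result count as arrived.
  let a: { value: A } | undefined;
  let b: { value: B } | undefined;

  const tryMerge = (): Joined<A, B> | undefined =>
    a && b ? { a: a.value, b: b.value } : undefined;

  return {
    // Each push stores one side and returns the merged value once both exist.
    pushA(value: A) {
      a = { value };
      return tryMerge();
    },
    pushB(value: B) {
      b = { value };
      return tryMerge();
    },
  };
}

const join = createJoin<string, number>();
console.log(join.pushB(42)); // still waiting for path A
console.log(join.pushA("joke")); // both sides present, combined value returned
```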
Iterative Refinement
Loop until condition met:
workflow.handle([processEvent], async (ctx, event) => {
  const result = await process(event.data);
  if (isGoodEnough(result)) {
    return doneEvent.with(result);
  }
  // Refine and loop back
  return processEvent.with(refine(result));
});
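As written, this pattern loops for as long as `isGoodEnough` keeps returning false. The joke example avoids that with `maxIterations`. The same guard is sketched below as a plain function, outside any workflow. `refineUntil` is a hypothetical name, and `isGoodEnough` and `refine` are supplied by the caller.

```typescript
// Bounded refinement loop in plain TypeScript (hypothetical helper).
function refineUntil<T>(
  initial: T,
  isGoodEnough: (value: T) => boolean,
  refine: (value: T) => T,
  maxIterations: number,
): { value: T; iterations: number; converged: boolean } {
  let value = initial;
  let iterations = 0;
  // Stop on success or when the iteration budget is spent, whichever is first.
  while (!isGoodEnough(value) && iterations < maxIterations) {
    value = refine(value);
    iterations++;
  }
  return { value, iterations, converged: isGoodEnough(value) };
}

// Usage: halve a number until it drops below 1, giving up after 10 rounds.
const outcome = refineUntil(40, (n) => n < 1, (n) => n / 2, 10);
console.log(outcome.iterations, outcome.value); // 6 0.625
```

Returning `converged` alongside the value lets the caller distinguish a good result from one that merely ran out of attempts.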
Error Handling
Handle errors gracefully:
const errorEvent = workflowEvent<Error>();
workflow.handle([processEvent], async (ctx, event) => {
  try {
    const result = await riskyOperation(event.data);
    return successEvent.with(result);
  } catch (error) {
    // A caught value is typed `unknown`, so wrap non-Error values first
    return errorEvent.with(
      error instanceof Error ? error : new Error(String(error)),
    );
  }
});
workflow.handle([errorEvent], async (ctx, event) => {
  console.error("Workflow error:", event.data);
  // Recovery logic
});
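In TypeScript, the value bound in a `catch` clause is typed `unknown` under strict settings, and JavaScript allows throwing any value, not only `Error` objects. Before putting a caught value into an event typed as `Error`, it can help to normalize it. The helper below is a minimal sketch with a hypothetical name, `toError`.

```typescript
// Hypothetical helper: converts any thrown value into an Error instance.
function toError(value: unknown): Error {
  if (value instanceof Error) {
    return value; // keep the original error, including its stack and subclass
  }
  return new Error(String(value));
}

try {
  throw "disk full"; // throwing a string is legal JavaScript
} catch (caught) {
  console.error(toError(caught).message); // disk full
}
```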
Running the Example
- Install dependencies:
npm install @llamaindex/openai @llamaindex/workflow
- Set your API key:
export OPENAI_API_KEY="sk-..."
- Run the workflow:
Expected Output
The workflow will:
- Generate an initial joke about pirates
- Critique the joke
- If needed, improve the joke up to 3 times
- Return the final joke and critique
{
  "joke": "Why couldn't the pirate play cards? Because he was standing on the deck!",
  "critique": "This is a clever play on words. Well done!"
}
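To see what "up to 3 times" means in terms of LLM calls, the control flow of the three handlers can be traced without the library. The sketch below assumes the worst case, where every critique contains `IMPROVE`. `traceWorstCase` is a hypothetical name used only for this illustration.

```typescript
// Library-free trace of the handlers in the complete example, assuming every
// critique asks for improvement (the worst case).
function traceWorstCase(maxIterations: number): string[] {
  const calls: string[] = ["generate"]; // Handler 1 runs once
  let numIterations = 0;
  let done = false;
  while (!done) {
    calls.push("critique"); // Handler 2, which requests improvement here
    numIterations++; // Handler 3 increments the counter first
    calls.push("improve"); // Handler 3 then asks the LLM for a new joke
    done = numIterations >= maxIterations; // at the limit, resultEvent is emitted
  }
  return calls;
}

const calls = traceWorstCase(3);
console.log(calls.length); // 7
console.log(calls.join(" -> "));
```

With `maxIterations: 3` the worst case is seven LLM calls: one generation, three critiques, and three improvements. The last improved joke is returned without being critiqued again, so in that case the `critique` field of the result describes the previous version of the joke.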
Advanced Features
Timeout Handling
const timeoutEvent = workflowEvent<void>();
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with(input));
const timeout = setTimeout(() => {
  sendEvent(timeoutEvent.with(undefined));
}, 30000); // 30 second timeout
for await (const event of stream) {
  if (resultEvent.include(event)) {
    clearTimeout(timeout);
    break;
  }
  if (timeoutEvent.include(event)) {
    console.error("Workflow timeout");
    break;
  }
}
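The approach above signals the timeout through the event stream. A different, library-independent technique is to race a promise against a timer with `Promise.race`. The sketch below uses a hypothetical helper, `withTimeout`. Note that it only stops waiting: it does not cancel the underlying work.

```typescript
// Hypothetical helper: rejects if `work` does not settle within `ms` milliseconds.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([work, timeout]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}

// Usage: the work finishes in 10 ms, well within the 1000 ms budget.
const quickWork = new Promise<string>((resolve) =>
  setTimeout(() => resolve("done"), 10),
);
withTimeout(quickWork, 1000).then((value) => console.log(value)); // done
```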
Workflow Composition
Combine multiple workflows:
const subWorkflow = createWorkflow();
// Define sub-workflow handlers...
const mainWorkflow = createWorkflow();
mainWorkflow.handle([triggerEvent], async (ctx, event) => {
  const { stream, sendEvent } = subWorkflow.createContext();
  sendEvent(subStartEvent.with(event.data));
  for await (const subEvent of stream) {
    if (subResultEvent.include(subEvent)) {
      return nextEvent.with(subEvent.data);
    }
  }
});
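In the handler above, if the sub-workflow stream ends without producing a result event, the loop finishes and the handler returns nothing. The "wait for the first matching item" logic can be made explicit, with a clear failure mode, using a helper over any `AsyncIterable`. `firstMatching` is a hypothetical name, and the sketch uses a plain async generator in place of a workflow stream.

```typescript
// Hypothetical helper: resolves with the first item accepted by `isMatch`,
// or rejects if the stream ends first.
async function firstMatching<T, U extends T>(
  stream: AsyncIterable<T>,
  isMatch: (item: T) => item is U,
): Promise<U> {
  for await (const item of stream) {
    if (isMatch(item)) {
      return item; // returning early also stops consuming the stream
    }
  }
  throw new Error("Stream ended before a matching item arrived");
}

// Stand-in for an event stream: two progress items followed by a result.
async function* demoStream(): AsyncGenerator<string | number> {
  yield "started";
  yield "working";
  yield 42;
}

const isNumber = (item: string | number): item is number =>
  typeof item === "number";

firstMatching(demoStream(), isNumber).then((n) => console.log(n)); // 42
```

In a composed workflow, the predicate would wrap a check such as `subResultEvent.include`, so a sub-workflow that ends early surfaces as an error instead of a silent `undefined`.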
Next Steps
- Advanced Workflows: Explore the workflows-ts repository
- Agents: Combine workflows with agents
- RAG Pipeline: Build RAG ingestion workflows
- Event Streaming: Stream workflow events to clients