Skip to main content

Overview

Workflows provide a powerful, event-driven framework for building agentic applications. They replace the deprecated agent implementations with a more flexible and composable approach.

Installation

npm install @llamaindex/workflow

Basic Workflow

import { Workflow, StartEvent, StopEvent, Context } from "@llamaindex/workflow";

// Minimal workflow: logs the start payload, transforms it, and stops.
class MyWorkflow extends Workflow {
  async run(ctx: Context, ev: StartEvent) {
    console.log("Workflow started with:", ev.data);

    // Transform the incoming payload before finishing.
    const processed = await this.transform(ev.data);

    return new StopEvent({ result: processed });
  }

  // Internal transformation step (placeholder logic for the example).
  private async transform(input: any) {
    return `Processed: ${input}`;
  }
}

const workflow = new MyWorkflow();
const result = await workflow.run({ data: "Hello" });
console.log(result.result);

Events

Workflows are event-driven:

StartEvent

Initiates the workflow:
// Event that kicks off a workflow run; carries an optional initial payload.
class StartEvent extends Event {
  public data?: any;

  constructor(data?: any) {
    super();
    this.data = data;
  }
}

StopEvent

Terminates the workflow:
// Event that ends a workflow run; carries the optional final result.
class StopEvent extends Event {
  public result?: any;

  constructor(result?: any) {
    super();
    this.result = result;
  }
}

Custom Events

Define custom events for workflow steps:
import { zodEvent } from "@llamaindex/workflow";
import { z } from "zod";

// Schema for the event's nested metadata field.
const metadataSchema = z.object({
  timestamp: z.string()
});

// Custom event whose payload shape is described with Zod.
const DataProcessedEvent = zodEvent("DataProcessed", {
  data: z.string(),
  metadata: metadataSchema
});

// Workflow that emits a custom typed event before stopping.
class MyWorkflow extends Workflow {
  async run(ctx: Context, ev: StartEvent) {
    // Build the payload for the custom event, then emit it on the context.
    const payload = {
      data: "processed data",
      metadata: { timestamp: new Date().toISOString() }
    };
    ctx.emit(new DataProcessedEvent(payload));

    return new StopEvent({ result: "done" });
  }
}

Context

Manage workflow state with Context:
// Demonstrates per-run state kept on the Context via set/get.
class MyWorkflow extends Workflow {
  async run(ctx: Context, ev: StartEvent) {
    // Seed the context with the current step number and the incoming payload.
    ctx.set("step", 1);
    ctx.set("data", ev.data);

    // Read both values back out of the context.
    const currentStep = ctx.get("step");
    const payload = ctx.get("data");

    // Advance the step counter.
    ctx.set("step", currentStep + 1);

    return new StopEvent({ result: payload });
  }
}

Streaming

Stream events from workflows:
import { Workflow, StartEvent, StopEvent, Context } from "@llamaindex/workflow";

// A workflow whose run() is an async generator: each `yield` surfaces an
// intermediate progress event to the caller before the final StopEvent.
// (The original snippet imported `StreamingWorkflow`, which collided with
// the local class declaration below and left `Workflow` et al. unimported.)
class StreamingWorkflow extends Workflow {
  async *run(ctx: Context, ev: StartEvent) {
    for (let i = 0; i < 5; i++) {
      // Emit a progress event, then pause one second before the next step.
      yield { step: i, message: `Processing ${i}` };
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
    
    return new StopEvent({ result: "completed" });
  }
}

const workflow = new StreamingWorkflow();

// Consume the stream: each yielded progress event is logged as it arrives.
for await (const event of workflow.run({ data: "start" })) {
  console.log(event);
}

Agent Workflow

Build agents with workflows:
import { Workflow, StartEvent, StopEvent, Context } from "@llamaindex/workflow";
import { OpenAI } from "@llamaindex/openai";
import { tool } from "@llamaindex/core/tools";
import { z } from "zod";

// Input schema for the search tool: a single free-text query.
const searchParameters = z.object({
  query: z.string()
});

// A stub search tool: echoes the query rather than hitting a real backend.
const searchTool = tool({
  name: "search",
  description: "Search for information",
  parameters: searchParameters,
  execute: async ({ query }) => `Results for: ${query}`
});

// Chat agent implemented as a workflow: sends the user message to the LLM,
// executes any tool calls the model requests, and returns the final text
// reply while persisting the conversation history on the Context.
class AgentWorkflow extends Workflow {
  private llm = new OpenAI({ model: "gpt-4" });
  private tools = [searchTool];
  
  async run(ctx: Context, ev: StartEvent) {
    // NOTE(review): reads `ev.message`, but StartEvent as defined elsewhere
    // in this document only carries `data` — confirm the expected shape.
    const { message } = ev;
    
    // Store conversation history
    const history = ctx.get("history") || [];
    history.push({ role: "user", content: message });
    
    // Call LLM with tools
    const response = await this.llm.chat({
      messages: history,
      tools: this.tools
    });
    
    // Handle tool calls
    // presumably `options.toolCall` is an array of requested tool calls —
    // verify against the @llamaindex/openai chat response shape.
    if (response.message.options?.toolCall) {
      for (const toolCall of response.message.options.toolCall) {
        // Look up the matching tool by name and invoke it if found.
        const tool = this.tools.find(t => t.metadata.name === toolCall.name);
        if (tool && tool.call) {
          // assumes toolCall.input is a JSON-encoded string — TODO confirm
          const result = await tool.call(JSON.parse(toolCall.input));
          
          // Add tool result to history
          history.push(response.message);
          history.push({
            role: "user",
            content: String(result),
            options: { toolResult: { id: toolCall.id, result: String(result), isError: false } }
          });
        }
      }
      
      // Get final response
      // Second round-trip so the model can incorporate the tool output.
      const finalResponse = await this.llm.chat({ messages: history });
      history.push(finalResponse.message);
      
      ctx.set("history", history);
      return new StopEvent({ result: finalResponse.message.content });
    }
    
    // No tool call requested: persist history and return the reply directly.
    history.push(response.message);
    ctx.set("history", history);
    
    return new StopEvent({ result: response.message.content });
  }
}

const agent = new AgentWorkflow();
const result = await agent.run({ message: "Search for AI news" });
console.log(result.result);

Multi-Step Workflows

Chain multiple steps:
import { zodEvent } from "@llamaindex/workflow";
import { z } from "zod";

// Intermediate events, one per pipeline stage; each carries a string payload.
const Step1Event = zodEvent("Step1", {
  data: z.string()
});
const Step2Event = zodEvent("Step2", {
  data: z.string()
});

// Pipeline workflow: two sequential stages, each announced via its own event.
class MultiStepWorkflow extends Workflow {
  async run(ctx: Context, ev: StartEvent) {
    // Stage one: transform the start payload and broadcast the result.
    const firstResult = await this.step1(ev.data);
    ctx.emit(new Step1Event({ data: firstResult }));

    // Stage two: feed stage one's output into the next transform.
    const secondResult = await this.step2(firstResult);
    ctx.emit(new Step2Event({ data: secondResult }));

    return new StopEvent({ result: secondResult });
  }

  // Placeholder stage-one transform.
  private async step1(input: any) {
    return `Step 1: ${input}`;
  }

  // Placeholder stage-two transform.
  private async step2(input: any) {
    return `Step 2: ${input}`;
  }
}

Best Practices

  1. Use events for communication: Define clear event types
  2. Manage state with Context: Store intermediate results
  3. Stream for long operations: Better UX for lengthy workflows
  4. Handle errors gracefully: Implement error events and handling
  5. Compose workflows: Build complex workflows from simpler ones
  6. Type your events: Use Zod schemas for type safety

See Also