Overview

Ingestion pipelines transform raw documents into indexed, queryable data through a series of transformations such as parsing, chunking, and embedding. Each transformation takes a list of nodes and returns a new list, so the output of one stage feeds the next.
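Conceptually, a pipeline is an ordered fold of async transformations over a list of nodes. A minimal sketch of that idea in plain TypeScript (the `Node` and `Transform` types here are simplified stand-ins, not the library's actual `BaseNode`/`TransformComponent` types):

```typescript
// Conceptual sketch only -- simplified stand-ins for the library's types.
type Node = { text: string; metadata: Record<string, unknown> };
type Transform = (nodes: Node[]) => Promise<Node[]>;

// Run each transformation in order, feeding its output to the next stage.
async function runPipeline(transforms: Transform[], nodes: Node[]): Promise<Node[]> {
  let current = nodes;
  for (const t of transforms) {
    current = await t(current);
  }
  return current;
}
```

A splitter, a metadata extractor, and an embedding step all fit this shape; the pipeline's job is ordering, caching, and persistence.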

IngestionPipeline

Pipeline for processing documents with transformations.
import { IngestionPipeline } from "llamaindex/ingestion";
import { Document } from "@llamaindex/core/schema";
import { SentenceSplitter } from "llamaindex/node-parser";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 512 }),
    new OpenAIEmbedding()
  ]
});

const documents = [
  new Document({ text: "Document content..." })
];

const nodes = await pipeline.run({ documents });

Constructor Options

transformations (TransformComponent[], required)
Array of transformation components to apply, in order.

vectorStore (VectorStore, optional)
Vector store to persist processed nodes to.

cache (IngestionCache, optional)
Cache used to skip nodes that were already processed.

docStore (BaseDocumentStore, optional)
Document store for document-level persistence.

Transformation Pipeline

Common pipeline pattern:
import {
  IngestionPipeline,
  SentenceSplitter,
  TitleExtractor,
  SummaryExtractor
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    // 1. Split into chunks
    new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 200 }),
    
    // 2. Extract metadata
    new TitleExtractor(),
    new SummaryExtractor(),
    
    // 3. Generate embeddings
    new OpenAIEmbedding({ model: "text-embedding-3-small" })
  ]
});

Running the Pipeline

Basic Run

const nodes = await pipeline.run({
  documents: documents
});

console.log(nodes.length); // Number of processed nodes
console.log(nodes[0].embedding); // Embedding vector

With Vector Store

import { PineconeVectorStore } from "@llamaindex/pinecone";

const vectorStore = new PineconeVectorStore({ indexName: "my-index" });

const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  vectorStore: vectorStore
});

// Nodes are automatically persisted to vector store
await pipeline.run({ documents });

Incremental Ingestion

Cache nodes to avoid reprocessing:
import { IngestionCache } from "llamaindex";

const cache = new IngestionCache();

const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  cache: cache
});

// First run - processes all documents
await pipeline.run({ documents: batch1 });

// Second run - only processes new documents
await pipeline.run({ documents: batch2 });
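The underlying deduplication idea is to key on document content. The sketch below (plain TypeScript, not the `IngestionCache` implementation) hashes each document and skips hashes seen on a previous run:

```typescript
import { createHash } from "node:crypto";

// Deduplicate by content hash: documents whose exact text was already
// seen are filtered out; new documents are recorded and kept.
function dedupe(docs: { text: string }[], seen: Set<string>): { text: string }[] {
  return docs.filter((doc) => {
    const key = createHash("sha256").update(doc.text).digest("hex");
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```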

Metadata Extractors

Enrich nodes with extracted metadata:

TitleExtractor

Extract titles from content:
import { TitleExtractor } from "llamaindex/extractors";

const titleExtractor = new TitleExtractor({
  llm: llm,
  nodes: 5  // Extract from first 5 nodes
});

SummaryExtractor

Generate summaries:
import { SummaryExtractor } from "llamaindex/extractors";

const summaryExtractor = new SummaryExtractor({
  llm: llm,
  summaries: ["self"]  // Summary types
});

QuestionsAnsweredExtractor

Extract potential questions:
import { QuestionsAnsweredExtractor } from "llamaindex/extractors";

const qaExtractor = new QuestionsAnsweredExtractor({
  llm: llm,
  questions: 3  // Number of questions per node
});

KeywordExtractor

Extract keywords:
import { KeywordExtractor } from "llamaindex/extractors";

const keywordExtractor = new KeywordExtractor({
  llm: llm,
  keywords: 10  // Number of keywords
});
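Each extractor produces one metadata record per node, which the pipeline merges into the node's existing metadata. A simplified sketch of that merge (the key names used in the test are illustrative, not necessarily the library's actual metadata keys):

```typescript
type Node = { text: string; metadata: Record<string, unknown> };

// Merge one extracted-metadata record into each node's existing metadata.
// On key collisions, the extractor's value wins.
function applyExtractedMetadata(
  nodes: Node[],
  extracted: Record<string, unknown>[],
): Node[] {
  return nodes.map((node, i) => ({
    ...node,
    metadata: { ...node.metadata, ...extracted[i] },
  }));
}
```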

Custom Transformations

Create custom transformation components:
import { TransformComponent, TextNode } from "@llamaindex/core/schema";
import type { BaseNode } from "@llamaindex/core/schema";

class CustomTransform extends TransformComponent<Promise<BaseNode[]>> {
  async transform(nodes: BaseNode[]): Promise<BaseNode[]> {
    return nodes.map(node => {
      // Add custom metadata
      node.metadata = {
        ...node.metadata,
        processed: true,
        timestamp: new Date().toISOString()
      };

      // Modify text (only text-bearing nodes have a mutable text field)
      if (node instanceof TextNode) {
        node.text = node.text.toLowerCase();
      }

      return node;
    });
  }
}

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter(),
    new CustomTransform(),
    new OpenAIEmbedding()
  ]
});

Document Readers

Load documents from various sources:

SimpleDirectoryReader

import { SimpleDirectoryReader } from "llamaindex";

const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./docs");

await pipeline.run({ documents });

LlamaParseReader

import { LlamaParseReader } from "@llamaindex/cloud";

const reader = new LlamaParseReader({
  apiKey: process.env.LLAMA_CLOUD_API_KEY
});

const documents = await reader.loadData("document.pdf");

Parallel Processing

Process documents in parallel:
const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  numWorkers: 4  // Process in parallel
});

Error Handling

try {
  const nodes = await pipeline.run({ documents });
  console.log(`Ingested ${nodes.length} nodes`);
} catch (error) {
  console.error(
    "Failed to ingest documents:",
    error instanceof Error ? error.message : error
  );
}

Progress Tracking

const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  onProgress: (current, total) => {
    console.log(`Progress: ${current}/${total} (${Math.round(current/total*100)}%)`);
  }
});

Complete Example

import {
  IngestionPipeline,
  SimpleDirectoryReader,
  SentenceSplitter,
  TitleExtractor,
  SummaryExtractor,
  VectorStoreIndex
} from "llamaindex";
import { OpenAI, OpenAIEmbedding } from "@llamaindex/openai";
import { PineconeVectorStore } from "@llamaindex/pinecone";

// 1. Load documents
const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./docs");

// 2. Configure pipeline
const llm = new OpenAI({ model: "gpt-4o-mini" });  // LLM used by the extractors
const vectorStore = new PineconeVectorStore({ indexName: "docs" });

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 512, chunkOverlap: 50 }),
    new TitleExtractor({ llm }),
    new SummaryExtractor({ llm }),
    new OpenAIEmbedding({ model: "text-embedding-3-small" })
  ],
  vectorStore: vectorStore
});

// 3. Run pipeline
const nodes = await pipeline.run({ documents });

console.log(`Processed ${nodes.length} nodes`);

// 4. Create index from vector store
const index = await VectorStoreIndex.fromVectorStore(vectorStore);

// 5. Query
const queryEngine = index.asQueryEngine();
const response = await queryEngine.query({
  query: "What is the main topic?"
});

console.log(response.response);

Best Practices

  1. Order transformations correctly: Parse → Extract metadata → Embed
  2. Use appropriate chunk sizes: 512-1024 for most use cases
  3. Cache for large datasets: Avoid reprocessing unchanged documents
  4. Extract relevant metadata: Improves retrieval quality
  5. Parallelize when possible: Speed up processing of large document sets
  6. Persist to vector stores: Enable distributed and scalable storage
  7. Monitor progress: Track ingestion for large batches
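For practice 2, a rough way to reason about chunk counts: with chunk size c and overlap o, each chunk after the first advances the window by c − o tokens. A small sketch of that arithmetic (an approximation only; the actual splitter respects sentence boundaries rather than exact strides):

```typescript
// Estimate how many chunks a document of `totalTokens` tokens yields
// with a sliding window of `chunkSize` tokens and `overlap` tokens of overlap.
// Approximation: real splitters break on sentence boundaries.
function estimateChunkCount(totalTokens: number, chunkSize: number, overlap: number): number {
  if (totalTokens <= chunkSize) return 1;
  const stride = chunkSize - overlap;
  return 1 + Math.ceil((totalTokens - chunkSize) / stride);
}
```

For example, with chunkSize 512 and overlap 50, a 2,000-token document yields roughly 1 + ceil(1488 / 462) = 5 chunks.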

See Also