The IngestionPipeline orchestrates the complete flow of data from raw documents to indexed, searchable nodes. It handles parsing, transformation, embedding, and storage with built-in caching.

Overview

An ingestion pipeline:
  • Transforms documents through a series of steps
  • Caches intermediate results for efficiency
  • Handles embedding generation
  • Stores nodes in vector stores
  • Manages document updates and deduplication

Basic Usage

import { 
  Document, 
  IngestionPipeline, 
  SentenceSplitter 
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const document = new Document({
  text: "Your document text here...",
  id_: "doc_1"
});

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 20 }),
    new OpenAIEmbedding()
  ]
});

const nodes = await pipeline.run({ 
  documents: [document] 
});

console.log(`Generated ${nodes.length} nodes with embeddings`);

Pipeline Components

Transformations

Transformations are the processing steps applied to your data:
import { 
  IngestionPipeline,
  SentenceSplitter
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    // 1. Parse documents into nodes
    new SentenceSplitter({ 
      chunkSize: 512, 
      chunkOverlap: 50 
    }),
    
    // 2. Generate embeddings
    new OpenAIEmbedding({ 
      model: "text-embedding-3-small" 
    })
  ]
});

Vector Store Integration

Automatically store nodes in a vector store:
import { IngestionPipeline, SentenceSplitter } from "llamaindex";
import { PineconeVectorStore } from "@llamaindex/pinecone";
import { OpenAIEmbedding } from "@llamaindex/openai";

const vectorStore = new PineconeVectorStore({
  indexName: "my-index"
});

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    new OpenAIEmbedding()
  ],
  vectorStore: vectorStore
});

// Nodes are automatically added to the vector store
await pipeline.run({ documents });

Multiple Vector Stores

Use different vector stores for different modalities:
import { ModalityType } from "llamaindex";

const pipeline = new IngestionPipeline({
  transformations: [...],
  vectorStores: {
    [ModalityType.TEXT]: textVectorStore,
    [ModalityType.IMAGE]: imageVectorStore
  }
});

Caching

Pipelines cache transformation results to avoid reprocessing:
import { IngestionPipeline, IngestionCache, SentenceSplitter } from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    new OpenAIEmbedding()
  ],
  // Caching is enabled by default
  disableCache: false
});

// First run: processes everything
const nodes1 = await pipeline.run({ documents });

// Second run with same documents: uses cache
const nodes2 = await pipeline.run({ documents });

// To disable caching:
const noCachePipeline = new IngestionPipeline({
  transformations: [...],
  disableCache: true
});

Custom Cache

const cache = new IngestionCache("my_custom_collection");

const pipeline = new IngestionPipeline({
  transformations: [...],
  cache: cache
});

How Caching Works

  1. A hash is computed from the input nodes and transformation configuration
  2. If cached results exist for this hash, they’re returned immediately
  3. Otherwise, the transformation runs and results are cached
  4. Cache is stored in-memory by default
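
The hash-and-lookup flow above can be sketched in plain TypeScript. This is a simplified illustration, not the library's internals; the node shape, hash function, and `Map`-based cache are assumptions made for the example:

```typescript
import { createHash } from "node:crypto";

// Simplified stand-ins for nodes and a transformation.
interface Node { text: string; }
type Transform = (nodes: Node[]) => Node[];

// Hash the input nodes together with the transformation's configuration,
// so a change to either one produces a cache miss.
function cacheKey(nodes: Node[], transformConfig: object): string {
  const payload = JSON.stringify({ nodes, transformConfig });
  return createHash("sha256").update(payload).digest("hex");
}

// An in-memory cache, mirroring the default described above.
const cache = new Map<string, Node[]>();

function runWithCache(nodes: Node[], transform: Transform, config: object): Node[] {
  const key = cacheKey(nodes, config);           // step 1: compute the hash
  const hit = cache.get(key);
  if (hit) return hit;                           // step 2: return cached results
  const result = transform(nodes);               // step 3: run the transformation...
  cache.set(key, result);                        // ...and cache the output
  return result;
}

// Example: an uppercasing "transformation" run twice on the same input.
const upper: Transform = (ns) => ns.map((n) => ({ text: n.text.toUpperCase() }));
const docs = [{ text: "hello" }];
const first = runWithCache(docs, upper, { name: "upper" });
const second = runWithCache(docs, upper, { name: "upper" }); // cache hit
console.log(first[0].text, first === second); // second call returns the cached array
```

On the second call the transformation never runs; the cached array is returned directly, which is why repeated `pipeline.run` calls with identical documents are fast.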

Document Store Strategies

Control how documents are managed and deduplicated:
import { 
  IngestionPipeline, 
  DocStoreStrategy 
} from "llamaindex";
import { SimpleDocumentStore } from "llamaindex/storage";

const docStore = new SimpleDocumentStore();

const pipeline = new IngestionPipeline({
  transformations: [...],
  docStore: docStore,
  docStoreStrategy: DocStoreStrategy.UPSERTS
});

Available Strategies

  • DocStoreStrategy.UPSERTS (default): Update existing docs, insert new ones
  • DocStoreStrategy.DUPLICATES_ONLY: Skip duplicate documents
  • DocStoreStrategy.UPSERTS_AND_DELETE: Handle updates and deletions
  • DocStoreStrategy.NONE: No document store management
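
The difference between the two most common strategies can be sketched in plain TypeScript. This is an illustration only, using a `Map` keyed by document id as a stand-in document store (a real document store also compares content hashes; the function names here are hypothetical):

```typescript
interface Doc { id: string; text: string; }
type Store = Map<string, Doc>;

// UPSERTS: replace the stored copy when a document arrives again,
// insert it when it is new.
function upsert(store: Store, doc: Doc): void {
  store.set(doc.id, doc); // overwrite or insert
}

// DUPLICATES_ONLY: skip documents whose id has been seen before,
// even if their content changed.
function insertIfNew(store: Store, doc: Doc): boolean {
  if (store.has(doc.id)) return false; // duplicate: skipped
  store.set(doc.id, doc);
  return true;
}

const store: Store = new Map();
upsert(store, { id: "a", text: "v1" });
upsert(store, { id: "a", text: "v2" });   // replaces v1
console.log(store.get("a")?.text);        // the updated copy is kept

const added = insertIfNew(store, { id: "a", text: "v3" });
console.log(added, store.get("a")?.text); // skipped: stored copy unchanged
```

Pick UPSERTS when source documents change over time and you want the index to follow; pick DUPLICATES_ONLY when documents are immutable and reprocessing is pure waste.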

Input Sources

Pipelines accept multiple input sources:

Direct Documents

const nodes = await pipeline.run({ 
  documents: [doc1, doc2, doc3] 
});

Existing Nodes

const nodes = await pipeline.run({ 
  nodes: existingNodes 
});

Reader Integration

import { SimpleDirectoryReader } from "@llamaindex/readers/directory";

const pipeline = new IngestionPipeline({
  transformations: [...],
  reader: new SimpleDirectoryReader()
});

// Reader is automatically called
const nodes = await pipeline.run();

Pipeline Documents

const pipeline = new IngestionPipeline({
  transformations: [...],
  documents: [doc1, doc2]  // Set at pipeline creation
});

const nodes = await pipeline.run();

Complete Example

import fs from "fs/promises";
import { 
  Document,
  IngestionPipeline,
  SentenceSplitter,
  VectorStoreIndex,
  DocStoreStrategy
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";
import { PineconeVectorStore } from "@llamaindex/pinecone";
import { SimpleDocumentStore } from "llamaindex/storage";

async function main() {
  // Load documents
  const essay = await fs.readFile("essay.txt", "utf-8");
  const document = new Document({ 
    text: essay, 
    id_: "essay.txt",
    metadata: {
      source: "essay.txt",
      author: "Paul Graham"
    }
  });
  
  // Set up storage
  const vectorStore = new PineconeVectorStore({
    indexName: "my-index"
  });
  
  const docStore = new SimpleDocumentStore();
  
  // Create pipeline
  const pipeline = new IngestionPipeline({
    transformations: [
      // Split into chunks
      new SentenceSplitter({ 
        chunkSize: 1024, 
        chunkOverlap: 200 
      }),
      
      // Generate embeddings
      new OpenAIEmbedding({ 
        model: "text-embedding-3-small" 
      })
    ],
    vectorStore: vectorStore,
    docStore: docStore,
    docStoreStrategy: DocStoreStrategy.UPSERTS
  });
  
  console.time("Pipeline Run Time");
  
  // Run pipeline
  const nodes = await pipeline.run({ 
    documents: [document] 
  });
  
  console.timeEnd("Pipeline Run Time");
  console.log(`Processed ${nodes.length} nodes`);
  
  // Create index from vector store
  const index = await VectorStoreIndex.fromVectorStore(vectorStore);
  
  // Query
  const queryEngine = index.asQueryEngine();
  const response = await queryEngine.query({
    query: "What is the main topic?"
  });
  
  console.log(response.toString());
}

main().catch(console.error);

Advanced Usage

Custom Transformations

Create custom transformation components:
import { TransformComponent, BaseNode } from "llamaindex";

const customTransform = new TransformComponent(
  async (nodes: BaseNode[]) => {
    // Your custom transformation logic
    return nodes.map(node => {
      // Modify node
      node.metadata.processed = true;
      return node;
    });
  }
);

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    customTransform,
    new OpenAIEmbedding()
  ]
});

Running Transformations Independently

import { runTransformations } from "llamaindex";

const transformations = [
  new SentenceSplitter({ chunkSize: 1024 }),
  new OpenAIEmbedding()
];

const nodes = await runTransformations(
  documents,
  transformations,
  {}, // transform options
  { 
    inPlace: false,  // Don't modify original array
    cache: myCache,
    docStoreStrategy: myStrategy
  }
);

Batch Processing

import { SimpleDirectoryReader } from "@llamaindex/readers/directory";

// Process directory of files
const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./documents");

console.log(`Loaded ${documents.length} documents`);

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    new OpenAIEmbedding()
  ],
  vectorStore: vectorStore
});

// Process all documents
const nodes = await pipeline.run({ documents });

console.log(`Created ${nodes.length} total nodes`);

Performance Tips

  1. Enable caching for repeated runs with same documents
  2. Use appropriate chunk sizes to balance quality and quantity
  3. Batch documents when processing multiple files
  4. Monitor token usage with embedding models
  5. Use document stores to avoid reprocessing unchanged documents

Error Handling

try {
  const nodes = await pipeline.run({ documents });
} catch (error) {
  // In TypeScript, a caught value is `unknown`; narrow it before reading .message
  const message = error instanceof Error ? error.message : String(error);
  if (message.includes("Metadata length")) {
    console.error("Metadata too large for chunk size");
    // Reduce metadata or increase chunk size
  } else if (message.includes("API")) {
    console.error("Embedding API error:", error);
    // Handle API errors (retry, back off, check credentials)
  } else {
    throw error;
  }
}

Best Practices

  1. Choose appropriate transformations
    • Text splitting for long documents
    • Embeddings for semantic search
    • Custom extractors for metadata
  2. Configure caching wisely
    • Enable for development and repeated runs
    • Disable for production one-time ingestion
  3. Use document stores
    • Track which documents have been processed
    • Avoid reprocessing unchanged content
    • Enable incremental updates
  4. Monitor pipeline performance
    • Log processing times
    • Track node counts
    • Watch for errors in transformations
  5. Handle large datasets
    • Process in batches
    • Use streaming when possible
    • Monitor memory usage

Next Steps

  • Node Parsers: configure text splitting strategies
  • Embeddings: choose and configure embedding models
  • Vector Stores: set up vector storage backends
  • Storage: manage document and index stores