## Overview
Ingestion pipelines turn raw documents into indexed, queryable data by running them through a series of stages such as parsing, chunking, and embedding.
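Conceptually, a pipeline is just a list of transformations applied in order, each stage consuming the previous stage's output. This toy sketch (not the library's implementation; `Node`, `chunk`, and `embed` are simplified stand-ins) shows the idea:

```typescript
// Hypothetical sketch: a pipeline folds a list of transforms over the nodes.
type Node = { text: string; embedding?: number[] };
type Transform = (nodes: Node[]) => Node[];

// Toy stand-ins for the chunking and embedding stages.
const chunk: Transform = (nodes) =>
  nodes.flatMap((n) => n.text.split(". ").map((text) => ({ text })));
const embed: Transform = (nodes) =>
  nodes.map((n) => ({ ...n, embedding: [n.text.length] }));

function runPipeline(nodes: Node[], transforms: Transform[]): Node[] {
  // Each stage receives the output of the stage before it.
  return transforms.reduce((acc, t) => t(acc), nodes);
}

const out = runPipeline(
  [{ text: "First sentence. Second sentence" }],
  [chunk, embed]
);
// out: two chunks, each carrying a (toy) embedding
```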
## IngestionPipeline

`IngestionPipeline` runs documents through a configured list of transformations and returns the processed nodes.
```typescript
import { IngestionPipeline } from "llamaindex/ingestion";
import { Document } from "@llamaindex/core/schema";
import { SentenceSplitter } from "llamaindex/node-parser";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 512 }),
    new OpenAIEmbedding(),
  ],
});

const documents = [new Document({ text: "Document content..." })];

const nodes = await pipeline.run({ documents });
```
## Constructor Options

- `transformations` (`TransformComponent[]`, required): array of transformation components, applied in order
- `vectorStore` (optional): vector store used to persist the resulting nodes
- `cache` (optional): cache used for deduplication
- `docStore` (optional): document store for persistence
Common pipeline pattern:

```typescript
import {
  IngestionPipeline,
  SentenceSplitter,
  TitleExtractor,
  SummaryExtractor,
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    // 1. Split into chunks
    new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 200 }),
    // 2. Extract metadata
    new TitleExtractor(),
    new SummaryExtractor(),
    // 3. Generate embeddings
    new OpenAIEmbedding({ model: "text-embedding-3-small" }),
  ],
});
```
## Running the Pipeline

### Basic Run

```typescript
const nodes = await pipeline.run({ documents });

console.log(nodes.length); // number of processed nodes
console.log(nodes[0].embedding); // embedding vector
```
### With Vector Store

```typescript
import { PineconeVectorStore } from "@llamaindex/pinecone";

const vectorStore = new PineconeVectorStore({ indexName: "my-index" });

const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  vectorStore: vectorStore,
});

// Nodes are automatically persisted to the vector store
await pipeline.run({ documents });
```
## Incremental Ingestion

Cache processed nodes to avoid reprocessing unchanged documents:

```typescript
import { SimpleCache } from "llamaindex";

const cache = new SimpleCache();
const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  cache: cache,
});

// First run: processes all documents
await pipeline.run({ documents: batch1 });

// Second run: only processes new documents
await pipeline.run({ documents: batch2 });
```
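Under the hood, cache-based deduplication amounts to hashing each document's content and skipping hashes that have been seen before. A minimal self-contained sketch of that idea (the `SimpleDedupCache` class and `filterNew` method are hypothetical, not part of the library):

```typescript
import { createHash } from "node:crypto";

type Doc = { text: string };

// Hypothetical dedup cache: remembers a content hash per processed document.
class SimpleDedupCache {
  private seen = new Set<string>();

  // Returns only documents not seen before, recording their hashes.
  filterNew(docs: Doc[]): Doc[] {
    return docs.filter((d) => {
      const key = createHash("sha256").update(d.text).digest("hex");
      if (this.seen.has(key)) return false;
      this.seen.add(key);
      return true;
    });
  }
}

const cache = new SimpleDedupCache();
const batch1 = [{ text: "a" }, { text: "b" }];
const batch2 = [{ text: "b" }, { text: "c" }];

const firstRun = cache.filterNew(batch1); // both docs are new
const secondRun = cache.filterNew(batch2); // only "c" is new
```

Hashing content rather than comparing it directly keeps the cache small and makes lookups O(1) regardless of document size.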
## Metadata Extractors

Enrich nodes with metadata extracted by an LLM:

### TitleExtractor

Extract titles from content:

```typescript
import { TitleExtractor } from "llamaindex/extractors";

const titleExtractor = new TitleExtractor({
  llm: llm,
  nodes: 5, // extract from the first 5 nodes
});
```

### SummaryExtractor

Generate summaries:

```typescript
import { SummaryExtractor } from "llamaindex/extractors";

const summaryExtractor = new SummaryExtractor({
  llm: llm,
  summaries: ["self"], // summary types
});
```

### QuestionsAnsweredExtractor

Extract questions that each node can answer:

```typescript
import { QuestionsAnsweredExtractor } from "llamaindex/extractors";

const qaExtractor = new QuestionsAnsweredExtractor({
  llm: llm,
  questions: 3, // number of questions per node
});
```

### KeywordExtractor

Extract keywords:

```typescript
import { KeywordExtractor } from "llamaindex/extractors";

const keywordExtractor = new KeywordExtractor({
  llm: llm,
  keywords: 10, // number of keywords
});
```
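All of these extractors follow the same pattern: read a node's text and merge new fields into `node.metadata`. A self-contained toy sketch of that pattern (the `extractKeywords` function is hypothetical and picks the longest words instead of calling an LLM):

```typescript
type Node = { text: string; metadata: Record<string, unknown> };

// Toy "extractor": derives keywords from the text and merges them
// into the node's metadata, leaving the original node untouched.
function extractKeywords(node: Node, count: number): Node {
  const keywords = [...new Set(node.text.toLowerCase().match(/[a-z]+/g) ?? [])]
    .sort((a, b) => b.length - a.length) // longest words first
    .slice(0, count);
  return { ...node, metadata: { ...node.metadata, keywords } };
}

const enriched = extractKeywords(
  { text: "Ingestion pipelines chunk and embed documents", metadata: {} },
  3
);
// enriched.metadata.keywords holds the three longest distinct words
```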
## Custom Transformations

Create custom transformation components by extending `TransformComponent`:

```typescript
import { TransformComponent, TextNode } from "@llamaindex/core/schema";
import type { BaseNode } from "@llamaindex/core/schema";

class CustomTransform extends TransformComponent {
  async transform(nodes: BaseNode[]): Promise<BaseNode[]> {
    return nodes.map((node) => {
      // Add custom metadata
      node.metadata = {
        ...node.metadata,
        processed: true,
        timestamp: new Date().toISOString(),
      };
      // Modify text (only text nodes carry a text field)
      if (node instanceof TextNode) {
        node.text = node.text.toLowerCase();
      }
      return node;
    });
  }
}

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter(),
    new CustomTransform(),
    new OpenAIEmbedding(),
  ],
});
```
## Document Readers

Load documents from various sources:

### SimpleDirectoryReader

```typescript
import { SimpleDirectoryReader } from "llamaindex";

const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./docs");

await pipeline.run({ documents });
```

### LlamaParseReader

```typescript
import { LlamaParseReader } from "@llamaindex/cloud";

const reader = new LlamaParseReader({
  apiKey: process.env.LLAMA_CLOUD_API_KEY,
});
const documents = await reader.loadData("document.pdf");
```
## Parallel Processing

Process documents in parallel:

```typescript
const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  numWorkers: 4, // process in parallel
});
```
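The worker pattern behind this can be sketched as splitting the documents into `numWorkers` batches and processing the batches concurrently. This is a simplified stand-in, not the library's scheduler; `processDoc` is a hypothetical placeholder for an expensive transformation:

```typescript
type Doc = { text: string };

// Stand-in for an expensive per-document transformation.
async function processDoc(doc: Doc): Promise<string> {
  return doc.text.toUpperCase();
}

async function runParallel(docs: Doc[], numWorkers: number): Promise<string[]> {
  // Round-robin the documents into one batch per worker.
  const batches: Doc[][] = Array.from({ length: numWorkers }, () => []);
  docs.forEach((doc, i) => batches[i % numWorkers].push(doc));

  // Batches run concurrently; within a batch, docs run sequentially.
  const results = await Promise.all(
    batches.map(async (batch) => {
      const out: string[] = [];
      for (const doc of batch) out.push(await processDoc(doc));
      return out;
    })
  );
  return results.flat();
}

const processed = await runParallel(
  [{ text: "a" }, { text: "b" }, { text: "c" }],
  2
);
```

Bounding concurrency this way keeps at most `numWorkers` documents in flight, which matters when each transformation makes rate-limited API calls (e.g. embeddings).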
## Error Handling

```typescript
try {
  const nodes = await pipeline.run({ documents });
} catch (error) {
  if (error instanceof IngestionError) {
    console.error("Failed to ingest documents:", error.message);
    console.error("Failed documents:", error.failedDocuments);
  }
}
```
## Progress Tracking

```typescript
const pipeline = new IngestionPipeline({
  transformations: [splitter, embedModel],
  onProgress: (current, total) => {
    console.log(`Progress: ${current}/${total} (${Math.round((current / total) * 100)}%)`);
  },
});
```
## Complete Example

```typescript
import {
  IngestionPipeline,
  SimpleDirectoryReader,
  SentenceSplitter,
  TitleExtractor,
  SummaryExtractor,
  VectorStoreIndex,
} from "llamaindex";
import { OpenAI, OpenAIEmbedding } from "@llamaindex/openai";
import { PineconeVectorStore } from "@llamaindex/pinecone";

const llm = new OpenAI();

// 1. Load documents
const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./docs");

// 2. Configure pipeline
const vectorStore = new PineconeVectorStore({ indexName: "docs" });
const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 512, chunkOverlap: 50 }),
    new TitleExtractor({ llm }),
    new SummaryExtractor({ llm }),
    new OpenAIEmbedding({ model: "text-embedding-3-small" }),
  ],
  vectorStore: vectorStore,
});

// 3. Run pipeline
const nodes = await pipeline.run({ documents });
console.log(`Processed ${nodes.length} nodes`);

// 4. Create index from the vector store
const index = await VectorStoreIndex.fromVectorStore(vectorStore);

// 5. Query
const queryEngine = index.asQueryEngine();
const response = await queryEngine.query({
  query: "What is the main topic?",
});
console.log(response.response);
```
## Best Practices

- Order transformations correctly: Parse → Extract metadata → Embed
- Use appropriate chunk sizes: 512-1024 for most use cases
- Cache for large datasets: Avoid reprocessing unchanged documents
- Extract relevant metadata: Improves retrieval quality
- Parallelize when possible: Speed up processing of large document sets
- Persist to vector stores: Enable distributed and scalable storage
- Monitor progress: Track ingestion for large batches
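The chunk-size and overlap guidance above can be made concrete with a toy character-based splitter (real splitters like `SentenceSplitter` count tokens and respect sentence boundaries; `chunkText` here is hypothetical):

```typescript
// Fixed-size chunking with overlap: consecutive chunks share
// `chunkOverlap` characters so context isn't lost at boundaries.
function chunkText(text: string, chunkSize: number, chunkOverlap: number): string[] {
  const chunks: string[] = [];
  const step = chunkSize - chunkOverlap; // how far each chunk advances
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break; // last chunk reached the end
  }
  return chunks;
}

const chunks = chunkText("abcdefghij", 4, 2);
// ["abcd", "cdef", "efgh", "ghij"] — each pair overlaps by 2 characters
```

The overlap trades index size for retrieval quality: larger overlap duplicates more text across chunks but makes it less likely that a relevant passage is cut in half.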
## See Also