The IngestionPipeline orchestrates the complete flow of data from raw documents to indexed, searchable nodes. It handles parsing, transformation, embedding, and storage with built-in caching.
Overview
An ingestion pipeline:
Transforms documents through a series of steps
Caches intermediate results for efficiency
Handles embedding generation
Stores nodes in vector stores
Manages document updates and deduplication
Basic Usage
import { Document, IngestionPipeline, SentenceSplitter } from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const document = new Document({
  text: "Your document text here...",
  id_: "doc_1",
});

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 20 }),
    new OpenAIEmbedding(),
  ],
});

const nodes = await pipeline.run({
  documents: [document],
});
console.log(`Generated ${nodes.length} nodes with embeddings`);
Pipeline Components
Transformations are the processing steps applied to your data:
import { IngestionPipeline, SentenceSplitter } from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    // 1. Parse documents into nodes
    new SentenceSplitter({
      chunkSize: 512,
      chunkOverlap: 50,
    }),
    // 2. Generate embeddings
    new OpenAIEmbedding({
      model: "text-embedding-3-small",
    }),
  ],
});
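Conceptually, each transformation takes a list of nodes and returns a new list, and the output of one step becomes the input of the next. The sketch below illustrates that chaining in plain TypeScript. `SketchNode`, `Transform`, `splitByLength`, `tagLength`, and `runInOrder` are hypothetical names used only for illustration, not LlamaIndexTS APIs, and the fixed-width splitter is a stand-in for a real sentence-aware parser.

```typescript
// Illustrative only: a stand-in for a node carrying text and metadata.
interface SketchNode {
  text: string;
  metadata: Record<string, unknown>;
}

// A transformation maps a list of nodes to a new list of nodes.
type Transform = (nodes: SketchNode[]) => SketchNode[];

// Hypothetical splitter: cuts each node's text into fixed-width pieces.
const splitByLength = (size: number): Transform => (nodes) =>
  nodes.flatMap((node) => {
    const pieces: SketchNode[] = [];
    for (let i = 0; i < node.text.length; i += size) {
      pieces.push({
        text: node.text.slice(i, i + size),
        metadata: { ...node.metadata },
      });
    }
    return pieces;
  });

// Hypothetical enrichment step: records each node's text length.
const tagLength: Transform = (nodes) =>
  nodes.map((node) => ({
    ...node,
    metadata: { ...node.metadata, length: node.text.length },
  }));

// Apply the transformations in order, feeding each result to the next step.
function runInOrder(nodes: SketchNode[], transforms: Transform[]): SketchNode[] {
  return transforms.reduce((current, transform) => transform(current), nodes);
}

const chained = runInOrder(
  [{ text: "abcdefghij", metadata: { source: "demo" } }],
  [splitByLength(4), tagLength],
);
```

Order matters in this sketch just as in the pipeline: the length tag is computed per chunk only because the splitter runs first.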
Vector Store Integration
Automatically store nodes in a vector store:
import { IngestionPipeline, SentenceSplitter } from "llamaindex";
import { PineconeVectorStore } from "@llamaindex/pinecone";
import { OpenAIEmbedding } from "@llamaindex/openai";

const vectorStore = new PineconeVectorStore({
  indexName: "my-index",
});

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    new OpenAIEmbedding(),
  ],
  vectorStore,
});

// Nodes are automatically added to the vector store.
// `documents` is an array of Document objects loaded elsewhere.
await pipeline.run({ documents });
Multiple Vector Stores
Use different vector stores for different modalities:
import { IngestionPipeline, ModalityType } from "llamaindex";

// `textVectorStore` and `imageVectorStore` are vector store instances created elsewhere
const pipeline = new IngestionPipeline({
  transformations: [...],
  vectorStores: {
    [ModalityType.TEXT]: textVectorStore,
    [ModalityType.IMAGE]: imageVectorStore,
  },
});
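One way to picture per-modality storage is that nodes are grouped by modality and each group is sent to the store registered for it. The sketch below shows only that grouping idea. `Modality`, `ModalNode`, and `groupByModality` are hypothetical names, and how the library routes nodes internally is an assumption here.

```typescript
// Illustrative only: hypothetical types, not LlamaIndexTS classes.
type Modality = "TEXT" | "IMAGE";

interface ModalNode {
  id: string;
  modality: Modality;
}

// Group nodes so each group can be handed to the store for its modality.
function groupByModality(nodes: ModalNode[]): Record<Modality, ModalNode[]> {
  const groups: Record<Modality, ModalNode[]> = { TEXT: [], IMAGE: [] };
  for (const node of nodes) {
    groups[node.modality].push(node);
  }
  return groups;
}

const grouped = groupByModality([
  { id: "n1", modality: "TEXT" },
  { id: "n2", modality: "IMAGE" },
  { id: "n3", modality: "TEXT" },
]);
```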
Caching
Pipelines cache transformation results to avoid reprocessing:
import { IngestionPipeline, IngestionCache, SentenceSplitter } from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    new OpenAIEmbedding(),
  ],
  // Caching is enabled by default
  disableCache: false,
});

// First run: processes everything
const nodes1 = await pipeline.run({ documents });

// Second run with same documents: uses cache
const nodes2 = await pipeline.run({ documents });

// To disable caching:
const noCachePipeline = new IngestionPipeline({
  transformations: [...],
  disableCache: true,
});
Custom Cache
const cache = new IngestionCache("my_custom_collection");

const pipeline = new IngestionPipeline({
  transformations: [...],
  cache,
});
How Caching Works
A hash is computed from the input nodes and transformation configuration
If cached results exist for this hash, they’re returned immediately
Otherwise, the transformation runs and results are cached
Cache is stored in-memory by default
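The steps above can be sketched as a small in-memory cache keyed by a hash of the inputs and the transformation configuration. This is a minimal illustration, not the `IngestionCache` implementation: `SketchCache`, `CacheNode`, and `fnv1a` are hypothetical names, and the FNV-1a string hash is a stand-in for whatever hash the library actually computes.

```typescript
// Illustrative only: not the IngestionCache implementation.
interface CacheNode {
  text: string;
}

// FNV-1a, a small non-cryptographic string hash (stand-in for the real hash).
function fnv1a(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash.toString(16);
}

class SketchCache {
  private store = new Map<string, CacheNode[]>();
  hits = 0;

  run(
    nodes: CacheNode[],
    config: Record<string, unknown>,
    transform: (nodes: CacheNode[]) => CacheNode[],
  ): CacheNode[] {
    // 1. Hash the input nodes together with the transformation configuration.
    const key = fnv1a(JSON.stringify({ texts: nodes.map((n) => n.text), config }));
    // 2. Return cached results immediately if they exist.
    const cached = this.store.get(key);
    if (cached) {
      this.hits += 1;
      return cached;
    }
    // 3. Otherwise run the transformation and cache the result in memory.
    const result = transform(nodes);
    this.store.set(key, result);
    return result;
  }
}

const sketchCache = new SketchCache();
let transformCalls = 0;
const upperCase = (nodes: CacheNode[]): CacheNode[] => {
  transformCalls += 1;
  return nodes.map((n) => ({ text: n.text.toUpperCase() }));
};

const input: CacheNode[] = [{ text: "hello" }];
sketchCache.run(input, { chunkSize: 1024 }, upperCase); // miss: runs the transform
sketchCache.run(input, { chunkSize: 1024 }, upperCase); // hit: same nodes and config
sketchCache.run(input, { chunkSize: 512 }, upperCase); // miss: config changed
```

Because the configuration is part of the key in this sketch, changing a setting such as the chunk size produces a new key and forces the step to run again.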
Document Store Strategies
Control how documents are managed and deduplicated:
import { IngestionPipeline, DocStoreStrategy } from "llamaindex";
import { SimpleDocumentStore } from "llamaindex/storage";

const docStore = new SimpleDocumentStore();

const pipeline = new IngestionPipeline({
  transformations: [...],
  docStore,
  docStoreStrategy: DocStoreStrategy.UPSERTS,
});
Available Strategies
DocStoreStrategy.UPSERTS (default): Update existing docs, insert new ones
DocStoreStrategy.DUPLICATES_ONLY: Skip duplicate documents
DocStoreStrategy.UPSERTS_AND_DELETE: Handle updates and deletions
DocStoreStrategy.NONE: No document store management
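To make the differences between these strategies concrete, the sketch below models a document store as a map from document ID to a content key and decides which incoming documents still need processing. It is a simplified illustration under stated assumptions, not the library's implementation: `SketchDoc`, `Strategy`, `contentKey`, and `selectDocsToProcess` are hypothetical names, and the raw text stands in for a real content hash.

```typescript
// Illustrative only: hypothetical types, not the library's implementation.
interface SketchDoc {
  id: string;
  text: string;
}

type Strategy = "upserts" | "duplicates_only" | "upserts_and_delete";

// Stand-in for a content hash; a real store would hash text and metadata.
const contentKey = (doc: SketchDoc): string => doc.text;

function selectDocsToProcess(
  store: Map<string, string>, // document ID -> content key
  incoming: SketchDoc[],
  strategy: Strategy,
): SketchDoc[] {
  const toProcess: SketchDoc[] = [];

  if (strategy === "duplicates_only") {
    // Skip any document whose content is already known, regardless of ID.
    const seen = new Set(Array.from(store.values()));
    for (const doc of incoming) {
      const key = contentKey(doc);
      if (seen.has(key)) continue;
      seen.add(key);
      store.set(doc.id, key);
      toProcess.push(doc);
    }
    return toProcess;
  }

  // Upserts: process new IDs and IDs whose content changed; skip unchanged ones.
  for (const doc of incoming) {
    const key = contentKey(doc);
    if (store.get(doc.id) === key) continue;
    store.set(doc.id, key);
    toProcess.push(doc);
  }

  if (strategy === "upserts_and_delete") {
    // Also drop stored documents that are absent from the incoming set.
    const incomingIds = new Set(incoming.map((d) => d.id));
    for (const id of Array.from(store.keys())) {
      if (!incomingIds.has(id)) store.delete(id);
    }
  }
  return toProcess;
}

const sketchStore = new Map<string, string>([
  ["a", "old text"],
  ["b", "same"],
]);
const processed = selectDocsToProcess(
  sketchStore,
  [
    { id: "a", text: "new text" }, // changed: reprocessed
    { id: "b", text: "same" }, // unchanged: skipped
    { id: "c", text: "brand new" }, // new: processed
  ],
  "upserts",
);
```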
Pipelines accept multiple input sources:
Direct Documents
const nodes = await pipeline.run({
  documents: [doc1, doc2, doc3],
});
Existing Nodes
const nodes = await pipeline.run({
  nodes: existingNodes,
});
Reader Integration
import { SimpleDirectoryReader } from "@llamaindex/readers/directory";

const pipeline = new IngestionPipeline({
  transformations: [...],
  reader: new SimpleDirectoryReader(),
});

// Reader is automatically called
const nodes = await pipeline.run();
Pipeline Documents
const pipeline = new IngestionPipeline({
  transformations: [...],
  documents: [doc1, doc2], // Set at pipeline creation
});

const nodes = await pipeline.run();
Complete Example
import fs from "fs/promises";
import {
  Document,
  IngestionPipeline,
  SentenceSplitter,
  VectorStoreIndex,
  DocStoreStrategy,
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";
import { PineconeVectorStore } from "@llamaindex/pinecone";
import { SimpleDocumentStore } from "llamaindex/storage";

async function main() {
  // Load documents
  const essay = await fs.readFile("essay.txt", "utf-8");
  const document = new Document({
    text: essay,
    id_: "essay.txt",
    metadata: {
      source: "essay.txt",
      author: "Paul Graham",
    },
  });

  // Set up storage
  const vectorStore = new PineconeVectorStore({
    indexName: "my-index",
  });
  const docStore = new SimpleDocumentStore();

  // Create pipeline
  const pipeline = new IngestionPipeline({
    transformations: [
      // Split into chunks
      new SentenceSplitter({
        chunkSize: 1024,
        chunkOverlap: 200,
      }),
      // Generate embeddings
      new OpenAIEmbedding({
        model: "text-embedding-3-small",
      }),
    ],
    vectorStore,
    docStore,
    docStoreStrategy: DocStoreStrategy.UPSERTS,
  });

  // Run pipeline and time it
  console.time("Pipeline Run Time");
  const nodes = await pipeline.run({
    documents: [document],
  });
  console.timeEnd("Pipeline Run Time");
  console.log(`Processed ${nodes.length} nodes`);

  // Create index from vector store
  const index = await VectorStoreIndex.fromVectorStore(vectorStore);

  // Query
  const queryEngine = index.asQueryEngine();
  const response = await queryEngine.query({
    query: "What is the main topic?",
  });
  console.log(response.toString());
}

main().catch(console.error);
Advanced Usage
Create custom transformation components:
import {
  BaseNode,
  IngestionPipeline,
  SentenceSplitter,
  TransformComponent,
} from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

// Wrap an async function as a transformation step
const customTransform = new TransformComponent(
  async (nodes: BaseNode[]) => {
    // Your custom transformation logic
    return nodes.map((node) => {
      // Modify node
      node.metadata.processed = true;
      return node;
    });
  },
);

// Place the custom step between splitting and embedding
const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    customTransform,
    new OpenAIEmbedding(),
  ],
});
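Run transformations directly, without constructing a pipeline:
import { runTransformations, SentenceSplitter } from "llamaindex";
import { OpenAIEmbedding } from "@llamaindex/openai";

const transformations = [
  new SentenceSplitter({ chunkSize: 1024 }),
  new OpenAIEmbedding(),
];

// `documents`, `myCache`, and `myStrategy` are defined elsewhere
const nodes = await runTransformations(
  documents,
  transformations,
  {}, // transform options
  {
    inPlace: false, // Don't modify original array
    cache: myCache,
    docStoreStrategy: myStrategy,
  },
);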
Batch Processing
import { SimpleDirectoryReader } from "@llamaindex/readers/directory";

// Process directory of files
const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./documents");
console.log(`Loaded ${documents.length} documents`);

// `vectorStore` is a vector store instance created elsewhere
const pipeline = new IngestionPipeline({
  transformations: [
    new SentenceSplitter({ chunkSize: 1024 }),
    new OpenAIEmbedding(),
  ],
  vectorStore,
});

// Process all documents
const nodes = await pipeline.run({ documents });
console.log(`Created ${nodes.length} total nodes`);
Tips for efficient ingestion:
Enable caching for repeated runs with the same documents
Use chunk sizes that balance retrieval quality against the number of chunks and embedding calls
Batch documents when processing multiple files
Monitor token usage with embedding models
Use document stores to avoid reprocessing unchanged documents
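A rough way to reason about chunk size is to estimate how many chunks, and therefore how many embeddings, a document will produce. With chunk size C and overlap O (where O < C), each chunk after the first advances by C − O tokens, so a document of N > C tokens yields about 1 + ⌈(N − C) / (C − O)⌉ chunks. The helper below is a hypothetical sketch of that arithmetic. It assumes fixed-size chunking, so a sentence-aware splitter such as `SentenceSplitter` will typically produce somewhat different counts.

```typescript
// Illustrative estimate only: assumes fixed-size chunks with a constant stride.
function estimateChunkCount(
  totalTokens: number,
  chunkSize: number,
  chunkOverlap: number,
): number {
  if (chunkOverlap >= chunkSize) {
    throw new Error("chunkOverlap must be smaller than chunkSize");
  }
  if (totalTokens <= 0) return 0;
  if (totalTokens <= chunkSize) return 1;
  // Each chunk after the first covers (chunkSize - chunkOverlap) new tokens.
  const stride = chunkSize - chunkOverlap;
  return 1 + Math.ceil((totalTokens - chunkSize) / stride);
}

// A 10,000-token document under the two configurations used on this page.
const smallChunks = estimateChunkCount(10000, 512, 50); // 22 chunks
const largeChunks = estimateChunkCount(10000, 1024, 200); // 12 chunks
```

Smaller chunks give finer-grained retrieval in this estimate, at the cost of nearly twice as many embedding calls for the same document.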
Error Handling
try {
  const nodes = await pipeline.run({ documents });
} catch (error) {
  // `error` is `unknown` in strict TypeScript, so narrow it before reading the message
  const message = error instanceof Error ? error.message : String(error);
  if (message.includes("Metadata length")) {
    console.error("Metadata too large for chunk size");
    // Reduce metadata or increase chunk size
  } else if (message.includes("API")) {
    console.error("Embedding API error:", error);
    // Handle API errors
  } else {
    throw error;
  }
}
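If the same checks are needed in several places, they can be pulled into a small helper that narrows the caught value safely. The sketch below is one possible shape. `IngestionErrorKind` and `classifyIngestionError` are hypothetical names, and matching on message substrings is only as reliable as the messages themselves.

```typescript
// Illustrative helper: classifies a caught value by its message text.
type IngestionErrorKind = "metadata-too-large" | "embedding-api" | "unknown";

function classifyIngestionError(error: unknown): IngestionErrorKind {
  // Caught values are not guaranteed to be Error instances.
  const message = error instanceof Error ? error.message : String(error);
  if (message.includes("Metadata length")) return "metadata-too-large";
  if (message.includes("API")) return "embedding-api";
  return "unknown";
}

// Hypothetical messages, used only to exercise the helper.
const kinds = [
  classifyIngestionError(new Error("Metadata length exceeds chunk size")),
  classifyIngestionError(new Error("API request failed")),
  classifyIngestionError("something else"),
];
```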
Best Practices
Choose appropriate transformations: text splitting for long documents, embeddings for semantic search, custom extractors for metadata.
Configure caching wisely: enable it for development and repeated runs, disable it for production one-time ingestion.
Use document stores: track which documents have been processed, avoid reprocessing unchanged content, and enable incremental updates.
Monitor pipeline performance: log processing times, track node counts, and watch for errors in transformations.
Handle large datasets: process in batches, use streaming when possible, and monitor memory usage.
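Processing in batches can be as simple as slicing the document list into fixed-size groups and running the pipeline once per group. The helper below is a generic sketch. `toBatches` is a hypothetical name, and the file names are placeholders standing in for loaded documents.

```typescript
// Split a list into consecutive batches of at most `batchSize` items.
function toBatches<T>(items: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new Error("batchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Placeholder inputs standing in for loaded documents.
const fileBatches = toBatches(["a.txt", "b.txt", "c.txt", "d.txt", "e.txt"], 2);
```

Each batch could then be passed to `pipeline.run({ documents: batch })` in a loop, which bounds how many documents are held and embedded at once.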
Next Steps
Node Parsers: Configure text splitting strategies
Embeddings: Choose and configure embedding models
Vector Stores: Set up vector storage backends
Storage: Manage document and index stores