Building Workflows
OnPrem Workflow Engine
Create orchestrated document-processing pipelines using simple YAML configuration files. Instead of writing code, you visually design workflows by connecting different types of nodes.
Quick Start - Three Core Examples
The workflow engine provides three essential patterns that cover the most common document processing scenarios:
1. Ingest PDFs to Vector Store (1_ingest_pdfs.yaml)
Purpose: Load PDF files, chunk them, and store in a vector database for later retrieval.
# Run from the workflows directory
python -m onprem.workflow yaml_examples/1_ingest_pdfs.yaml
What it does:
- Loads PDF files from ../sample_data/
- Converts PDFs to markdown for better processing
- Chunks documents into 800-character pieces with 80-character overlap
- Stores in a ChromaDB vector database at document_vectors/
Requirements: PDF files in the sample_data directory
2. Analyze from Vector Store (2_analyze_from_vectorstore.yaml)
Purpose: Query an existing vector database, apply AI analysis, and export results.
# Requires: Run example 1 first + set OPENAI_API_KEY
python -m onprem.workflow yaml_examples/2_analyze_from_vectorstore.yaml
What it does:
- Searches the vector database created in example 1
- Applies AI analysis to find documents about "artificial intelligence machine learning"
- Uses GPT-3.5-turbo to analyze each document for topic, key points, and relevance
- Exports results to document_analysis_results.xlsx

Requirements:
- Run example 1 first to create document_vectors/
- Set the OPENAI_API_KEY environment variable
3. Direct Document Analysis (3_direct_analysis.yaml)
Purpose: Analyze documents directly without a vector database using two-stage AI processing.
# Requires: set OPENAI_API_KEY
python -m onprem.workflow yaml_examples/3_direct_analysis.yaml
What it does:
- Loads documents directly from ../sample_data/
- Processes complete documents (combines multi-page PDFs)
- Applies AI analysis to extract document type, topic, entities, summary, and recommendations
- Uses ResponseCleaner for consistent formatting
- Exports results to document_analysis_summary.csv

Requirements:
- Documents in the sample_data directory
- Set the OPENAI_API_KEY environment variable
Command-Line Usage
Basic Execution
python -m onprem.workflow <workflow.yaml>
Available Options
# Show help and examples
python -m onprem.workflow --help
# Validate workflow without running
python -m onprem.workflow --validate workflow.yaml
# List all available node types
python -m onprem.workflow --list-nodes
# Run quietly (suppress progress output)
python -m onprem.workflow --quiet workflow.yaml
# Show version
python -m onprem.workflow --version
Example Commands
# Run the PDF ingestion workflow
python -m onprem.workflow yaml_examples/1_ingest_pdfs.yaml
# Validate a workflow before running
python -m onprem.workflow --validate yaml_examples/2_analyze_from_vectorstore.yaml
# See all available node types
python -m onprem.workflow --list-nodes
Workflow Patterns
The three core examples demonstrate the main workflow patterns:
Pattern 1: Data Ingestion (Example 1)
Documents → Chunking → Vector Store
Use this pattern to build searchable databases from your document collections.
Pattern 2: Retrieval + Analysis (Example 2)
Vector Store → Query → AI Analysis → Export
Use this pattern to analyze specific topics from large document collections using semantic search.
Pattern 3: Direct Processing (Example 3)
Documents → Full Processing → AI Analysis → Cleanup → Export
Use this pattern for comprehensive analysis of entire document collections without intermediate storage.
Available Node Types Summary
Loaders
- LoadFromFolder - Load all documents from a directory
- LoadSingleDocument - Load a specific file
- LoadWebDocument - Download and load from URL
TextSplitters
- SplitByCharacterCount - Chunk by character count
- SplitByParagraph - Chunk by paragraphs (preserves structure)
- KeepFullDocument - Keep documents whole, optionally concatenate pages
DocumentTransformers
- AddMetadata - Add custom metadata fields to documents
- ContentPrefix - Prepend text to document content
- ContentSuffix - Append text to document content
- DocumentFilter - Filter documents by metadata or content criteria
- PythonDocumentTransformer - Custom Python transformations
Storage
- ChromaStore - Vector database for semantic search
- WhooshStore - Full-text search index
- ElasticsearchStore - Hybrid search capabilities
Query
- QueryChromaStore - Search vector database
- QueryWhooshStore - Search text index
Processors
- PromptProcessor - Apply AI analysis using custom prompts (DocumentProcessor)
- ResponseCleaner - Clean and format AI responses (ResultProcessor)
- SummaryProcessor - Generate document summaries (DocumentProcessor)
- AggregatorNode - Aggregate multiple results into single result using LLM (AggregatorProcessor)
- PythonAggregatorNode - Aggregate multiple results using custom Python code (AggregatorProcessor)
Exporters
- CSVExporter - Export to CSV format
- ExcelExporter - Export to Excel format
- JSONExporter - Export to JSON format
Workflow Structure
Basic Workflow YAML Format
1. Create a file called my_workflow.yaml:
nodes:
document_loader:
type: LoadFromFolder
config:
source_directory: "/path/to/your/documents"
verbose: true
text_chunker:
type: SplitByCharacterCount
config:
chunk_size: 500
chunk_overlap: 50
search_index:
type: WhooshStore
config:
persist_location: "my_search_index"
connections:
- from: document_loader
from_port: documents
to: text_chunker
to_port: documents
- from: text_chunker
from_port: documents
to: search_index
to_port: documents
2. Run the Workflow
from onprem.workflow import execute_workflow
# Execute the workflow
results = execute_workflow("my_workflow.yaml", verbose=True)
Or programmatically:
from onprem.workflow import WorkflowEngine
engine = WorkflowEngine()
engine.load_workflow_from_yaml("my_workflow.yaml")
results = engine.execute(verbose=True)
Workflow Structure
A workflow YAML file has two main sections:
Nodes Section
Defines the processing nodes in your pipeline:
nodes:
node_id: # Unique identifier for this node
type: NodeTypeName # Type of node (see Node Types Reference)
config: # Configuration specific to this node type
parameter1: value1
parameter2: value2
Connections Section
Defines how data flows between nodes:
connections:
- from: source_node_id # Source node ID
from_port: output_port # Output port name
to: target_node_id # Target node ID
to_port: input_port # Input port name
Port Types and Data Flow
The workflow system uses a strongly-typed port system to ensure data consistency and prevent invalid connections. Understanding port types is essential for building valid workflows.
Port Type Overview
There are three main port types in the workflow system:
List[Document] - Document Collections
Most common type - Contains LangChain Document objects with content and metadata.
# Document structure
Document(
    page_content="The actual text content of the document...",
    metadata={
        "source": "/path/to/file.pdf",
        "page": 1,
        "author": "John Smith",
        "extension": "pdf"
    }
)
Used by:
- All Loader → TextSplitter connections
- All TextSplitter → Storage connections
- All Query → Processor connections
List[Dict] - Analysis Results
Processing output - Contains structured analysis results, summaries, or prompt responses.
# Results structure
[
    {
        "document_id": 0,
        "source": "research.pdf",
        "prompt": "Analyze this document and provide...",
        "response": "TOPIC: AI Research | TECH: Neural networks | LEVEL: Advanced",
        "original_length": 1247,
        "metadata": {"page": 1, "author": "Smith", "year": 2023}
    }
]
Used by: - All Processor → Exporter connections
Dict - Single Aggregated Result
Aggregated analysis - Contains a single dictionary with consolidated results from multiple inputs.
# Aggregated result structure
{"aggregated_response": "Top 3 topics: AI (80%), automation (65%), data science (45%)",
"source_count": 12,
"aggregation_method": "llm_prompt",
"topic_analysis": {
"top_topics": ["AI", "automation", "data science"],
"coverage_percentage": 75.5
},"original_results": [...] # Reference to source data
}
Used by:
- AggregatorNode and PythonAggregatorNode outputs
- Can be exported using Exporter nodes (converted to single-row format)
str - Status Messages
Completion status - Simple text messages indicating operation results.
# Status examples
"Successfully stored 150 documents in WhooshStore"
"Exported 25 results to analysis_report.xlsx"
"No documents to store"
Used by:
- Storage node outputs (terminal)
- Exporter node outputs (terminal)
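To see the four payload shapes side by side, here is a small illustrative Python sketch; the type aliases, the minimal Document stand-in, and the sample values are our own and not part of the engine's API:

from typing import Dict, List

# Minimal stand-in for a LangChain Document (illustrative only)
class Document:
    def __init__(self, page_content: str, metadata: dict):
        self.page_content = page_content
        self.metadata = metadata

# The four payload shapes that flow between ports
DocumentsPayload = List[Document]   # "documents" ports
ResultsPayload = List[Dict]         # "results" ports
AggregatedPayload = Dict            # "result" ports (aggregator output)
StatusPayload = str                 # "status" ports (terminal nodes)

documents: DocumentsPayload = [
    Document("Quarterly report text...", {"source": "report.pdf", "page": 1})
]
results: ResultsPayload = [
    {"document_id": 0, "source": "report.pdf", "response": "TOPIC: finance"}
]
aggregated: AggregatedPayload = {"aggregated_response": "Top topic: finance", "source_count": 1}
status: StatusPayload = "Successfully stored 1 document"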
Data Flow Patterns
Raw Files → List[Document] → List[Document] → str
    ↑             ↑                ↑            ↑
  Loader     TextSplitter       Storage      Status

Alternative analysis path:

Index → List[Document] → List[Dict] → str
  ↑           ↑              ↑         ↑
Query     Processor      Exporter   Status

New aggregation path:

Index → List[Document] → List[Dict] → Dict → str
  ↑           ↑              ↑          ↑      ↑
Query     Processor     Aggregator   Export  Status
Connection Validation
The workflow engine validates that connected ports have matching types:
✅ Valid Connections:
# Document processing chain
- from: loader
from_port: documents # List[Document]
to: chunker
to_port: documents # List[Document] ✅
# Analysis chain
- from: query
from_port: documents # List[Document]
to: processor
to_port: documents # List[Document] ✅
- from: processor
from_port: results # List[Dict]
to: exporter
to_port: results # List[Dict] ✅
❌ Invalid Connections:
# Type mismatch
- from: loader
from_port: documents # List[Document]
to: exporter
to_port: results # List[Dict] ❌
# Wrong direction
- from: storage
from_port: status # str (terminal node)
to: processor
to_port: documents # List[Document] ❌
Port Naming Conventions
- documents - Always contains List[Document] objects
- results - Always contains List[Dict] with analysis results
- result - Always contains Dict with a single aggregated result (AggregatorProcessor output)
- status - Always contains str with completion messages
Metadata Preservation
Data flows preserve metadata throughout the pipeline:
- Loader → Document metadata (source, extension, dates, etc.)
- TextSplitter → Preserves original metadata in chunks
- Query → Returns documents with original metadata
- Processor → Includes metadata in results under the metadata key
- Exporter → Flattens metadata into columns (meta_source, meta_page, etc.)
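As a rough illustration of the flattening step, the sketch below shows how an exporter could expand nested metadata into meta_* columns; the helper function is hypothetical, not the engine's actual exporter code:

from typing import Dict, List

def flatten_result(result: Dict) -> Dict:
    # Copy top-level fields and expand the nested metadata dict into meta_* columns
    row = {k: v for k, v in result.items() if k != "metadata"}
    for key, value in result.get("metadata", {}).items():
        row[f"meta_{key}"] = value
    return row

results: List[Dict] = [
    {"source": "report.pdf", "response": "TOPIC: finance",
     "metadata": {"source": "report.pdf", "page": 3, "author": "Smith"}},
]
rows = [flatten_result(r) for r in results]
print(rows[0])  # {'source': 'report.pdf', 'response': 'TOPIC: finance', 'meta_source': 'report.pdf', 'meta_page': 3, 'meta_author': 'Smith'}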
Error Messages
When port types don't match, you'll see validation errors like:
WorkflowValidationError: Type mismatch:
loader.documents (List[Document]) -> exporter.results (List[Dict])
WorkflowValidationError: Target node processor has no input port 'status'.
Available: ['documents']
Understanding these port types helps you:
- Design valid workflows
- Debug connection errors
- Understand data transformations
- Plan processing pipelines
Node Types Reference
Loader Nodes
Loader nodes read documents from various sources and output List[Document].
LoadFromFolder
Loads all documents from a directory using ingest.load_documents.
nodes:
my_loader:
type: LoadFromFolder
config:
source_directory: "/path/to/documents" # Required: Directory path
ignored_files: ["temp.txt", "draft.doc"] # Optional: Specific files to skip
include_patterns: ["*.pdf", "*.docx"] # Optional: Only load files matching these patterns
exclude_patterns: ["*draft*", "*temp*"] # Optional: Skip files matching these patterns
verbose: true # Optional: Show progress
pdf_markdown: false # Optional: Convert PDFs to markdown
pdf_unstructured: false # Optional: Use unstructured PDF parsing
n_proc: null # Optional: Number of CPU cores (null = all)
store_md5: false # Optional: Store MD5 hash in metadata
store_mimetype: false # Optional: Store MIME type in metadata
store_file_dates: false # Optional: Store file dates in metadata
infer_table_structure: false # Optional: Extract tables from PDFs
caption_tables: false # Optional: Generate table captions (requires llm)
extract_document_titles: false # Optional: Extract document titles (requires llm)
Filename Pattern Filtering:
- include_patterns: Only process files matching these glob patterns (e.g., ["*.pdf", "*.doc*"])
- exclude_patterns: Skip files matching these glob patterns (e.g., ["*draft*", "*backup*"])
- If both are specified, a file must match an include pattern AND not match any exclude pattern (see the sketch below)
- Uses standard Unix glob patterns: * (any chars), ? (single char), [abc] (character set)
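The sketch below shows the include/exclude logic in plain Python using the standard fnmatch module; the function name is ours and the real loader may differ in details:

import fnmatch
from typing import List, Optional

def should_load(filename: str,
                include_patterns: Optional[List[str]] = None,
                exclude_patterns: Optional[List[str]] = None) -> bool:
    # Return True if the file passes the include/exclude glob filters
    if include_patterns and not any(fnmatch.fnmatch(filename, p) for p in include_patterns):
        return False
    if exclude_patterns and any(fnmatch.fnmatch(filename, p) for p in exclude_patterns):
        return False
    return True

print(should_load("report.pdf", ["*.pdf", "*.docx"], ["*draft*"]))    # True
print(should_load("draft_report.pdf", ["*.pdf"], ["*draft*"]))        # False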
Output Ports: - documents: List[Document] - Loaded documents
LoadSingleDocument
Loads a single document using ingest.load_single_document.
nodes:
single_doc:
type: LoadSingleDocument
config:
file_path: "/path/to/document.pdf" # Required: Path to single file
pdf_markdown: false # Optional: Convert PDF to markdown
pdf_unstructured: false # Optional: Use unstructured parsing
store_md5: false # Optional: Store MD5 hash
store_mimetype: false # Optional: Store MIME type
store_file_dates: false # Optional: Store file dates
infer_table_structure: false # Optional: Extract tables
Output Ports: - documents: List[Document] - Loaded document
LoadWebDocument
Downloads and loads a document from a URL.
nodes:
web_doc:
type: LoadWebDocument
config:
url: "https://example.com/document.pdf" # Required: Document URL
username: "user" # Optional: Authentication username
password: "pass" # Optional: Authentication password
Output Ports: - documents: List[Document] - Downloaded document
TextSplitter Nodes
TextSplitter nodes process documents and output chunked List[Document].
SplitByCharacterCount
Chunks documents by character count using ingest.chunk_documents.
nodes:
char_splitter:
type: SplitByCharacterCount
config:
chunk_size: 500 # Optional: Characters per chunk (default: 500)
chunk_overlap: 50 # Optional: Overlap between chunks (default: 50)
infer_table_structure: false # Optional: Handle tables specially
preserve_paragraphs: false # Optional: Keep paragraphs intact
Input Ports: - documents: List[Document] - Documents to chunk
Output Ports: - documents: List[Document] - Chunked documents
SplitByParagraph
Chunks documents by paragraph boundaries, preserving document structure.
nodes:
para_splitter:
type: SplitByParagraph
config:
chunk_size: 1000 # Optional: Max characters per chunk
chunk_overlap: 100 # Optional: Overlap between chunks
# preserve_paragraphs is automatically set to True
Input Ports: - documents: List[Document] - Documents to chunk
Output Ports: - documents: List[Document] - Paragraph-based chunks
KeepFullDocument
Passes documents through without any chunking. Optionally concatenates multi-page documents and/or truncates documents to a maximum word count.
nodes:
no_split:
type: KeepFullDocument
config: {} # No configuration needed - keeps documents as-is
# For multi-page documents (PDFs, etc.) - combine into single document
full_document:
type: KeepFullDocument
config:
concatenate_pages: true # Optional: Combine pages into single document
# Truncate documents to first N words (useful for LLM context limits)
truncated_document:
type: KeepFullDocument
config:
max_words: 500 # Optional: Truncate to first 500 words
# Both concatenation and truncation (applied in that order)
combined_processing:
type: KeepFullDocument
config:
concatenate_pages: true # First: combine multi-page documents
max_words: 1000 # Then: truncate to first 1000 words
Page Concatenation:
When concatenate_pages: true, multi-page documents are combined:
- Pages sorted by page number
- Content joined with --- PAGE BREAK --- separators
- Metadata preserved from the first page plus additional fields:
  - page: -1 (indicates full document)
  - page_count: N (number of pages combined)
  - page_range: "1-5" (original page range)
  - concatenated: true (flag indicating concatenation)
Document Truncation:
When max_words: N is specified, documents are truncated to the first N words:
- Word boundaries are preserved (no partial words)
- Metadata is enriched with truncation information:
  - original_word_count: 2500 (original document length)
  - truncated: true (indicates truncation occurred)
  - truncated_word_count: 500 (target truncation size)
- Documents shorter than max_words are passed through unchanged
- Processing order: concatenation first, then truncation (see the sketch below)
Use Cases:
- Page Concatenation:
  - Resume Processing - Combine multi-page resumes into a single document
  - Contract Analysis - Process entire contracts as one unit
  - Report Analysis - Analyze complete reports without page boundaries
  - Legal Documents - Preserve document structure while enabling full-text analysis
- Document Truncation:
  - LLM Context Management - Fit long documents within token limits
  - Cost Control - Reduce processing costs for very long documents
  - Preview Generation - Create document summaries from beginnings
  - Performance Optimization - Speed up processing of large documents
  - Classification Tasks - Use document openings for categorization
Input Ports: - documents: List[Document] - Documents to pass through or concatenate
Output Ports: - documents: List[Document] - Unchanged or concatenated documents
DocumentTransformer Nodes
DocumentTransformer nodes transform documents while preserving the List[Document] → List[Document] flow. They can add metadata, modify content, filter documents, or apply custom transformations. These nodes can be placed anywhere in the document pipeline.
AddMetadata
Adds static metadata fields to all documents for categorization and organization.
nodes:
categorize_meeting:
type: AddMetadata
config:
metadata:
category: "meeting20251001"
department: "engineering"
priority: "high"
project: "Project Alpha"
classification: "internal"
Use Cases:
- Meeting Organization - Tag all documents from a specific meeting
- Project Tracking - Add project identifiers to document collections
- Department Categorization - Organize documents by department or team
- Classification - Mark documents as confidential, internal, or public
- Batch Processing - Add consistent metadata to large document collections
Input Ports: - documents: List[Document] - Documents to enrich
Output Ports: - documents: List[Document] - Documents with added metadata
ContentPrefix
Prepends text to the page_content of all documents.
nodes:
mark_confidential:
type: ContentPrefix
config:
prefix: "[CONFIDENTIAL - INTERNAL USE ONLY]"
separator: "\n\n" # Optional: separator between prefix and content (default: "\n\n")
add_header:
type: ContentPrefix
config:
prefix: "Project Alpha Documentation"
separator: "\n---\n"
Use Cases:
- Confidentiality Markings - Add confidential headers to sensitive documents
- Document Headers - Add consistent headers to document collections
- Processing Stamps - Mark documents as processed by specific workflows
- Context Addition - Add contextual information to document beginnings
Input Ports: - documents: List[Document] - Documents to modify
Output Ports: - documents: List[Document] - Documents with prefixed content
ContentSuffix
Appends text to the page_content of all documents.
nodes:
add_footer:
type: ContentSuffix
config:
suffix: |
---
Document processed by OnPrem Workflow Engine
Processing date: 2025-01-16
For questions, contact: admin@company.com
separator: "\n" # Optional: separator between content and suffix (default: "\n\n")
Use Cases:
- Processing Information - Add processing timestamps and contact info
- Legal Disclaimers - Append legal text to documents
- Document Footers - Add consistent footers to document collections
- Attribution - Add source or processing attribution
Input Ports: - documents: List[Document] - Documents to modify
Output Ports: - documents: List[Document] - Documents with appended content
DocumentFilter
Filters documents based on metadata criteria, content patterns, or length requirements.
nodes:
filter_engineering:
type: DocumentFilter
config:
# Filter by metadata
metadata_filters:
department: "engineering"
status: "active"
# Filter by content
content_contains: ["project", "analysis", "results"]
content_excludes: ["draft", "template"]
# Filter by length
min_length: 100
max_length: 10000
# Simple content filtering
relevant_docs_only:
type: DocumentFilter
config:
content_contains: ["machine learning", "AI", "neural network"]
min_length: 50
Filter Options:
- metadata_filters: Dictionary of metadata key-value pairs that must match exactly
- content_contains: List of terms - document must contain at least one
- content_excludes: List of terms - document must not contain any
- min_length: Minimum content length in characters
- max_length: Maximum content length in characters
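For intuition, the filter rules above roughly correspond to a predicate like the following sketch (our own function, with case-insensitive matching as an assumption):

def passes_filter(content, metadata,
                  metadata_filters=None, content_contains=None,
                  content_excludes=None, min_length=None, max_length=None):
    # Approximation of the DocumentFilter rules described above
    if metadata_filters and any(metadata.get(k) != v for k, v in metadata_filters.items()):
        return False
    text = content.lower()  # case-insensitive matching is an assumption of this sketch
    if content_contains and not any(term.lower() in text for term in content_contains):
        return False
    if content_excludes and any(term.lower() in text for term in content_excludes):
        return False
    if min_length is not None and len(content) < min_length:
        return False
    if max_length is not None and len(content) > max_length:
        return False
    return True

print(passes_filter("Project analysis results for the team...", {"department": "engineering"},
                    metadata_filters={"department": "engineering"},
                    content_contains=["analysis"], min_length=10))  # True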
Use Cases:
- Relevance Filtering - Keep only documents containing specific keywords
- Quality Control - Remove documents that are too short or too long
- Content Curation - Filter out drafts, templates, or irrelevant content
- Metadata-based Selection - Keep only documents matching specific criteria
Input Ports: - documents: List[Document] - Documents to filter
Output Ports: - documents: List[Document] - Filtered documents
PythonDocumentTransformer
Executes custom Python code to transform documents with full flexibility and security controls.
nodes:
extract_document_info:
type: PythonDocumentTransformer
config:
code: |
# Available variables:
# - doc: Document object
# - content: doc.page_content (string)
# - metadata: doc.metadata (mutable copy)
# - document_id: index of document (int)
# - source: source file path (string)
# Extract information from content
import re
word_count = len(content.split())
sentence_count = len(re.findall(r'[.!?]+', content))
# Find email addresses
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
emails = re.findall(email_pattern, content)
# Determine document type
if 'meeting' in content.lower() and 'agenda' in content.lower():
doc_type = 'meeting_agenda'
elif 'analysis' in content.lower():
doc_type = 'analysis_report'
else:
doc_type = 'general_document'
# Enrich metadata
metadata.update({
'word_count': word_count,
'sentence_count': sentence_count,
'email_count': len(emails),
'document_type': doc_type,
'complexity_score': min(10, word_count // 100)
})
# Add summary to content
summary = f"[{doc_type.upper()}: {word_count} words, Complexity: {metadata['complexity_score']}/10]"
content = summary + "\n\n" + content
# Create transformed document
transformed_doc = Document(
page_content=content,
metadata=metadata
)
# Load transformation from external file
complex_transform:
type: PythonDocumentTransformer
config:
code_file: "scripts/document_enricher.py"
Available Python Environment:
- Built-in functions: len, str, int, float, bool, list, dict, min, max, etc.
- Safe modules: re, json, math, datetime (pre-imported)
- Document class: Available for creating new Document objects
- Security: No file I/O, network access, or system operations
Variable Reference:
- doc: Original Document object (read-only)
- content: Document content (modifiable string)
- metadata: Document metadata (modifiable dictionary copy)
- document_id: Index of current document (int)
- source: Source file path (string)
- transformed_doc: Set this to a Document object for the output (optional)
Transformation Options:
1. Modify Variables: Change content and metadata, and let the system create the Document
2. Explicit Creation: Create and set transformed_doc explicitly
Use Cases:
- Content Analysis - Extract key information and add to metadata
- Document Classification - Automatically categorize documents by content
- Data Extraction - Find emails, URLs, phone numbers, etc.
- Content Transformation - Modify content based on complex rules
- Custom Enrichment - Add calculated metrics or derived information
Input Ports: - documents: List[Document] - Documents to transform
Output Ports: - documents: List[Document] - Transformed documents
Storage Nodes
Storage nodes save documents to various backends and return status messages.
ChromaStore
Stores documents in a ChromaDB vector database.
nodes:
vector_db:
type: ChromaStore
config:
persist_location: "/path/to/chromadb" # Optional: Database path
# Additional ChromaDB configuration options
Input Ports: - documents: List[Document] - Documents to store
Output Ports: - status: str - Storage status message
WhooshStore
Stores documents in a Whoosh full-text search index.
nodes:
search_index:
type: WhooshStore
config:
persist_location: "/path/to/whoosh_index" # Optional: Index path
# Additional Whoosh configuration options
Input Ports: - documents: List[Document] - Documents to index
Output Ports: - status: str - Indexing status message
ElasticsearchStore
Stores documents in an Elasticsearch cluster.
nodes:
es_store:
type: ElasticsearchStore
config:
persist_location: "http://localhost:9200" # Required: Elasticsearch URL
index_name: "my_documents" # Optional: Index name
# Additional Elasticsearch configuration options
Input Ports: - documents: List[Document] - Documents to store
Output Ports: - status: str - Storage status message
Query Nodes
Query nodes search existing storage indexes and return matching documents.
QueryWhooshStore
Searches documents in a Whoosh full-text search index with support for different search types.
nodes:
# Sparse search (pure keyword matching)
keyword_search:
type: QueryWhooshStore
config:
persist_location: "/path/to/whoosh_index" # Required: Index path
query: "artificial intelligence ML" # Required: Search terms
search_type: "sparse" # Optional: "sparse" or "semantic" (default: sparse)
limit: 20 # Optional: Max results (default: 100)
# Semantic search (keyword + embedding re-ranking)
smart_search:
type: QueryWhooshStore
config:
persist_location: "/path/to/whoosh_index"
query: "machine learning concepts"
search_type: "semantic" # Uses keyword search + semantic re-ranking
limit: 10
Search Types:
- sparse: Pure keyword/full-text search using Whoosh
- semantic: Keyword search followed by embedding-based re-ranking

Input Ports: - None (queries existing storage directly)
Output Ports: - documents: List[Document] - Matching documents
QueryChromaStore
Searches documents in a ChromaDB vector database using semantic similarity.
nodes:
vector_search:
type: QueryChromaStore
config:
persist_location: "/path/to/chromadb" # Required: Database path
query: "machine learning algorithms" # Required: Search query
search_type: "semantic" # Optional: Only "semantic" supported (default)
limit: 10 # Optional: Max results (default: 10)
Search Types:
- semantic: Vector similarity search (only supported type)

Input Ports: - None (queries existing storage directly)
Output Ports: - documents: List[Document] - Similar documents
QueryElasticsearchStore
Searches documents in an Elasticsearch index with full support for all search types.
nodes:
# Sparse search (BM25 text matching)
text_search:
type: QueryElasticsearchStore
config:
persist_location: "http://localhost:9200" # Required: Elasticsearch URL
index_name: "my_index" # Required: Index name
query: "artificial intelligence" # Required: Search query
search_type: "sparse" # Optional: "sparse", "semantic", or "hybrid"
limit: 5 # Optional: Max results (default: 10)
# Semantic search (vector similarity)
vector_search:
type: QueryElasticsearchStore
config:
persist_location: "https://my-es:9200"
index_name: "documents"
query: "machine learning concepts"
search_type: "semantic" # Dense vector search
limit: 3
basic_auth: ["user", "password"] # Optional: Authentication
verify_certs: false # Optional: SSL verification
# Hybrid search (combines text + vector)
best_search:
type: QueryElasticsearchStore
config:
persist_location: "http://localhost:9200"
index_name: "knowledge_base"
query: "deep learning neural networks"
search_type: "hybrid" # Best of both worlds
weights: [0.7, 0.3] # Optional: [text_weight, vector_weight]
limit: 5
Search Types:
- sparse: Traditional BM25 text search
- semantic: Dense vector similarity search
- hybrid: Weighted combination of sparse + semantic results

Input Ports: - None (queries existing storage directly)
Output Ports: - documents: List[Document] - Matching documents
Processor Nodes
Processor nodes apply AI analysis, prompts, or transformations to documents.
PromptProcessor
Applies a custom prompt to each document using an LLM.
nodes:
document_analyzer:
type: PromptProcessor
config:
prompt: | # Option 1: Inline prompt template
Analyze this document and provide:
1. Main topic:
2. Key findings:
3. Complexity (1-5):
Source: {source}
Content: {content}
llm: # New flexible LLM configuration
model_url: "openai://gpt-3.5-turbo" # Model URL specification
temperature: 0.7 # Creativity level
max_tokens: 1000 # Response length limit
batch_size: 5 # Optional: Process in batches
# Alternative: Load complex prompt from file with advanced LLM config
complex_analyzer:
type: PromptProcessor
config:
prompt_file: "prompts/statute_extraction.txt" # Option 2: Load from file
llm: # Advanced LLM configuration
model_url: "openai://gpt-4o-mini" # Full model URL specification
temperature: 0 # Deterministic results
mute_stream: true # Quiet processing
timeout: 60 # Request timeout
batch_size: 2 # Optional: Process in batches
Loading Prompts from Files:
For complex prompts, you can store them in separate text files and reference them with prompt_file:
# File: prompts/resume_parser.txt
Analyze the resume and extract details in JSON format:
{
"name": "...",
"skills": ["...", "..."],
"experience": [...]
}
Resume text: {content}
# Workflow configuration
config:
prompt_file: "prompts/resume_parser.txt"
Benefits of External Prompt Files:
- Better organization for complex prompts
- Version control and collaboration
- Reusability across workflows
- Easier prompt engineering and testing
LLM Configuration Options:
The llm section accepts all parameters supported by the OnPrem LLM class.
LLM Instance Sharing:
The workflow engine automatically shares LLM instances between processors that use identical configurations, improving performance and memory usage:
nodes:
extractor:
type: PromptProcessor
config:
prompt_file: "prompts/extraction.txt"
llm:
model_url: "openai://gpt-4o-mini" # LLM instance created
temperature: 0
cleaner:
type: ResponseCleaner
config:
cleanup_prompt_file: "prompts/cleanup.txt"
llm:
model_url: "openai://gpt-4o-mini" # Same instance reused!
temperature: 0
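Conceptually, instance sharing can be pictured as a cache keyed by the normalized LLM configuration; the sketch below is illustrative only and uses a placeholder factory class rather than the real OnPrem LLM class:

import json

_llm_cache = {}

def get_shared_llm(llm_config: dict, factory):
    # Return one instance per unique configuration (illustrative cache)
    key = json.dumps(llm_config, sort_keys=True)
    if key not in _llm_cache:
        _llm_cache[key] = factory(**llm_config)
    return _llm_cache[key]

class FakeLLM:
    # Placeholder standing in for the real LLM class
    def __init__(self, **kwargs):
        self.config = kwargs

a = get_shared_llm({"model_url": "openai://gpt-4o-mini", "temperature": 0}, FakeLLM)
b = get_shared_llm({"model_url": "openai://gpt-4o-mini", "temperature": 0}, FakeLLM)
print(a is b)  # True: identical configurations share a single instance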
Configuration Options:
llm:
# Model specification
model_url: "openai://gpt-4o-mini" # Model URL (recommended format)
# Generation parameters
temperature: 0.7 # Randomness (0.0-2.0)
max_tokens: 1500 # Maximum response length
top_p: 0.9 # Nucleus sampling
frequency_penalty: 0.0 # Repetition penalty
presence_penalty: 0.0 # Topic diversity penalty
# Behavior options
mute_stream: true # Suppress streaming output
timeout: 120 # Request timeout in seconds
# Provider-specific options (passed through)
api_key: "${OPENAI_API_KEY}" # API authentication
base_url: "https://api.openai.com/v1" # Custom API endpoint
Prompt Variables:
- {content} - Document content
- {source} - Document source path
- {page} - Page number (if available)
- Any metadata field (e.g., {meta_author})
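To make the substitution concrete, here is a hypothetical sketch of how a prompt template could be filled per document; the exact mechanics inside PromptProcessor may differ:

def render_prompt(template: str, doc_content: str, metadata: dict) -> str:
    # Fill {content}, {source}, {page} and meta_* placeholders from a document
    values = {
        "content": doc_content,
        "source": metadata.get("source", ""),
        "page": metadata.get("page", ""),
    }
    # Expose remaining metadata fields as {meta_<key>} placeholders
    values.update({f"meta_{k}": v for k, v in metadata.items()})
    return template.format(**values)

template = "Analyze {source} (page {page}, author {meta_author}):\n{content}"
print(render_prompt(template, "Document text...",
                    {"source": "paper.pdf", "page": 2, "author": "Smith"}))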
Input Ports: - documents: List[Document] - Documents to process
Output Ports: - results: List[Dict] - Analysis results with prompt responses
ResponseCleaner
Post-processes and cleans LLM responses using another LLM call.
nodes:
response_cleaner:
type: ResponseCleaner
config:
cleanup_prompt: | # Inline cleanup instructions
Remove XML tags and clean up formatting while preserving all valid content:
{original_response}
Keep all important information, just remove formatting artifacts.
llm:
model_url: "openai://gpt-3.5-turbo"
temperature: 0 # Deterministic cleanup
# Alternative: Load cleanup prompt from file
citation_cleaner:
type: ResponseCleaner
config:
cleanup_prompt_file: "prompts/statute_cleanup.txt" # Complex cleanup rules
llm:
model_url: "openai://gpt-4o-mini"
temperature: 0
mute_stream: true
Use Cases:
- Extract structured data from messy LLM responses
- Remove formatting artifacts like XML tags or unwanted text
- Clean statutory citations while preserving all valid references (U.S.C., Public Laws, etc.)
- Standardize outputs for consistent data processing
- Chain with PromptProcessor for two-stage processing

Important Notes:
- Cleanup prompts should be carefully designed to avoid over-aggressive cleaning
- Always test with representative examples to ensure valid data isn't removed
- Consider the specific domain and format of your LLM responses
Input Ports: - results: List[Dict] - Results from PromptProcessor to clean
Output Ports: - results: List[Dict] - Cleaned results (original kept in the original_response field)
SummaryProcessor
Generates summaries for documents using an LLM.
nodes:
summarizer:
type: SummaryProcessor
config:
max_length: 150 # Optional: Max summary length in words
model_name: "gpt-3.5-turbo" # Optional: LLM model
llm_type: "openai" # Optional: LLM provider
Input Ports: - documents: List[Document] - Documents to summarize
Output Ports: - results: List[Dict] - Summaries with metadata
PythonDocumentProcessor
Executes custom Python code on documents with proper security controls, allowing unlimited customization of document processing logic.
nodes:
# Inline Python code
custom_analyzer:
type: PythonDocumentProcessor
config:
code: |
# Available variables:
# - doc: Document object
# - content: doc.page_content (string)
# - metadata: doc.metadata (dict)
# - document_id: index of document (int)
# - source: source file path (string)
# - result: dictionary to populate (dict)
# Extract key information (re module is pre-imported)
word_count = len(content.split())
sentence_count = len(re.findall(r'[.!?]+', content))
# Find email addresses
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
emails = re.findall(email_pattern, content)
# Populate result dictionary
result['analysis'] = {
'word_count': word_count,
'sentence_count': sentence_count,
'emails_found': emails,
'document_length': len(content)
}
result['processing_status'] = 'completed'
# Load Python code from external file
external_processor:
type: PythonDocumentProcessor
config:
code_file: "scripts/document_analyzer.py"
Available Python Environment:
- Built-in functions: len, str, int, float, bool, list, dict, min, max, etc.
- Safe modules: re, json, math, datetime (pre-imported)
- Document class: Available for creating new Document objects
- Security: No file I/O, network access, or system operations
Input Ports: - documents: List[Document] - Documents to process
Output Ports: - results: List[Dict] - Processing results with custom analysis
PythonResultProcessor
Executes custom Python code on processing results, enabling post-processing and enhancement of analysis results.
nodes:
result_enhancer:
type: PythonResultProcessor
config:
code: |
# Available variables:
# - result: original result dictionary (modifiable copy)
# - original_result: read-only original result
# - result_id: index of result (int)
# - processed_result: dictionary to populate (dict)
# Enhance analysis results
analysis = result.get('analysis', {})
word_count = analysis.get('word_count', 0)
# Categorize document by length
if word_count < 100:
category = 'short'
elif word_count < 500:
category = 'medium'
else:
category = 'long'
# Create enhanced result
processed_result['enhanced_analysis'] = {
'original_analysis': analysis,
'document_category': category,
'complexity_score': min(10, word_count // 50),
'has_emails': len(analysis.get('emails_found', [])) > 0
}
# Add summary
processed_result['summary'] = f"Document categorized as '{category}'"
Variable Naming Conventions:
- Document Processor: Populate the result dictionary with your analysis
- Result Processor: Populate the processed_result dictionary with enhanced data
Input Ports: - results: List[Dict] - Results to process
Output Ports: - results: List[Dict] - Enhanced processing results
AggregatorNode
Aggregates multiple processing results into a single collapsed result using LLM-based analysis. Perfect for creating summaries of summaries, extracting top themes from multiple responses, or producing executive summaries from document collections.
nodes:
# Topic aggregation - analyze topic keywords across documents
topic_aggregator:
type: AggregatorNode
config:
prompt: |
Analyze these {num_results} topic keywords from different documents and identify the top 5 most important topics:
{responses}
Please provide:
1. TOP TOPICS: List the 5 most important topics with frequency
2. TRENDS: What patterns do you see across documents?
3. SUMMARY: One sentence summary of the overall theme
Format your response clearly with headings.
llm:
model_url: "openai://gpt-3.5-turbo"
temperature: 0.3
max_tokens: 500
# Summary aggregation - create summary of summaries
meta_summarizer:
type: AggregatorNode
config:
prompt_file: "prompts/summary_aggregation.txt" # Load from file
llm:
model_url: "openai://gpt-4o-mini"
temperature: 0.1
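As a rough sketch of how the {num_results} and {responses} placeholders might be populated before the aggregation prompt is sent to the LLM (the exact formatting used by the node is an assumption here):

def build_aggregation_prompt(template: str, results: list) -> str:
    # Substitute {num_results} and a numbered list of responses into the template
    responses = "\n".join(
        f"{i + 1}. {r.get('response', '')}" for i, r in enumerate(results)
    )
    return template.format(num_results=len(results), responses=responses)

template = "Analyze these {num_results} topic keywords:\n{responses}\nList the top themes."
results = [{"response": "machine learning"}, {"response": "data privacy"}]
print(build_aggregation_prompt(template, results))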
Use Cases:
- Topic Analysis - Individual documents → topic keywords → top themes across the collection
- Executive Summaries - Document summaries → executive summary of key findings
- Survey Analysis - Individual responses → overall trends and insights
- Meeting Analysis - Multiple meeting notes → key decisions and action items
- Research Synthesis - Paper summaries → research landscape overview
Input Ports: - results: List[Dict] - Multiple results to aggregate (from PromptProcessor, SummaryProcessor, etc.)

Output Ports: - result: Dict - Single aggregated result containing:
- aggregated_response: LLM's aggregated analysis
- source_count: Number of original results processed
- aggregation_method: "llm_prompt"
- original_results: Reference to source data
PythonAggregatorNode
Aggregates multiple processing results using custom Python code for precise control over aggregation logic. Ideal for statistical analysis, data consolidation, and algorithmic aggregation.
nodes:
# Topic frequency analysis
topic_frequency_analyzer:
type: PythonAggregatorNode
config:
code: |
# Available variables:
# - results: list of all result dictionaries
# - num_results: number of results
# - result: dictionary to populate with aggregated result
# Count topic frequency across all responses
topic_counts = {}
total_responses = 0
for res in results:
response = res.get('response', '')
if response:
topics = [t.strip() for t in response.split(',') if t.strip()]
total_responses += 1
for topic in topics:
topic_counts[topic] = topic_counts.get(topic, 0) + 1
# Get top topics by frequency
sorted_topics = sorted(topic_counts.items(), key=lambda x: x[1], reverse=True)
top_topics = sorted_topics[:5]
# Calculate statistics
result['topic_analysis'] = {
'top_topics': [
{'topic': topic, 'frequency': count, 'percentage': round(count/total_responses*100, 1)}
for topic, count in top_topics
],
'total_unique_topics': len(topic_counts),
'total_documents': total_responses,
'coverage': f"{len([t for t in topic_counts.values() if t > 1])} topics appear in multiple documents"
}
result['aggregation_summary'] = f"Analyzed {total_responses} documents, found {len(topic_counts)} unique topics"
# Statistical summary aggregator
statistical_aggregator:
type: PythonAggregatorNode
config:
code: |
# Aggregate numerical metrics from document analysis
import json
import math
word_counts = []
sentiment_scores = []
complexity_scores = []
for res in results:
# Extract metrics from results
metadata = res.get('metadata', {})
analysis = res.get('analysis', {})
if 'word_count' in metadata:
word_counts.append(metadata['word_count'])
if 'sentiment_score' in analysis:
sentiment_scores.append(analysis['sentiment_score'])
if 'complexity_score' in analysis:
complexity_scores.append(analysis['complexity_score'])
# Calculate statistics
def calc_stats(values):
if not values:
return {}
return {
'count': len(values),
'mean': sum(values) / len(values),
'median': sorted(values)[len(values)//2],
'min': min(values),
'max': max(values),
'std_dev': math.sqrt(sum((x - sum(values)/len(values))**2 for x in values) / len(values))
}
result['statistical_summary'] = {
'document_metrics': {
'total_documents': len(results),
'word_count_stats': calc_stats(word_counts),
'sentiment_stats': calc_stats(sentiment_scores),
'complexity_stats': calc_stats(complexity_scores)
},
'data_quality': {
'documents_with_word_count': len(word_counts),
'documents_with_sentiment': len(sentiment_scores),
'documents_with_complexity': len(complexity_scores)
}
}
# Create readable summary
avg_words = result['statistical_summary']['document_metrics']['word_count_stats'].get('mean', 0)
result['executive_summary'] = f"Processed {len(results)} documents, average length: {avg_words:.0f} words"
Use Cases:
- Topic Frequency Analysis - Count and rank topics across document collections
- Statistical Aggregation - Calculate means, medians, distributions from analysis results
- Data Consolidation - Merge and deduplicate information from multiple sources
- Custom Scoring - Apply business logic to rank or categorize results
- Format Conversion - Transform aggregated data into specific output formats
Available Python Environment:
- Built-in functions: len, str, int, float, bool, list, dict, min, max, sum, etc.
- Safe modules: re, json, math, datetime (pre-imported)
- Security: No file I/O, network access, or system operations
Input Ports: - results: List[Dict] - Multiple results to aggregate

Output Ports: - result: Dict - Single aggregated result containing:
- Custom fields based on your aggregation logic
- source_count: Number of original results processed
- aggregation_method: "python_code"
Exporter Nodes
Exporter nodes save processed results to various file formats.
CSVExporter
Exports results to CSV format for spreadsheet analysis.
nodes:
csv_output:
type: CSVExporter
config:
output_path: "results.csv" # Optional: Output file (default: results.csv)
columns: ["source", "response"] # Optional: Columns to include (default: all)
Input Ports: - results: List[Dict] - Results to export
Output Ports: - status: str - Export status message
ExcelExporter
Exports results to Excel format with formatting support.
nodes:
excel_output:
type: ExcelExporter
config:
output_path: "analysis.xlsx" # Optional: Output file (default: results.xlsx)
sheet_name: "Document_Analysis" # Optional: Sheet name (default: Results)
Input Ports: - results: List[Dict] - Results to export
Output Ports: - status: str - Export status message
JSONExporter
Exports results to JSON format for programmatic access. Can handle both regular processing results and single aggregated results.
nodes:
json_output:
type: JSONExporter
config:
output_path: "results.json" # Optional: Output file (default: results.json)
pretty_print: true # Optional: Format JSON nicely (default: true)
Input Ports:
- results: List[Dict] - Multiple results to export (from processors)
- result: Dict - Single aggregated result to export (from aggregators)

Output Ports: - status: str - Export status message
Configuration Options
Common Configuration Patterns
PDF Processing Options
config:
pdf_markdown: true # Convert PDFs to markdown format
pdf_unstructured: false # Use unstructured parsing for complex PDFs
infer_table_structure: true # Extract and preserve table structure
Metadata Enhancement
config:
store_md5: true # Add MD5 hash to document metadata
store_mimetype: true # Add MIME type to document metadata
store_file_dates: true # Add creation/modification dates
extract_document_titles: true # Extract document titles (requires LLM)
Performance Tuning
config:
n_proc: 4 # Use 4 CPU cores for parallel processing
verbose: true # Show detailed progress information
batch_size: 1000 # Process documents in batches
Vector Store Configuration
Different storage backends support different configuration options:
# ChromaDB
config:
persist_location: "./chroma_db"
collection_name: "documents"
# Whoosh
config:
persist_location: "./whoosh_index"
schema_fields: ["content", "title", "source"]
# Elasticsearch
config:
persist_location: "https://elastic:password@localhost:9200"
index_name: "document_index"
basic_auth: ["username", "password"]
Advanced Examples
Multi-Source Pipeline
Process documents from multiple sources with different strategies:
nodes:
# Load PDFs with table extraction
pdf_loader:
type: LoadFromFolder
config:
source_directory: "pdfs/"
pdf_markdown: true
infer_table_structure: true
# Load text files
text_loader:
type: LoadFromFolder
config:
source_directory: "texts/"
# Chunk PDFs by paragraph (preserve structure)
pdf_chunker:
type: SplitByParagraph
config:
chunk_size: 1000
chunk_overlap: 100
# Chunk text files by character count
text_chunker:
type: SplitByCharacterCount
config:
chunk_size: 500
chunk_overlap: 50
# Store everything in unified index
unified_store:
type: WhooshStore
config:
persist_location: "unified_index"
connections:
- from: pdf_loader
from_port: documents
to: pdf_chunker
to_port: documents
- from: text_loader
from_port: documents
to: text_chunker
to_port: documents
- from: pdf_chunker
from_port: documents
to: unified_store
to_port: documents
- from: text_chunker
from_port: documents
to: unified_store
to_port: documents
Document Processing Chain
Multiple processing steps in sequence:
nodes:
loader:
type: LoadFromFolder
config:
source_directory: "documents/"
extract_document_titles: true
# First pass: large chunks for context
coarse_chunker:
type: SplitByParagraph
config:
chunk_size: 2000
chunk_overlap: 200
# Second pass: fine chunks for retrieval
fine_chunker:
type: SplitByCharacterCount
config:
chunk_size: 400
chunk_overlap: 40
# Store in vector database
vector_store:
type: ChromaStore
config:
persist_location: "vector_db"
connections:
- from: loader
from_port: documents
to: coarse_chunker
to_port: documents
- from: coarse_chunker
from_port: documents
to: fine_chunker
to_port: documents
- from: fine_chunker
from_port: documents
to: vector_store
to_port: documents
Web Document Processing
Download and process documents from URLs:
nodes:
web_loader:
type: LoadWebDocument
config:
url: "https://example.com/report.pdf"
username: "api_user"
password: "secret_key"
doc_processor:
type: SplitByCharacterCount
config:
chunk_size: 800
chunk_overlap: 80
search_store:
type: ElasticsearchStore
config:
persist_location: "http://elasticsearch:9200"
index_name: "web_documents"
connections:
- from: web_loader
from_port: documents
to: doc_processor
to_port: documents
- from: doc_processor
from_port: documents
to: search_store
to_port: documents
Document Organization and Enrichment
Process documents with comprehensive metadata enrichment and content transformation:
nodes:
# Load meeting documents
meeting_loader:
type: LoadFromFolder
config:
source_directory: "meeting_docs/"
include_patterns: ["*.pdf", "*.docx", "*.txt"]
# Tag all documents with meeting metadata
tag_meeting_info:
type: AddMetadata
config:
metadata:
meeting_id: "meeting20251001"
department: "engineering"
project: "Project Alpha"
classification: "internal"
attendees: "team_leads"
# Add confidential header to all documents
mark_confidential:
type: ContentPrefix
config:
prefix: "[CONFIDENTIAL - PROJECT ALPHA TEAM ONLY]"
separator: "\n\n"
# Extract key information and enrich metadata
analyze_content:
type: PythonDocumentTransformer
config:
code: |
# Analyze document content for key information
import re
# Basic statistics
word_count = len(content.split())
paragraph_count = len([p for p in content.split('\n\n') if p.strip()])
# Look for action items and decisions
action_items = len(re.findall(r'(?:action item|todo|task):', content, re.IGNORECASE))
decisions = len(re.findall(r'(?:decision|resolved|agreed):', content, re.IGNORECASE))
# Find mentions of team members
team_members = re.findall(r'@(\w+)', content)
# Classify document type
content_lower = content.lower()
if 'agenda' in content_lower:
doc_type = 'meeting_agenda'
elif action_items > 0 or 'action' in content_lower:
doc_type = 'action_items'
elif 'minutes' in content_lower or 'notes' in content_lower:
doc_type = 'meeting_minutes'
else:
doc_type = 'meeting_document'
# Update metadata with extracted information
metadata.update({
'word_count': word_count,
'paragraph_count': paragraph_count,
'action_items_count': action_items,
'decisions_count': decisions,
'mentioned_members': list(set(team_members)),
'document_type': doc_type,
'priority_score': min(10, (action_items * 2) + decisions),
'has_action_items': action_items > 0,
'complexity': 'high' if word_count > 1000 else 'medium' if word_count > 300 else 'low'
})
# Add document summary at the beginning
summary = f"[{doc_type.upper()}: {word_count} words, {action_items} action items, Priority: {metadata['priority_score']}/10]"
content = summary + "\n\n" + content
# Create enriched document
transformed_doc = Document(
page_content=content,
metadata=metadata
)
# Filter to keep only relevant documents
filter_important:
type: DocumentFilter
config:
metadata_filters:
classification: "internal"
# Keep documents with action items or decisions
content_contains: ["action", "decision", "task", "todo"]
min_length: 100
# Add processing footer
add_footer:
type: ContentSuffix
config:
suffix: |
---
Document processed: 2025-01-16
Meeting ID: meeting20251001
Next review: 2025-01-23
Contact: project-alpha-admin@company.com
# Chunk for storage
chunk_docs:
type: SplitByParagraph
config:
chunk_size: 1000
chunk_overlap: 100
# Store enriched documents
meeting_store:
type: WhooshStore
config:
persist_location: "meeting_20251001_index"
connections:
- from: meeting_loader
from_port: documents
to: tag_meeting_info
to_port: documents
- from: tag_meeting_info
from_port: documents
to: mark_confidential
to_port: documents
- from: mark_confidential
from_port: documents
to: analyze_content
to_port: documents
- from: analyze_content
from_port: documents
to: filter_important
to_port: documents
- from: filter_important
from_port: documents
to: add_footer
to_port: documents
- from: add_footer
from_port: documents
to: chunk_docs
to_port: documents
- from: chunk_docs
from_port: documents
to: meeting_store
to_port: documents
This example demonstrates the power of DocumentTransformer nodes:
- Metadata Tagging - Organizes documents by meeting, project, and department
- Content Marking - Adds confidential headers for security
- Intelligent Analysis - Extracts action items, decisions, and team mentions
- Quality Filtering - Keeps only documents with actionable content
- Processing Attribution - Adds footer with processing information
- Searchable Storage - Creates indexed, searchable document collection
The enriched metadata enables powerful queries like:
- "Find all documents from meeting20251001 with action items"
- "Show high-priority engineering documents from Project Alpha"
- "List all documents mentioning specific team members"
Validation and Error Handling
The workflow engine performs comprehensive validation:
Connection Validation
- Port Existence: Verifies source and target ports exist
- Type Compatibility: Ensures data types match between connections
- Node Compatibility: Enforces valid connection patterns:
- ✅ Loader → TextSplitter
- ✅ Loader → DocumentTransformer
- ✅ TextSplitter → TextSplitter
- ✅ TextSplitter → DocumentTransformer
- ✅ TextSplitter → Storage
- ✅ DocumentTransformer → TextSplitter
- ✅ DocumentTransformer → DocumentTransformer
- ✅ DocumentTransformer → Storage
- ✅ Query → DocumentTransformer
- ❌ Loader → Storage (must have a TextSplitter or DocumentTransformer in between)
- ❌ Storage → Any (Storage nodes are terminal)
Runtime Validation
- File Existence: Checks that source directories and files exist
- Configuration Validation: Validates required parameters
- Dependency Resolution: Uses topological sorting to determine execution order
- Cycle Detection: Prevents infinite loops in workflows
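For intuition, dependency resolution with cycle detection can be done with Kahn's algorithm; this standalone sketch is illustrative and is not the engine's code:

from collections import defaultdict, deque

def execution_order(connections):
    # connections: list of (from_node, to_node) pairs; returns a topological order or raises on a cycle
    graph, indegree, nodes = defaultdict(list), defaultdict(int), set()
    for src, dst in connections:
        graph[src].append(dst)
        indegree[dst] += 1
        nodes.update((src, dst))
    queue = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in graph[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("Cycle detected in workflow connections")
    return order

print(execution_order([("loader", "chunker"), ("chunker", "store")]))
# ['loader', 'chunker', 'store']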
Error Messages
The engine provides detailed error messages:
# Invalid node type
WorkflowValidationError: Unknown node type: InvalidNodeType

# Missing required configuration
NodeExecutionError: Node my_loader: source_directory is required

# Invalid connection
WorkflowValidationError: Loader node loader can only connect to TextSplitter nodes, not ChromaStoreNode

# Type mismatch
WorkflowValidationError: Type mismatch: loader.documents (List[Document]) -> storage.text (str)

# Missing port
WorkflowValidationError: Source node loader has no output port 'data'. Available: ['documents']
Document Analysis Pipeline
Query existing storage, apply AI analysis, and export to spreadsheet:
nodes:
# Search for research documents
research_query:
type: QueryWhooshStore
config:
persist_location: "research_index"
query: "methodology results conclusions findings"
limit: 25
# Analyze each document with custom prompts
research_analysis:
type: PromptProcessor
config:
prompt: |
Analyze this research document and extract:
1. RESEARCH QUESTION: What is the main research question?
2. METHODOLOGY: What research methods were used?
3. KEY FINDINGS: What are the 3 most important findings?
4. LIMITATIONS: What limitations are mentioned?
5. CONFIDENCE: How confident are the conclusions (High/Medium/Low)?
Document: {source}
Content: {content}
Please format each answer on a separate line.
model_name: "gpt-4"
batch_size: 3
# Export to Excel for review and analysis
analysis_report:
type: ExcelExporter
config:
output_path: "research_analysis_report.xlsx"
sheet_name: "Document_Analysis"
connections:
- from: research_query
from_port: documents
to: research_analysis
to_port: documents
- from: research_analysis
from_port: results
to: analysis_report
to_port: results
Document Topic Aggregation Pipeline
Extract individual document topics and create an aggregated analysis:
nodes:
# Load documents from directory
document_loader:
type: LoadFromFolder
config:
source_directory: "../sample_data"
include_patterns: ["*.pdf", "*.txt", "*.html"]
pdf_markdown: true
store_file_dates: true
# Keep documents whole for comprehensive analysis
full_document_processor:
type: KeepFullDocument
config:
concatenate_pages: true
# Extract topic keyword from each document
document_analyzer:
type: PromptProcessor
config:
prompt: |
Provide a single short keyword or keyphrase that captures the topic of the following text from {source} (only output the keyphrase without quotes and nothing else): {content}
llm:
model_url: "openai://gpt-4o-mini"
temperature: 0.2
mute_stream: true
batch_size: 2
# Clean up responses for consistency
response_cleaner:
type: ResponseCleaner
config:
cleanup_prompt: |
Clean up this analysis response to ensure consistent formatting:
{original_response}
Requirements:
- Ensure the response is a single short keyword or keyphrase that captures the topic and nothing else.
llm:
model_url: "openai://gpt-4o-mini"
temperature: 0
mute_stream: true
# Aggregate all topics into comprehensive analysis
topic_aggregator:
type: AggregatorNode
config:
prompt: |
Analyze these {num_results} topic keywords from different documents and provide a comprehensive topic analysis:
{responses}
Please provide your analysis in the following structure:
1. TOP TOPICS: List the 5 most frequently mentioned topics with their frequency counts
2. TOPIC CATEGORIES: Group related topics into broader categories (e.g., Technology, Business, etc.)
3. COVERAGE ANALYSIS: What percentage of documents cover each major theme?
4. INSIGHTS: What patterns or trends do you observe across the document collection?
5. EXECUTIVE SUMMARY: One paragraph summarizing the overall thematic landscape
Format your response with clear headings and bullet points for easy reading.
llm:
model_url: "openai://gpt-4o-mini"
temperature: 0.3
max_tokens: 1000
mute_stream: true
# Export individual document topics to CSV
individual_topics_export:
type: CSVExporter
config:
output_path: "document_analysis_summary.csv"
# Export aggregated analysis to JSON
aggregated_export:
type: JSONExporter
config:
output_path: "document_topic_analysis.json"
pretty_print: true
connections:
# Pipeline: Load β Process β Analyze β Clean β Export Both Ways
- from: document_loader
from_port: documents
to: full_document_processor
to_port: documents
- from: full_document_processor
from_port: documents
to: document_analyzer
to_port: documents
- from: document_analyzer
from_port: results
to: response_cleaner
to_port: results
# Export individual document topics to CSV
- from: response_cleaner
from_port: results
to: individual_topics_export
to_port: results
# Aggregate topics for comprehensive analysis
- from: response_cleaner
from_port: results
to: topic_aggregator
to_port: results
# Export aggregated analysis to JSON (note: result → result port)
- from: topic_aggregator
from_port: result
to: aggregated_export
to_port: result
This example demonstrates the power of aggregation workflows:
Benefits:
- Individual Analysis: CSV file with a topic keyword for each document
- Collection Insights: JSON file with aggregated analysis of all topics
- Dual Output: Both granular and summary views of your document collection
- Executive Summary: High-level insights perfect for reporting and decision-making

Output Files:
- document_analysis_summary.csv - Individual document topics (one row per document)
- document_topic_analysis.json - Aggregated topic analysis with patterns and insights

Key Pattern: Documents → Individual Analysis → Split → (CSV Export + Aggregation → JSON Export)
Multi-Format Export Pipeline
Process documents and export results in multiple formats:
nodes:
# Query for AI/ML papers
ai_papers:
type: QueryChromaStore
config:
persist_location: "vector_db"
query: "artificial intelligence machine learning neural networks"
limit: 15
# Generate structured summaries
paper_summaries:
type: SummaryProcessor
config:
max_length: 200
model_name: "gpt-3.5-turbo"
# Export to CSV for spreadsheet analysis
csv_export:
type: CSVExporter
config:
output_path: "ai_paper_summaries.csv"
columns: ["document_id", "source", "summary", "original_length"]
# Export to JSON for programmatic access
json_export:
type: JSONExporter
config:
output_path: "ai_paper_summaries.json"
pretty_print: true
# Export to Excel for presentation
excel_export:
type: ExcelExporter
config:
output_path: "ai_paper_report.xlsx"
sheet_name: "AI_ML_Summaries"
connections:
# Single source, multiple outputs
- from: ai_papers
from_port: documents
to: paper_summaries
to_port: documents
- from: paper_summaries
from_port: results
to: csv_export
to_port: results
- from: paper_summaries
from_port: results
to: json_export
to_port: results
- from: paper_summaries
from_port: results
to: excel_export
to_port: results
Best Practices
1. Workflow Organization
# Use descriptive node names
nodes:
legal_document_loader: # Not: loader1
type: LoadFromFolder
config:
source_directory: "legal_docs/"
contract_chunker: # Not: splitter1
type: SplitByParagraph
config:
chunk_size: 1500
contract_search_index: # Not: storage1
type: WhooshStore
config:
persist_location: "contract_index"
2. Configuration Management
Use environment variables for paths and sensitive data:
nodes:
loader:
type: LoadFromFolder
config:
source_directory: "${DOCUMENT_PATH:-./documents}" # Default fallback
es_store:
type: ElasticsearchStore
config:
persist_location: "${ELASTICSEARCH_URL}"
basic_auth: ["${ES_USERNAME}", "${ES_PASSWORD}"]
3. Chunk Size Guidelines
Choose appropriate chunk sizes for your use case:
# Small chunks (200-400 chars) - Good for:
# - Precise retrieval
# - Question answering
# - Fine-grained search
# Medium chunks (500-1000 chars) - Good for:
# - General purpose RAG
# - Balanced context/precision
# - Most common use case
# Large chunks (1000-2000 chars) - Good for:
# - Document summarization
# - Context-heavy tasks
# - Preserving document structure
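When tuning these values, a quick back-of-the-envelope estimate of how many chunks a corpus will produce can help; the formula below is an approximation (real splitters respect separators and paragraph boundaries, so actual counts vary):

import math

def estimate_chunks(total_chars: int, chunk_size: int, chunk_overlap: int) -> int:
    # Each additional chunk advances roughly chunk_size - chunk_overlap characters
    step = chunk_size - chunk_overlap
    if total_chars <= chunk_size:
        return 1
    return 1 + math.ceil((total_chars - chunk_size) / step)

for size, overlap in [(300, 30), (500, 50), (1500, 150)]:
    print(f"chunk_size={size}, overlap={overlap}: ~{estimate_chunks(100_000, size, overlap)} chunks")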
4. Performance Optimization
nodes:
bulk_loader:
type: LoadFromFolder
config:
source_directory: "large_corpus/"
n_proc: 8 # Parallel processing
verbose: false # Reduce logging overhead
batch_size: 500 # Process in batches
efficient_chunker:
type: SplitByCharacterCount
config:
chunk_size: 500
chunk_overlap: 25 # Reduce overlap for speed
5. Metadata Preservation
nodes:
rich_loader:
type: LoadFromFolder
config:
source_directory: "documents/"
store_md5: true # Document integrity
store_mimetype: true # File type tracking
store_file_dates: true # Temporal information
extract_document_titles: true # Content-aware metadata
infer_table_structure: true # Preserve structure
Troubleshooting
Common Issues
1. "Module not found" errors
# Ensure you're in the correct directory
cd /path/to/onprem/project
# Or add to Python path
export PYTHONPATH=/path/to/onprem:$PYTHONPATH
2. "File not found" errors
# Use absolute paths for reliability
nodes:
loader:
type: LoadFromFolder
config:
source_directory: "/full/path/to/documents" # Not: "documents/"
3. Memory issues with large documents
# Process in smaller batches
nodes:
loader:
type: LoadFromFolder
config:
source_directory: "large_docs/"
batch_size: 100 # Reduce batch size
n_proc: 2 # Reduce parallelism
chunker:
type: SplitByCharacterCount
config:
chunk_size: 300 # Smaller chunks
chunk_overlap: 30 # Less overlap
4. Storage connection issues
# Test connectivity first
nodes:
es_store:
type: ElasticsearchStore
config:
persist_location: "http://localhost:9200"
# Add authentication if needed
basic_auth: ["username", "password"]
# Increase timeouts if needed
timeout: 30
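Before running a workflow against Elasticsearch, a quick reachability check from Python can save a failed run; this sketch uses the requests library with a hypothetical URL and credentials:

import requests

url = "http://localhost:9200"      # hypothetical cluster URL
auth = ("username", "password")    # hypothetical credentials (omit if security is disabled)

try:
    resp = requests.get(url, auth=auth, timeout=10)
    resp.raise_for_status()
    print("Elasticsearch reachable:", resp.json().get("cluster_name", "unknown"))
except requests.RequestException as exc:
    print("Elasticsearch not reachable:", exc)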
Document Length Management
Control document size for LLM processing and cost optimization:
nodes:
# Load large documents
document_loader:
type: LoadFromFolder
config:
source_directory: "large_documents/"
# Keep full documents but truncate to manageable size
size_control:
type: KeepFullDocument
config:
concatenate_pages: true # First combine multi-page documents
max_words: 2000 # Then truncate to first 2000 words
# Analyze the controlled-size documents
quick_analysis:
type: PromptProcessor
config:
prompt: |
Analyze this document excerpt (first 2000 words):
Source: {source}
Content: {content}
Provide:
1. Document type and purpose
2. Main topics covered
3. Key findings or conclusions
4. Whether this appears to be the beginning, middle, or complete document
llm:
model_url: "openai://gpt-4o-mini"
temperature: 0.1
max_tokens: 500
connections:
- from: document_loader
from_port: documents
to: size_control
to_port: documents
- from: size_control
from_port: documents
to: quick_analysis
to_port: documents
Benefits:
- Cost Control - Process only document beginnings instead of entire files
- Speed - Faster analysis with consistent processing times
- Context Management - Ensure documents fit within LLM context windows
- Preview Analysis - Get quick insights from document openings
- Metadata Tracking - Know original document size and truncation status
Debugging Workflows
Enable verbose output to see detailed execution:
from onprem.workflow import execute_workflow
# Detailed logging
results = execute_workflow("workflow.yaml", verbose=True)

# Check results
for node_id, result in results.items():
    print(f"Node {node_id}: {result}")
Validate before execution:
from onprem.workflow import WorkflowEngine
engine = WorkflowEngine()
try:
    engine.load_workflow_from_yaml("workflow.yaml")
    print("✅ Workflow validation passed")
except Exception as e:
    print(f"❌ Validation failed: {e}")
Performance Monitoring
Track processing times and document counts:
import time
from onprem.workflow import execute_workflow
start_time = time.time()
results = execute_workflow("workflow.yaml", verbose=True)
end_time = time.time()

print(f"Processing time: {end_time - start_time:.2f} seconds")

# Count processed documents
total_docs = 0
for node_id, result in results.items():
    if 'documents' in result:
        count = len(result['documents'])
        print(f"{node_id}: {count} documents")
        total_docs += count

print(f"Total documents processed: {total_docs}")
This tutorial covers all aspects of the OnPrem workflow engine.