# Workflow Engine v3 - End User Guide

Complete Guide to YAML Pipeline Authoring

## Table of Contents

- Introduction
- Getting Started
- YAML Structure
- Step Types
- Built-in Nodes
- Variables and References
- Execution Strategies
- Error Handling
- Advanced Patterns
- Best Practices
- Complete Examples
- Troubleshooting
## Introduction

The Workflow Engine v3 is a distributed workflow orchestration system that lets you define complex data pipelines and automation workflows in simple YAML files. This guide covers everything you need to author effective workflows.
### Key Concepts

- Workflow: A complete automation pipeline defined in YAML
- Step: An individual unit of work within a workflow
- Node: A reusable component that performs specific operations (HTTP calls, data transformation, etc.)
- Context: The runtime state containing variables, inputs, and results
- Execution Strategy: How steps are executed (sequentially, in parallel, with retries, etc.)
### Why YAML?

YAML provides a human-readable, version-controllable format for defining workflows. Your workflows can be:
- Stored in Git for version control
- Reviewed in pull requests
- Shared across teams
- Generated programmatically
- Validated before execution
## Getting Started

### Your First Workflow
Section titled “Your First Workflow”name: 'My First Workflow'version: '1.0.0'description: 'A simple workflow to get started'
steps: - stepType: 'node' type: 'log' params: message: 'Hello, Workflow Engine!' level: 'info'This workflow has:
- A descriptive name and version
- A single step that logs a message
### Running a Workflow

Workflows can be triggered via:
- REST API: `POST /workflows/{workflowId}/execute`
- CLI: `workflow-cli run my-workflow.yaml`
- UI: Drag and drop in the visual builder
- Schedule: Cron-based triggers
- Events: Webhooks or message queues
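Scheduled and event triggers are configured outside the steps themselves, and the exact schema depends on your deployment. As a purely hypothetical sketch (the `triggers` block below is not part of the documented workflow schema; check your deployment's configuration reference):

```yaml
name: 'Nightly Sync'
version: '1.0.0'

# Hypothetical trigger block -- illustrative only,
# not part of the documented schema.
triggers:
  - type: 'schedule'
    cron: '0 2 * * *'            # Every day at 02:00
  - type: 'webhook'
    path: '/hooks/nightly-sync'

steps:
  - stepType: 'node'
    type: 'log'
    params:
      message: 'Sync triggered'
```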
## YAML Structure

### Complete Workflow Schema

```yaml
# Required fields
name: string               # Unique workflow name
version: string            # Semantic version (e.g., "1.0.0")
schemaVersion: '3.0'       # Engine schema version

# Optional metadata
description: string        # Human-readable description
tags: [string]             # Tags for organization
metadata:                  # Custom metadata
  author: string
  team: string
  priority: string

# Versioning and compatibility
compatibility:
  minEngineVersion: string # Minimum engine version required
  maxEngineVersion: string # Maximum engine version supported
  features: [string]       # Required engine features

# Input/Output schemas
inputSchema:               # Validate workflow inputs
  type: 'object'
  properties:
    userId: { type: 'string' }
  required: ['userId']

outputSchema:              # Validate workflow outputs
  type: 'object'
  properties:
    result: { type: 'object' }

# Workflow parameters (defaults)
params:
  timeout: 30000
  retryCount: 3
  environment: 'production'

# Reusable node definitions
nodes:
  apiCall:                 # Define once, use multiple times
    type: 'http'
    params:
      method: 'GET'
      timeout: 5000

  dataProcessor:           # Complex node with validation
    type: 'transform'
    inputSchema:
      type: 'object'
      required: ['data']
    params:
      mapping:
        processed: true

# Execution configuration
config:
  timeout: 300000          # Global timeout (5 minutes)
  maxRetries: 3            # Default retry count
  checkpointInterval: 5000 # Save state every 5 seconds
  enableProfiling: true    # Performance profiling
  enableTracing: true      # Execution tracing

# Workflow steps (required)
steps: []                  # Array of steps

# Output mapping
outputs:
  finalResult: '$lastStep'          # Map step results to outputs
  processedData: '$transform.data'
```

## Step Types
### 1. Node Step

Execute a single node operation.

```yaml
steps:
  - stepType: 'node'
    id: 'fetch_data'            # Optional unique identifier
    name: 'Fetch User Data'     # Human-readable name
    description: 'Fetch user profile from API'
    type: 'http'                # Node type to execute
    params:                     # Node-specific parameters
      url: 'https://api.example.com/user'
      method: 'GET'

    # Execution control
    timeout: 5000               # Step timeout in milliseconds
    retries: 3                  # Number of retry attempts

    # Type validation
    inputSchema:                # Validate input
      type: 'object'
      required: ['url']
    outputSchema:               # Validate output
      type: 'object'
      properties:
        status: { type: 'number' }

    # Flow control
    onSuccess: 'next_step'      # Go to specific step on success
    onError: 'error_handler'    # Go to specific step on error
    onTimeout: 'timeout_handler'

    # Conditional execution
    condition: '$input.runStep' # Only run if condition is true
    skip: '$input.skipThis'     # Skip if true
```

### 2. Reference Step
Use a pre-defined node from the `nodes` section.

```yaml
nodes:
  standardApiCall:
    type: 'http'
    params:
      method: 'POST'
      headers:
        Authorization: 'Bearer ${API_TOKEN}'

steps:
  - stepType: 'reference'
    id: 'call_api'
    ref: 'standardApiCall'   # Reference node name
    params:                  # Override specific params
      url: 'https://api.example.com/endpoint'
      body:
        data: '$input.data'
```

### 3. Switch/Case Step
Multi-branch conditional execution.

```yaml
steps:
  - stepType: "switch"
    id: "route_by_type"
    expression: "$input.dataType"  # Expression to evaluate
    cases:
      - value: "csv"
        steps:
          - stepType: "node"
            type: "csv-parse"
            params:
              data: "$input.data"

      - value: "json"
        steps:
          - stepType: "node"
            type: "json-parse"
            params:
              data: "$input.data"

      - value: "xml"
        steps:
          - stepType: "node"
            type: "xml-parse"
            params:
              data: "$input.data"

    default:                       # Fallback if no match
      - stepType: "node"
        type: "log"
        params:
          message: "Unknown data type"
          level: "error"
```

### 4. Loop Step
Iterate over data or conditions.

#### forEach Loop
```yaml
steps:
  - stepType: 'loop'
    id: 'process_items'
    loopType: 'forEach'
    items: '$input.users'        # Array to iterate over
    steps:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://api.example.com/user/${var.$item.id}'
          method: 'GET'

      - stepType: 'node'
        type: 'transform'
        params:
          input: '$var.$item'    # Access current item
          index: '$var.$index'   # Access current index
```

#### while Loop
```yaml
steps:
  - stepType: 'loop'
    id: 'poll_status'
    loopType: 'while'
    condition: "$var.status != 'complete'"
    steps:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://api.example.com/status'

      - stepType: 'node'
        type: 'set'
        params:
          name: 'status'
          value: '$poll_status.data.status'

      - stepType: 'node'
        type: 'delay'
        params:
          milliseconds: 5000
```

#### for Loop
```yaml
steps:
  - stepType: 'loop'
    id: 'generate_series'
    loopType: 'for'
    start: 0
    end: 10
    step: 2                  # Increment by 2
    steps:
      - stepType: 'node'
        type: 'math'
        params:
          operation: 'multiply'
          a: '$var.$index'   # Current loop index
          b: 2
```

### 5. Try/Catch/Finally Step
Error handling with cleanup.

```yaml
steps:
  - stepType: 'try'
    id: 'safe_operation'
    try:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://unreliable-api.example.com'

      - stepType: 'node'
        type: 'transform'
        params:
          data: '$safe_operation'

    catch:
      - stepType: 'node'
        type: 'log'
        params:
          message: 'Operation failed'
          error: '$var.$error'     # Access caught error
          level: 'error'

      - stepType: 'node'
        type: 'set'
        params:
          name: 'fallbackData'
          value: { default: true }

    finally:                       # Always executes
      - stepType: 'node'
        type: 'log'
        params:
          message: 'Cleanup complete'
```

### 6. Parallel Step
Execute multiple steps concurrently.

```yaml
steps:
  - stepType: 'parallel'
    id: 'fetch_all_data'
    strategy:
      maxConcurrency: 5        # Max parallel executions
      waitAll: true            # Wait for all to complete
      continueOnError: false   # Stop on first error

    steps:
      - stepType: 'node'
        id: 'fetch_users'
        type: 'http'
        params:
          url: 'https://api.example.com/users'

      - stepType: 'node'
        id: 'fetch_orders'
        type: 'http'
        params:
          url: 'https://api.example.com/orders'

      - stepType: 'node'
        id: 'fetch_products'
        type: 'http'
        params:
          url: 'https://api.example.com/products'
```

### 7. Sequence Step
Group steps for organization.

```yaml
steps:
  - stepType: 'sequence'
    id: 'user_onboarding'
    name: 'User Onboarding Process'
    steps:
      - stepType: 'node'
        type: 'create-account'
        params:
          user: '$input.user'

      - stepType: 'node'
        type: 'send-email'
        params:
          template: 'welcome'

      - stepType: 'node'
        type: 'setup-defaults'
        params:
          userId: '$create-account.userId'
```

### 8. Dynamic Step
Generate steps at runtime.

```yaml
steps:
  # Get list of tasks
  - stepType: 'node'
    id: 'get_tasks'
    type: 'http'
    params:
      url: 'https://api.example.com/tasks'

  # Generate processing steps dynamically
  - stepType: 'dynamic'
    id: 'process_dynamic'
    generator: |
      ${
        $get_tasks.data.map(task => ({
          stepType: "node",
          type: "process-task",
          params: {
            taskId: task.id,
            priority: task.priority
          }
        }))
      }
```

### 9. Workflow Step
Execute another workflow as a step.

```yaml
steps:
  - stepType: 'workflow'
    id: 'run_subflow'
    workflowId: 'data-processing-pipeline'
    inputs:                        # Pass inputs to sub-workflow
      data: '$input.rawData'
      config: '$var.processingConfig'
```

## Built-in Nodes
### Core Nodes

#### log

Output messages for debugging and monitoring.

```yaml
type: 'log'
params:
  message: string                 # Message to log
  level: 'info|warn|error|debug'  # Log level
  data: any                       # Additional data to log
```

#### set / setVariable
Store values in workflow variables.

```yaml
type: 'set'
params:
  name: string   # Variable name
  value: any     # Value to store
```

#### delay / wait
Pause execution for a specified time.

```yaml
type: 'delay'
params:
  milliseconds: number  # Delay in milliseconds
```

#### error / throw
Trigger an error for testing error handling.

```yaml
type: 'error'
params:
  message: string  # Error message
  code: string     # Error code
```

### Network Nodes
#### http

Make HTTP requests.

```yaml
type: 'http'
params:
  url: string      # Request URL
  method: 'GET|POST|PUT|DELETE|PATCH'
  headers: object  # Request headers
  params: object   # Query parameters
  body: any        # Request body
  timeout: number  # Timeout in milliseconds
  auth:            # Authentication
    type: 'basic|bearer'
    credentials: string
```

#### websocket / ws
WebSocket connections.

```yaml
type: 'websocket'
params:
  url: string         # WebSocket URL
  message: any        # Message to send
  timeout: number     # Connection timeout
  keepAlive: boolean  # Keep connection alive
```

#### sse

Server-Sent Events.

```yaml
type: 'sse'
params:
  url: string      # SSE endpoint URL
  timeout: number  # Listen timeout
  headers: object  # Request headers
```

### Data Processing Nodes
#### parse / parser

Parse different data formats.

```yaml
type: 'parse'
params:
  format: 'json|yaml|csv|xml'  # Data format
  data: string|any             # Data to parse
  path: string                 # Extract specific path
  options:                     # Format-specific options
    delimiter: ','             # CSV delimiter
    headers: boolean           # CSV has headers
```

#### transform / transformer
Transform data structure.

```yaml
type: 'transform'
params:
  input: any        # Input data
  mapping: object   # Transformation mapping
  template: string  # String template

# Mapping example
mapping:
  newField: '$.oldField'      # Path extraction
  computed: '${$.value * 2}'  # Computed value
  nested:
    field: '$.deep.path'

# Template example
template: 'Hello {{name}}, your score is {{score}}'
```

#### math

Mathematical operations.
```yaml
type: 'math'
params:
  operation: string  # Operation type
  a: number          # First operand
  b: number          # Second operand
  value: number      # Single-value operations

# Operations:
# add, subtract, multiply, divide, power, sqrt, abs,
# round, floor, ceil, min, max, random
```

#### condition / if
Evaluate conditions.

```yaml
type: 'condition'
params:
  left: any             # Left operand
  operator: string      # Comparison operator
  right: any            # Right operand
  returnValue: boolean  # Return boolean vs action

# Operators:
# ==, ===, !=, !==, >, >=, <, <=,
# in, not_in, contains, starts_with, ends_with,
# matches (regex), exists, not_exists
```

### Python Data Science Nodes
#### pandas

Pandas DataFrame operations.

```yaml
type: "pandas"
params:
  data: array|object  # Input data
  operation: string   # Pandas operation
  # Operation-specific params

# Operations:
operation: "describe"  # Statistical summary

operation: "groupby"
groupBy: "column"
aggregate: "mean|sum|count"

operation: "pivot"
index: "column1"
columns: "column2"
values: "column3"

operation: "merge"
mergeWith: data
on: "column"
how: "inner|outer|left|right"
```

#### numpy

Numerical computing.
```yaml
type: "numpy"
params:
  data: array        # Input array
  operation: string  # NumPy operation

# Operations:
operation: "mean|std|median"  # Statistical functions
operation: "fft"              # Fast Fourier Transform
operation: "correlate"        # Cross-correlation
operation: "reshape"
shape: [2, 3]
```

#### scikit
Machine learning operations.

```yaml
type: "scikit"
params:
  data: array        # Input data
  operation: string  # Scikit operation

# Operations:
operation: "scale"      # StandardScaler
operation: "normalize"  # Normalize data
operation: "pca"        # Principal Component Analysis
components: 2
operation: "cluster"    # K-means clustering
clusters: 3
```

#### tensorflow
Deep learning operations.

```yaml
type: 'tensorflow'
params:
  operation: 'train|predict|evaluate'
  modelPath: string  # Path to saved model
  data: array        # Input data
  modelConfig:       # For training
    layers: array
    compile: object
    epochs: number
```

## Variables and References
### Reference Syntax

```yaml
# Step results
$stepId                    # Result of step with id "stepId"
$stepId.field              # Access field in result
$stepId.nested.deep.field  # Nested field access
$stepId[0]                 # Array index access
$stepId[0].field           # Combine array and field access

# Input parameters
$input                     # All input parameters
$input.userId              # Specific input field

# Variables
$var                       # All variables
$var.myVariable            # Specific variable

# Output mapping
$output                    # Current outputs
$output.result             # Specific output

# Special variables (in loops)
$var.$item                 # Current item in forEach
$var.$index                # Current index in loops
$var.$error                # Caught error in catch block

# Environment variables
$env.API_KEY               # Environment variable
```

### Variable Scope
Variables have different scopes:
- Global: Available throughout workflow
- Step: Available within a step
- Loop: Available within loop iterations
- Error: Available in catch blocks
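A sketch of these scopes using the documented `set`, `loop`, and `log` nodes (`totalUsers` is an illustrative variable name): the global variable survives the whole run, while `$var.$item` and `$var.$index` exist only inside the loop body.

```yaml
steps:
  - stepType: 'node'
    type: 'set'
    params:
      name: 'totalUsers'         # Global: visible to every later step
      value: 0

  - stepType: 'loop'
    loopType: 'forEach'
    items: '$input.users'
    steps:
      - stepType: 'node'
        type: 'log'
        params:
          message: 'Processing user'
          data:
            user: '$var.$item'   # Loop scope only
            index: '$var.$index' # Loop scope only

  # $var.totalUsers is still available here; $var.$item is not
```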
Examples
Section titled “Examples”steps: # Set a variable - stepType: 'node' id: 'set_config' type: 'set' params: name: 'apiConfig' value: endpoint: 'https://api.example.com' timeout: 5000
# Use the variable - stepType: 'node' id: 'api_call' type: 'http' params: url: '$var.apiConfig.endpoint' timeout: '$var.apiConfig.timeout'
# Use step result - stepType: 'node' id: 'process' type: 'transform' params: input: '$api_call.data' mapping: userId: '$.user.id' userName: '$.user.name'
# Conditional reference - stepType: 'node' type: 'log' params: message: "${$process.userId ? 'User found' : 'User not found'}"Execution Strategies
### Sequential Execution (Default)

Steps execute one after another.

```yaml
steps:
  - stepType: 'node'
    type: 'step1'
  - stepType: 'node'
    type: 'step2'  # Runs after step1
  - stepType: 'node'
    type: 'step3'  # Runs after step2
```

### Parallel Execution
Execute multiple operations concurrently.

```yaml
execution:
  type: 'parallel'
  maxConcurrency: 10
  waitAll: true
  continueOnError: false
```

### Retry Strategy
Automatic retry with backoff.

```yaml
execution:
  type: 'retry'
  maxAttempts: 3
  backoff: 'exponential'  # or "linear"
  delay: 1000             # Initial delay in ms
```

### Circuit Breaker
Prevent cascading failures.

```yaml
execution:
  type: 'circuitBreaker'
  threshold: 5         # Failures before opening
  timeout: 60000       # Reset timeout in ms
  halfOpenRequests: 3  # Test requests when half-open
```

### Throttling
Rate limit execution.
```yaml
execution:
  type: 'throttle'
  limit: 10       # Max executions
  interval: 1000  # Per interval in ms
```

### Stream Processing
Process data in batches.

```yaml
execution:
  type: 'stream'
  batchSize: 100
  ordered: true  # Maintain order
```

### Race Condition
First to complete wins.

```yaml
execution:
  type: 'race'
  timeout: 5000  # Max wait time
```

## Error Handling
### Step-Level Error Handling

```yaml
steps:
  - stepType: 'node'
    id: 'risky_operation'
    type: 'http'
    params:
      url: 'https://api.example.com'

    # Retry configuration
    retries: 3
    execution:
      type: 'retry'
      backoff: 'exponential'
      delay: 1000

    # Error routing
    onError: 'error_handler'  # Go to specific step

  - stepType: 'node'
    id: 'error_handler'
    type: 'log'
    params:
      message: 'Operation failed, using fallback'
```

### Try/Catch Blocks
```yaml
steps:
  - stepType: 'try'
    try:
      - stepType: 'node'
        type: 'risky_operation'

    catch:
      - stepType: 'node'
        type: 'log'
        params:
          error: '$var.$error'

      - stepType: 'node'
        type: 'send-alert'
        params:
          message: 'Workflow failed'

    finally:
      - stepType: 'node'
        type: 'cleanup'
```

### Global Error Configuration
Section titled “Global Error Configuration”config: maxRetries: 3 onError: notify: true channels: ['email', 'slack'] continueOnError: falseAdvanced Patterns
### Pattern 1: Data Pipeline with Validation

```yaml
name: "Data Processing Pipeline"
version: "1.0.0"

inputSchema:
  type: "object"
  properties:
    sourceUrl: { type: "string", format: "uri" }
    outputFormat: { type: "string", enum: ["json", "csv", "parquet"] }
  required: ["sourceUrl", "outputFormat"]

steps:
  # Fetch raw data
  - stepType: "node"
    id: "fetch"
    type: "http"
    params:
      url: "$input.sourceUrl"
    outputSchema:
      type: "object"
      required: ["data"]

  # Validate data
  - stepType: "node"
    id: "validate"
    type: "condition"
    params:
      left: "$fetch.data"
      operator: "exists"
    onError: "validation_failed"

  # Transform data in parallel
  - stepType: "parallel"
    id: "transform"
    steps:
      - stepType: "node"
        type: "pandas"
        params:
          data: "$fetch.data"
          operation: "clean"

      - stepType: "node"
        type: "pandas"
        params:
          data: "$fetch.data"
          operation: "normalize"

  # Export in requested format
  - stepType: "switch"
    expression: "$input.outputFormat"
    cases:
      - value: "json"
        steps:
          - stepType: "node"
            type: "export-json"
      - value: "csv"
        steps:
          - stepType: "node"
            type: "export-csv"
      - value: "parquet"
        steps:
          - stepType: "node"
            type: "export-parquet"
```

### Pattern 2: Event-Driven Processing
Section titled “Pattern 2: Event-Driven Processing”name: "Event Processor"version: "1.0.0"
steps: # Listen for events - stepType: "node" id: "listen" type: "sse" params: url: "https://events.example.com/stream" timeout: 60000
# Process each event - stepType: "loop" loopType: "forEach" items: "$listen.events" steps: - stepType: "switch" expression: "$var.$item.type" cases: - value: "user.created" steps: - stepType: "workflow" workflowId: "user-onboarding" inputs: user: "$var.$item.data"
- value: "order.placed" steps: - stepType: "workflow" workflowId: "order-processing" inputs: order: "$var.$item.data"Pattern 3: Conditional Parallel Processing
Section titled “Pattern 3: Conditional Parallel Processing”name: "Smart Parallel Processor"version: "1.0.0"
steps: # Determine processing strategy - stepType: "node" id: "analyze" type: "condition" params: left: "$input.dataSize" operator: ">" right: 1000
# Choose execution strategy - stepType: "switch" expression: "$analyze" cases: - value: true steps: # Large dataset - process in parallel batches - stepType: "parallel" strategy: maxConcurrency: 10 steps: - stepType: "loop" loopType: "forEach" items: "$input.dataBatch1" steps: - stepType: "node" type: "process"
- value: false steps: # Small dataset - process sequentially - stepType: "loop" loopType: "forEach" items: "$input.data" steps: - stepType: "node" type: "process"Pattern 4: Saga Pattern for Distributed Transactions
Section titled “Pattern 4: Saga Pattern for Distributed Transactions”name: 'Order Saga'version: '1.0.0'
steps: # Start transaction - stepType: 'node' id: 'start_transaction' type: 'set' params: name: 'transactionId' value: '${Date.now()}'
# Execute saga steps with compensations - stepType: 'try' try: # Reserve inventory - stepType: 'node' id: 'reserve_inventory' type: 'http' params: url: 'https://inventory.api/reserve' method: 'POST' body: items: '$input.items' transactionId: '$var.transactionId'
# Charge payment - stepType: 'node' id: 'charge_payment' type: 'http' params: url: 'https://payment.api/charge' method: 'POST' body: amount: '$input.total' transactionId: '$var.transactionId'
# Create shipment - stepType: 'node' id: 'create_shipment' type: 'http' params: url: 'https://shipping.api/create' method: 'POST' body: order: '$input.orderId' transactionId: '$var.transactionId'
catch: # Compensate in reverse order - stepType: 'parallel' continueOnError: true steps: - stepType: 'node' type: 'http' params: url: 'https://shipping.api/cancel' method: 'POST' body: transactionId: '$var.transactionId'
- stepType: 'node' type: 'http' params: url: 'https://payment.api/refund' method: 'POST' body: transactionId: '$var.transactionId'
- stepType: 'node' type: 'http' params: url: 'https://inventory.api/release' method: 'POST' body: transactionId: '$var.transactionId'Best Practices
### 1. Workflow Design

DO:
- Keep workflows focused on a single responsibility
- Use descriptive names for steps and variables
- Add comments using the description field
- Version your workflows semantically
- Use node definitions for reusable components
DON’T:
- Create workflows with more than 50 steps
- Nest loops more than 3 levels deep
- Hard-code sensitive data (use environment variables)
- Ignore error handling
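For the hard-coded secrets point, the `$env` reference syntax (see Variables and References) keeps credentials out of version-controlled YAML. A minimal sketch using the documented `http` auth parameters; `API_TOKEN` is an illustrative environment variable name:

```yaml
# Good: secret comes from the server environment
steps:
  - stepType: 'node'
    type: 'http'
    params:
      url: 'https://api.example.com/data'
      auth:
        type: 'bearer'
        credentials: '$env.API_TOKEN'  # Illustrative env var name

# Bad: secret committed to version control
# credentials: 'sk-live-abc123...'
```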
### 2. Performance Optimization

```yaml
# Good: Parallel independent operations
steps:
  - stepType: "parallel"
    steps:
      - stepType: "node"
        type: "fetch-users"
      - stepType: "node"
        type: "fetch-orders"
      - stepType: "node"
        type: "fetch-products"

# Bad: Sequential when it could be parallel
steps:
  - stepType: "node"
    type: "fetch-users"
  - stepType: "node"
    type: "fetch-orders"
  - stepType: "node"
    type: "fetch-products"
```

### 3. Error Handling
```yaml
# Good: Comprehensive error handling
steps:
  - stepType: "try"
    try:
      - stepType: "node"
        type: "critical-operation"
        retries: 3
        execution:
          type: "retry"
          backoff: "exponential"
    catch:
      - stepType: "node"
        type: "log-error"
      - stepType: "node"
        type: "send-alert"
      - stepType: "node"
        type: "use-fallback"
    finally:
      - stepType: "node"
        type: "cleanup"

# Bad: No error handling
steps:
  - stepType: "node"
    type: "critical-operation"
```

### 4. Resource Management
```yaml
# Good: Configure timeouts and limits
config:
  timeout: 300000            # 5-minute global timeout
  checkpointInterval: 10000  # Checkpoint every 10 seconds

steps:
  - stepType: "node"
    type: "http"
    timeout: 5000            # 5-second timeout
    params:
      url: "https://api.example.com"

# Bad: No resource limits
steps:
  - stepType: "node"
    type: "http"
    params:
      url: "https://slow-api.example.com"
```

### 5. Testing Workflows
```yaml
# Use conditional execution for test mode
steps:
  - stepType: "switch"
    expression: "$input.testMode"
    cases:
      - value: true
        steps:
          - stepType: "node"
            type: "mock-data"
      - value: false
        steps:
          - stepType: "node"
            type: "real-data"
```

## Complete Examples
### Example 1: ETL Pipeline

```yaml
name: 'ETL Pipeline'
version: '2.0.0'
description: 'Extract, Transform, Load data pipeline'

params:
  sourceDatabase: 'postgresql://source-db'
  targetDatabase: 'postgresql://target-db'
  batchSize: 1000

steps:
  # Extract
  - stepType: 'node'
    id: 'extract'
    type: 'database-query'
    params:
      connection: '$input.sourceDatabase'
      query: 'SELECT * FROM users WHERE updated_at > $1'
      params: ['$input.lastSync']
    execution:
      type: 'stream'
      batchSize: '$input.batchSize'

  # Transform each batch
  - stepType: 'loop'
    id: 'transform_batches'
    loopType: 'forEach'
    items: '$extract.batches'
    steps:
      # Clean data
      - stepType: 'node'
        id: 'clean'
        type: 'pandas'
        params:
          data: '$var.$item'
          operation: 'clean'
          removeNull: true
          deduplication: true

      # Enrich data
      - stepType: 'parallel'
        id: 'enrich'
        steps:
          - stepType: 'node'
            type: 'http'
            params:
              url: 'https://api.example.com/enrich'
              method: 'POST'
              body: '$clean'

          - stepType: 'node'
            type: 'geocode'
            params:
              addresses: '$clean.addresses'

      # Validate
      - stepType: 'node'
        id: 'validate'
        type: 'schema-validate'
        params:
          data: '$enrich'
          schema:
            type: 'object'
            required: ['id', 'email', 'name']

  # Load
  - stepType: 'node'
    id: 'load'
    type: 'database-bulk-insert'
    params:
      connection: '$input.targetDatabase'
      table: 'users_transformed'
      data: '$transform_batches'
      onConflict: 'update'

  # Update sync timestamp
  - stepType: 'node'
    type: 'set'
    params:
      name: 'lastSync'
      value: '${Date.now()}'

outputs:
  recordsProcessed: '$load.count'
  lastSyncTime: '$var.lastSync'
```

### Example 2: Multi-Stage ML Pipeline
```yaml
name: "ML Training Pipeline"
version: "1.0.0"
description: "Complete machine learning training pipeline"

inputSchema:
  type: "object"
  properties:
    datasetUrl: { type: "string" }
    modelType: { type: "string", enum: ["classification", "regression"] }
    targetColumn: { type: "string" }
  required: ["datasetUrl", "modelType", "targetColumn"]

steps:
  # Data Loading
  - stepType: "node"
    id: "load_data"
    type: "http"
    params:
      url: "$input.datasetUrl"
      method: "GET"

  # Data Preprocessing
  - stepType: "sequence"
    id: "preprocessing"
    name: "Data Preprocessing"
    steps:
      # Parse CSV
      - stepType: "node"
        id: "parse"
        type: "parse"
        params:
          format: "csv"
          data: "$load_data.data"

      # Split features and target
      - stepType: "node"
        id: "split"
        type: "pandas"
        params:
          data: "$parse"
          operation: "split"
          targetColumn: "$input.targetColumn"

      # Handle missing values
      - stepType: "node"
        id: "impute"
        type: "scikit"
        params:
          data: "$split.features"
          operation: "impute"
          strategy: "mean"

      # Scale features
      - stepType: "node"
        id: "scale"
        type: "scikit"
        params:
          data: "$impute"
          operation: "scale"

  # Feature Engineering
  - stepType: "parallel"
    id: "feature_engineering"
    steps:
      # PCA
      - stepType: "node"
        id: "pca"
        type: "scikit"
        params:
          data: "$scale"
          operation: "pca"
          components: 10

      # Feature selection
      - stepType: "node"
        id: "select_features"
        type: "scikit"
        params:
          data: "$scale"
          operation: "select_features"
          method: "mutual_info"
          k: 20

  # Model Training
  - stepType: "switch"
    id: "train_model"
    expression: "$input.modelType"
    cases:
      - value: "classification"
        steps:
          - stepType: "parallel"
            id: "train_classifiers"
            steps:
              - stepType: "node"
                id: "random_forest"
                type: "scikit"
                params:
                  operation: "train"
                  algorithm: "RandomForest"
                  features: "$pca"
                  target: "$split.target"

              - stepType: "node"
                id: "gradient_boost"
                type: "scikit"
                params:
                  operation: "train"
                  algorithm: "GradientBoosting"
                  features: "$pca"
                  target: "$split.target"

              - stepType: "node"
                id: "neural_net"
                type: "tensorflow"
                params:
                  operation: "train"
                  modelConfig:
                    layers:
                      - { type: "Dense", units: 128, activation: "relu" }
                      - { type: "Dropout", rate: 0.3 }
                      - { type: "Dense", units: 64, activation: "relu" }
                      - { type: "Dense", units: 1, activation: "sigmoid" }
                    epochs: 50

      - value: "regression"
        steps:
          - stepType: "node"
            type: "scikit"
            params:
              operation: "train"
              algorithm: "LinearRegression"

  # Model Evaluation
  - stepType: "node"
    id: "evaluate"
    type: "scikit"
    params:
      operation: "evaluate"
      models: "$train_model"
      testData: "$scale"
      metrics: ["accuracy", "precision", "recall", "f1"]

  # Select Best Model
  - stepType: "node"
    id: "select_best"
    type: "condition"
    params:
      models: "$evaluate.results"
      criterion: "accuracy"
      operation: "max"

  # Save Model
  - stepType: "node"
    id: "save_model"
    type: "model-store"
    params:
      model: "$select_best.model"
      metadata:
        accuracy: "$select_best.accuracy"
        features: "$select_features.selected"
        preprocessing: "$scale.scaler"

outputs:
  modelId: "$save_model.modelId"
  accuracy: "$select_best.accuracy"
  metrics: "$evaluate.results"
```

## Troubleshooting
### Common Issues and Solutions

#### Issue: Step Not Executing

Symptoms: Step is skipped without error
Possible Causes:
- Condition evaluates to false
- Skip parameter is true
- Previous step failed
Solution:
```yaml
steps:
  - stepType: 'node'
    type: 'debug'
    params:
      check:
        condition: '$myCondition'
        skip: '$shouldSkip'
        previousResult: '$previousStep'
```

#### Issue: Reference Not Found
Symptoms: Error: "Cannot read property of undefined"
Possible Causes:
- Typo in reference path
- Step hasn’t executed yet
- Step failed and has no result
Solution:
```yaml
# Use optional chaining
params:
  value: "${$stepId?.data?.field || 'default'}"

# Or check existence
- stepType: "condition"
  params:
    left: "$stepId.data"
    operator: "exists"
```

#### Issue: Timeout Errors
Symptoms: Workflow fails with timeout
Solution:
```yaml
# Increase timeouts at multiple levels
config:
  timeout: 600000     # 10 minutes global

steps:
  - stepType: 'node'
    timeout: 30000    # 30 seconds for the step
    type: 'http'
    params:
      timeout: 10000  # 10 seconds for HTTP
```

#### Issue: Memory/Performance Issues
Symptoms: Workflow is slow or crashes
Solution:
```yaml
# Use streaming and batching
steps:
  - stepType: 'node'
    type: 'data-process'
    execution:
      type: 'stream'
      batchSize: 100
    params:
      streaming: true
```

### Debug Techniques
#### 1. Enable Tracing

```yaml
config:
  enableTracing: true
  enableProfiling: true
```

#### 2. Add Debug Logs
```yaml
steps:
  - stepType: 'node'
    type: 'log'
    params:
      message: 'Debug: Current state'
      data:
        input: '$input'
        variables: '$var'
        lastResult: '$previousStep'
```

#### 3. Use Test Mode
```yaml
params:
  testMode: true
  mockData: true

steps:
  - stepType: 'switch'
    expression: '$input.testMode'
    cases:
      - value: true
        steps:
          - stepType: 'node'
            type: 'mock-response'
```

## Getting Help
- Check Logs: Review execution logs in the UI or API
- Trace Execution: Use execution traces to see step-by-step flow
- Validate YAML: Use the validation endpoint or CLI tool
- Community: Visit our GitHub discussions
- Support: Contact support with execution ID
## Appendix

### Reserved Keywords

These words have special meaning and shouldn’t be used as step IDs:
- input, output, var, env
- item, index, error
- true, false, null
### Supported Data Types

- String: `"text"`
- Number: `123`, `45.67`
- Boolean: `true`, `false`
- Array: `[1, 2, 3]`
- Object: `{ key: "value" }`
- Null: `null`
### Expression Language

Simple JavaScript expressions are supported:

```yaml
"${1 + 1}"                       # Math: 2
"${$var.count > 10}"             # Comparison: true/false
"${$input.name || 'Unknown'}"    # Default values
"${['a','b'].includes($var.x)}"  # Array operations
```

### Environment Variables
Access environment variables:

```yaml
params:
  apiKey: '$env.API_KEY'
  environment: '$env.NODE_ENV'
```

Note: Environment variables must be configured on the server.
This completes the End User Guide for YAML Pipeline Authoring. For developer documentation on extending and integrating the engine, see the Developer Guide.