
Workflow Engine - YAML Authoring


  1. Introduction
  2. Getting Started
  3. YAML Structure
  4. Step Types
  5. Built-in Nodes
  6. Variables and References
  7. Execution Strategies
  8. Error Handling
  9. Advanced Patterns
  10. Best Practices
  11. Complete Examples
  12. Troubleshooting

The Workflow Engine v3 is a distributed workflow orchestration system that lets you define complex data pipelines and automation workflows in simple YAML files. This guide covers everything you need to author effective workflows.

Key concepts:
  • Workflow: A complete automation pipeline defined in YAML
  • Step: An individual unit of work within a workflow
  • Node: A reusable component that performs specific operations (HTTP calls, data transformation, etc.)
  • Context: The runtime state containing variables, inputs, and results
  • Execution Strategy: How steps are executed (sequentially, in parallel, with retries, etc.)

YAML provides a human-readable, version-controllable format for defining workflows. Your workflows can be:

  • Stored in Git for version control
  • Reviewed in pull requests
  • Shared across teams
  • Generated programmatically
  • Validated before execution

name: 'My First Workflow'
version: '1.0.0'
description: 'A simple workflow to get started'
steps:
  - stepType: 'node'
    type: 'log'
    params:
      message: 'Hello, Workflow Engine!'
      level: 'info'

This workflow has:

  • A descriptive name and version
  • A single step that logs a message

Workflows can be triggered via:

  • REST API: POST /workflows/{workflowId}/execute
  • CLI: workflow-cli run my-workflow.yaml
  • UI: Drag and drop in the visual builder
  • Schedule: Cron-based triggers
  • Events: Webhooks or message queues

# Required fields
name: string                  # Unique workflow name
version: string               # Semantic version (e.g., "1.0.0")
schemaVersion: '3.0'          # Engine schema version

# Optional metadata
description: string           # Human-readable description
tags: [string]                # Tags for organization
metadata:                     # Custom metadata
  author: string
  team: string
  priority: string

# Versioning and compatibility
compatibility:
  minEngineVersion: string    # Minimum engine version required
  maxEngineVersion: string    # Maximum engine version supported
  features: [string]          # Required engine features

# Input/Output schemas
inputSchema:                  # Validate workflow inputs
  type: 'object'
  properties:
    userId: { type: 'string' }
  required: ['userId']
outputSchema:                 # Validate workflow outputs
  type: 'object'
  properties:
    result: { type: 'object' }

# Workflow parameters (defaults)
params:
  timeout: 30000
  retryCount: 3
  environment: 'production'

# Reusable node definitions
nodes:
  apiCall:                    # Define once, use multiple times
    type: 'http'
    params:
      method: 'GET'
      timeout: 5000
  dataProcessor:              # Complex node with validation
    type: 'transform'
    inputSchema:
      type: 'object'
      required: ['data']
    params:
      mapping:
        processed: true

# Execution configuration
config:
  timeout: 300000             # Global timeout (5 minutes)
  maxRetries: 3               # Default retry count
  checkpointInterval: 5000    # Save state every 5 seconds
  enableProfiling: true       # Performance profiling
  enableTracing: true         # Execution tracing

# Workflow steps (required)
steps: []                     # Array of steps

# Output mapping
outputs:
  finalResult: '$lastStep'    # Map step results to outputs
  processedData: '$transform.data'

Execute a single node operation.

steps:
  - stepType: 'node'
    id: 'fetch_data'              # Optional unique identifier
    name: 'Fetch User Data'       # Human-readable name
    description: 'Fetch user profile from API'
    type: 'http'                  # Node type to execute
    params:                       # Node-specific parameters
      url: 'https://api.example.com/user'
      method: 'GET'

    # Execution control
    timeout: 5000                 # Step timeout in milliseconds
    retries: 3                    # Number of retry attempts

    # Type validation
    inputSchema:                  # Validate input
      type: 'object'
      required: ['url']
    outputSchema:                 # Validate output
      type: 'object'
      properties:
        status: { type: 'number' }

    # Flow control
    onSuccess: 'next_step'        # Go to specific step on success
    onError: 'error_handler'      # Go to specific step on error
    onTimeout: 'timeout_handler'

    # Conditional execution
    condition: '$input.runStep'   # Only run if condition is true
    skip: '$input.skipThis'       # Skip if true

Use a pre-defined node from the nodes section.

nodes:
  standardApiCall:
    type: 'http'
    params:
      method: 'POST'
      headers:
        Authorization: 'Bearer ${API_TOKEN}'
steps:
  - stepType: 'reference'
    id: 'call_api'
    ref: 'standardApiCall'        # Reference node name
    params:                       # Override specific params
      url: 'https://api.example.com/endpoint'
      body:
        data: '$input.data'

Multi-branch conditional execution.

steps:
  - stepType: "switch"
    id: "route_by_type"
    expression: "$input.dataType"   # Expression to evaluate
    cases:
      - value: "csv"
        steps:
          - stepType: "node"
            type: "csv-parse"
            params:
              data: "$input.data"
      - value: "json"
        steps:
          - stepType: "node"
            type: "json-parse"
            params:
              data: "$input.data"
      - value: "xml"
        steps:
          - stepType: "node"
            type: "xml-parse"
            params:
              data: "$input.data"
    default:                        # Fallback if no match
      - stepType: "node"
        type: "log"
        params:
          message: "Unknown data type"
          level: "error"

Iterate over data or conditions.

steps:
  - stepType: 'loop'
    id: 'process_items'
    loopType: 'forEach'
    items: '$input.users'           # Array to iterate over
    steps:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://api.example.com/user/${$var.$item.id}'
          method: 'GET'
      - stepType: 'node'
        type: 'transform'
        params:
          input: '$var.$item'       # Access current item
          index: '$var.$index'      # Access current index

steps:
  - stepType: 'loop'
    id: 'poll_status'
    loopType: 'while'
    condition: "$var.status != 'complete'"
    steps:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://api.example.com/status'
      - stepType: 'node'
        type: 'set'
        params:
          name: 'status'
          value: '$poll_status.data.status'
      - stepType: 'node'
        type: 'delay'
        params:
          milliseconds: 5000

steps:
  - stepType: 'loop'
    id: 'generate_series'
    loopType: 'for'
    start: 0
    end: 10
    step: 2                         # Increment by 2
    steps:
      - stepType: 'node'
        type: 'math'
        params:
          operation: 'multiply'
          a: '$var.$index'          # Current loop index
          b: 2

Error handling with cleanup.

steps:
  - stepType: 'try'
    id: 'safe_operation'
    try:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://unreliable-api.example.com'
      - stepType: 'node'
        type: 'transform'
        params:
          data: '$safe_operation'
    catch:
      - stepType: 'node'
        type: 'log'
        params:
          message: 'Operation failed'
          error: '$var.$error'      # Access caught error
          level: 'error'
      - stepType: 'node'
        type: 'set'
        params:
          name: 'fallbackData'
          value: { default: true }
    finally:                        # Always executes
      - stepType: 'node'
        type: 'log'
        params:
          message: 'Cleanup complete'

Execute multiple steps concurrently.

steps:
  - stepType: 'parallel'
    id: 'fetch_all_data'
    strategy:
      maxConcurrency: 5             # Max parallel executions
      waitAll: true                 # Wait for all to complete
      continueOnError: false        # Stop on first error
    steps:
      - stepType: 'node'
        id: 'fetch_users'
        type: 'http'
        params:
          url: 'https://api.example.com/users'
      - stepType: 'node'
        id: 'fetch_orders'
        type: 'http'
        params:
          url: 'https://api.example.com/orders'
      - stepType: 'node'
        id: 'fetch_products'
        type: 'http'
        params:
          url: 'https://api.example.com/products'

Group steps for organization.

steps:
  - stepType: 'sequence'
    id: 'user_onboarding'
    name: 'User Onboarding Process'
    steps:
      - stepType: 'node'
        type: 'create-account'
        params:
          user: '$input.user'
      - stepType: 'node'
        type: 'send-email'
        params:
          template: 'welcome'
      - stepType: 'node'
        type: 'setup-defaults'
        params:
          userId: '$create-account.userId'

Generate steps at runtime.

steps:
  # Get list of tasks
  - stepType: 'node'
    id: 'get_tasks'
    type: 'http'
    params:
      url: 'https://api.example.com/tasks'

  # Generate processing steps dynamically
  - stepType: 'dynamic'
    id: 'process_dynamic'
    generator: |
      ${
        $get_tasks.data.map(task => ({
          stepType: "node",
          type: "process-task",
          params: {
            taskId: task.id,
            priority: task.priority
          }
        }))
      }

Execute another workflow as a step.

steps:
  - stepType: 'workflow'
    id: 'run_subflow'
    workflowId: 'data-processing-pipeline'
    inputs:                         # Pass inputs to sub-workflow
      data: '$input.rawData'
      config: '$var.processingConfig'

Output messages for debugging and monitoring.

type: 'log'
params:
  message: string                 # Message to log
  level: 'info|warn|error|debug'  # Log level
  data: any                       # Additional data to log

Store values in workflow variables.

type: 'set'
params:
  name: string                    # Variable name
  value: any                      # Value to store
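
For example, a variable set in one step can be read later via `$var` (a minimal sketch; the variable name `retryBudget` is illustrative):

```yaml
steps:
  - stepType: 'node'
    type: 'set'
    params:
      name: 'retryBudget'
      value: 5
  - stepType: 'node'
    type: 'log'
    params:
      message: 'Remaining budget: ${$var.retryBudget}'
```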

Pause execution for a specified time.

type: 'delay'
params:
  milliseconds: number            # Delay in milliseconds

Trigger an error for testing error handling.

type: 'error'
params:
  message: string                 # Error message
  code: string                    # Error code

Make HTTP requests.

type: 'http'
params:
  url: string                     # Request URL
  method: 'GET|POST|PUT|DELETE|PATCH'
  headers: object                 # Request headers
  params: object                  # Query parameters
  body: any                       # Request body
  timeout: number                 # Timeout in milliseconds
  auth:                           # Authentication
    type: 'basic|bearer'
    credentials: string
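
Putting those parameters together, a typical authenticated POST might look like this (a sketch; the endpoint and `$env.API_TOKEN` variable are illustrative):

```yaml
steps:
  - stepType: 'node'
    id: 'create_user'
    type: 'http'
    params:
      url: 'https://api.example.com/users'
      method: 'POST'
      headers:
        Content-Type: 'application/json'
      body:
        name: '$input.name'
      timeout: 5000
      auth:
        type: 'bearer'
        credentials: '$env.API_TOKEN'
```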

WebSocket connections.

type: 'websocket'
params:
  url: string                     # WebSocket URL
  message: any                    # Message to send
  timeout: number                 # Connection timeout
  keepAlive: boolean              # Keep connection alive

Server-Sent Events.

type: 'sse'
params:
  url: string                     # SSE endpoint URL
  timeout: number                 # Listen timeout
  headers: object                 # Request headers

Parse different data formats.

type: 'parse'
params:
  format: 'json|yaml|csv|xml'     # Data format
  data: string|any                # Data to parse
  path: string                    # Extract specific path
  options:                        # Format-specific options
    delimiter: ','                # CSV delimiter
    headers: boolean              # CSV has headers
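
For instance, parsing a semicolon-delimited CSV fetched by an earlier step could look like this (a sketch; the `$fetch` step id is illustrative):

```yaml
steps:
  - stepType: 'node'
    id: 'parse_csv'
    type: 'parse'
    params:
      format: 'csv'
      data: '$fetch.data'
      options:
        delimiter: ';'
        headers: true
```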

Transform data structure.

type: 'transform'
params:
  input: any                      # Input data
  mapping: object                 # Transformation mapping
  template: string                # String template

# Mapping example
mapping:
  newField: '$.oldField'          # Path extraction
  computed: '${$.value * 2}'      # Computed value
  nested:
    field: '$.deep.path'

# Template example
template: 'Hello {{name}}, your score is {{score}}'

Mathematical operations.

type: 'math'
params:
  operation: string               # Operation type
  a: number                       # First operand
  b: number                       # Second operand
  value: number                   # For single-value operations

# Operations:
# add, subtract, multiply, divide, power, sqrt, abs,
# round, floor, ceil, min, max, random
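
A two-operand operation in context might look like this (a sketch; the input fields are illustrative):

```yaml
steps:
  - stepType: 'node'
    id: 'total_price'
    type: 'math'
    params:
      operation: 'multiply'
      a: '$input.unitPrice'
      b: '$input.quantity'
```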

Evaluate conditions.

type: 'condition'
params:
  left: any                       # Left operand
  operator: string                # Comparison operator
  right: any                      # Right operand
  returnValue: boolean            # Return boolean vs. action

# Operators:
# ==, ===, !=, !==, >, >=, <, <=,
# in, not_in, contains, starts_with, ends_with,
# matches (regex), exists, not_exists
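
For example, returning a boolean that later steps can reference (a sketch; the `role`/`admin` values are illustrative):

```yaml
steps:
  - stepType: 'node'
    id: 'is_admin'
    type: 'condition'
    params:
      left: '$input.role'
      operator: '=='
      right: 'admin'
      returnValue: true
```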

Pandas DataFrame operations.

type: "pandas"
params:
  data: array|object              # Input data
  operation: string               # Pandas operation
  # ...plus operation-specific params

# Operations:
operation: "describe"             # Statistical summary

operation: "groupby"
groupBy: "column"
aggregate: "mean|sum|count"

operation: "pivot"
index: "column1"
columns: "column2"
values: "column3"

operation: "merge"
mergeWith: data
on: "column"
how: "inner|outer|left|right"
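
A groupby aggregation in a step could look like this (a sketch; the `$fetch` step id and `region` column are illustrative):

```yaml
steps:
  - stepType: "node"
    id: "sales_by_region"
    type: "pandas"
    params:
      data: "$fetch.data"
      operation: "groupby"
      groupBy: "region"
      aggregate: "sum"
```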

Numerical computing.

type: "numpy"
params:
  data: array                     # Input array
  operation: string               # NumPy operation

# Operations:
operation: "mean|std|median"      # Statistical functions
operation: "fft"                  # Fast Fourier Transform
operation: "correlate"            # Cross-correlation
operation: "reshape"
shape: [2, 3]

Machine learning operations.

type: "scikit"
params:
  data: array                     # Input data
  operation: string               # Scikit-learn operation

# Operations:
operation: "scale"                # StandardScaler
operation: "normalize"            # Normalize data
operation: "pca"                  # Principal Component Analysis
components: 2
operation: "cluster"              # K-means clustering
clusters: 3

Deep learning operations.

type: 'tensorflow'
params:
  operation: 'train|predict|evaluate'
  modelPath: string               # Path to saved model
  data: array                     # Input data
  modelConfig:                    # For training
    layers: array
    compile: object
  epochs: number

# Step results
$stepId # Result of step with id "stepId"
$stepId.field # Access field in result
$stepId.nested.deep.field # Nested field access
$stepId[0] # Array index access
$stepId[0].field # Combine array and field access
# Input parameters
$input # All input parameters
$input.userId # Specific input field
# Variables
$var # All variables
$var.myVariable # Specific variable
# Output mapping
$output # Current outputs
$output.result # Specific output
# Special variables (in loops)
$var.$item # Current item in forEach
$var.$index # Current index in loops
$var.$error # Caught error in catch block
# Environment variables
$env.API_KEY # Environment variable

Variables have different scopes:

  1. Global: Available throughout workflow
  2. Step: Available within a step
  3. Loop: Available within loop iterations
  4. Error: Available in catch blocks
steps:
  # Set a variable
  - stepType: 'node'
    id: 'set_config'
    type: 'set'
    params:
      name: 'apiConfig'
      value:
        endpoint: 'https://api.example.com'
        timeout: 5000

  # Use the variable
  - stepType: 'node'
    id: 'api_call'
    type: 'http'
    params:
      url: '$var.apiConfig.endpoint'
      timeout: '$var.apiConfig.timeout'

  # Use a step result
  - stepType: 'node'
    id: 'process'
    type: 'transform'
    params:
      input: '$api_call.data'
      mapping:
        userId: '$.user.id'
        userName: '$.user.name'

  # Conditional reference
  - stepType: 'node'
    type: 'log'
    params:
      message: "${$process.userId ? 'User found' : 'User not found'}"

Steps execute one after another.

steps:
  - stepType: 'node'
    type: 'step1'
  - stepType: 'node'
    type: 'step2'   # Runs after step1
  - stepType: 'node'
    type: 'step3'   # Runs after step2

Execute multiple operations concurrently.

execution:
  type: 'parallel'
  maxConcurrency: 10
  waitAll: true
  continueOnError: false

Automatic retry with backoff.

execution:
  type: 'retry'
  maxAttempts: 3
  backoff: 'exponential'          # or 'linear'
  delay: 1000                     # Initial delay in ms
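
Attached to a step, a retry strategy might look like this (a sketch; assuming exponential backoff doubles the delay between attempts):

```yaml
steps:
  - stepType: 'node'
    id: 'flaky_call'
    type: 'http'
    params:
      url: 'https://api.example.com/report'
    execution:
      type: 'retry'
      maxAttempts: 3
      backoff: 'exponential'      # e.g. ~1s, 2s, 4s between attempts
      delay: 1000
```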

Prevent cascading failures.

execution:
  type: 'circuitBreaker'
  threshold: 5                    # Failures before opening
  timeout: 60000                  # Reset timeout in ms
  halfOpenRequests: 3             # Test requests when half-open

Rate limit execution.

execution:
  type: 'throttle'
  limit: 10                       # Max executions
  interval: 1000                  # Per interval in ms

Process data in batches.

execution:
  type: 'stream'
  batchSize: 100
  ordered: true                   # Maintain order

First to complete wins.

execution:
  type: 'race'
  timeout: 5000                   # Max wait time
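
A race strategy can be combined with a parallel step so the fastest of two mirrored endpoints wins (a sketch under that assumption; the primary/mirror URLs are illustrative):

```yaml
steps:
  - stepType: 'parallel'
    id: 'fastest_mirror'
    execution:
      type: 'race'
      timeout: 5000
    steps:
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://primary.example.com/data'
      - stepType: 'node'
        type: 'http'
        params:
          url: 'https://mirror.example.com/data'
```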

steps:
  - stepType: 'node'
    id: 'risky_operation'
    type: 'http'
    params:
      url: 'https://api.example.com'

    # Retry configuration
    retries: 3
    execution:
      type: 'retry'
      backoff: 'exponential'
      delay: 1000

    # Error routing
    onError: 'error_handler'      # Go to specific step
  - stepType: 'node'
    id: 'error_handler'
    type: 'log'
    params:
      message: 'Operation failed, using fallback'

steps:
  - stepType: 'try'
    try:
      - stepType: 'node'
        type: 'risky_operation'
    catch:
      - stepType: 'node'
        type: 'log'
        params:
          error: '$var.$error'
      - stepType: 'node'
        type: 'send-alert'
        params:
          message: 'Workflow failed'
    finally:
      - stepType: 'node'
        type: 'cleanup'

config:
  maxRetries: 3
  onError:
    notify: true
    channels: ['email', 'slack']
  continueOnError: false

name: "Data Processing Pipeline"
version: "1.0.0"
inputSchema:
  type: "object"
  properties:
    sourceUrl: { type: "string", format: "uri" }
    outputFormat: { type: "string", enum: ["json", "csv", "parquet"] }
  required: ["sourceUrl", "outputFormat"]
steps:
  # Fetch raw data
  - stepType: "node"
    id: "fetch"
    type: "http"
    params:
      url: "$input.sourceUrl"
    outputSchema:
      type: "object"
      required: ["data"]

  # Validate data
  - stepType: "node"
    id: "validate"
    type: "condition"
    params:
      left: "$fetch.data"
      operator: "exists"
    onError: "validation_failed"

  # Transform data in parallel
  - stepType: "parallel"
    id: "transform"
    steps:
      - stepType: "node"
        type: "pandas"
        params:
          data: "$fetch.data"
          operation: "clean"
      - stepType: "node"
        type: "pandas"
        params:
          data: "$fetch.data"
          operation: "normalize"

  # Export in requested format
  - stepType: "switch"
    expression: "$input.outputFormat"
    cases:
      - value: "json"
        steps:
          - stepType: "node"
            type: "export-json"
      - value: "csv"
        steps:
          - stepType: "node"
            type: "export-csv"
      - value: "parquet"
        steps:
          - stepType: "node"
            type: "export-parquet"

name: "Event Processor"
version: "1.0.0"
steps:
  # Listen for events
  - stepType: "node"
    id: "listen"
    type: "sse"
    params:
      url: "https://events.example.com/stream"
      timeout: 60000

  # Process each event
  - stepType: "loop"
    loopType: "forEach"
    items: "$listen.events"
    steps:
      - stepType: "switch"
        expression: "$var.$item.type"
        cases:
          - value: "user.created"
            steps:
              - stepType: "workflow"
                workflowId: "user-onboarding"
                inputs:
                  user: "$var.$item.data"
          - value: "order.placed"
            steps:
              - stepType: "workflow"
                workflowId: "order-processing"
                inputs:
                  order: "$var.$item.data"

Pattern 3: Conditional Parallel Processing

name: "Smart Parallel Processor"
version: "1.0.0"
steps:
  # Determine processing strategy
  - stepType: "node"
    id: "analyze"
    type: "condition"
    params:
      left: "$input.dataSize"
      operator: ">"
      right: 1000

  # Choose execution strategy
  - stepType: "switch"
    expression: "$analyze"
    cases:
      - value: true
        steps:
          # Large dataset - process in parallel batches
          - stepType: "parallel"
            strategy:
              maxConcurrency: 10
            steps:
              - stepType: "loop"
                loopType: "forEach"
                items: "$input.dataBatch1"
                steps:
                  - stepType: "node"
                    type: "process"
      - value: false
        steps:
          # Small dataset - process sequentially
          - stepType: "loop"
            loopType: "forEach"
            items: "$input.data"
            steps:
              - stepType: "node"
                type: "process"

Pattern 4: Saga Pattern for Distributed Transactions

name: 'Order Saga'
version: '1.0.0'
steps:
  # Start transaction
  - stepType: 'node'
    id: 'start_transaction'
    type: 'set'
    params:
      name: 'transactionId'
      value: '${Date.now()}'

  # Execute saga steps with compensations
  - stepType: 'try'
    try:
      # Reserve inventory
      - stepType: 'node'
        id: 'reserve_inventory'
        type: 'http'
        params:
          url: 'https://inventory.api/reserve'
          method: 'POST'
          body:
            items: '$input.items'
            transactionId: '$var.transactionId'

      # Charge payment
      - stepType: 'node'
        id: 'charge_payment'
        type: 'http'
        params:
          url: 'https://payment.api/charge'
          method: 'POST'
          body:
            amount: '$input.total'
            transactionId: '$var.transactionId'

      # Create shipment
      - stepType: 'node'
        id: 'create_shipment'
        type: 'http'
        params:
          url: 'https://shipping.api/create'
          method: 'POST'
          body:
            order: '$input.orderId'
            transactionId: '$var.transactionId'
    catch:
      # Compensate all completed steps
      - stepType: 'parallel'
        continueOnError: true
        steps:
          - stepType: 'node'
            type: 'http'
            params:
              url: 'https://shipping.api/cancel'
              method: 'POST'
              body:
                transactionId: '$var.transactionId'
          - stepType: 'node'
            type: 'http'
            params:
              url: 'https://payment.api/refund'
              method: 'POST'
              body:
                transactionId: '$var.transactionId'
          - stepType: 'node'
            type: 'http'
            params:
              url: 'https://inventory.api/release'
              method: 'POST'
              body:
                transactionId: '$var.transactionId'

DO:

  • Keep workflows focused on a single responsibility
  • Use descriptive names for steps and variables
  • Add comments using the description field
  • Version your workflows semantically
  • Use node definitions for reusable components

DON’T:

  • Create workflows with more than 50 steps
  • Nest loops more than 3 levels deep
  • Hard-code sensitive data (use environment variables)
  • Ignore error handling
# Good: Parallel independent operations
steps:
  - stepType: "parallel"
    steps:
      - stepType: "node"
        type: "fetch-users"
      - stepType: "node"
        type: "fetch-orders"
      - stepType: "node"
        type: "fetch-products"

# Bad: Sequential when it could be parallel
steps:
  - stepType: "node"
    type: "fetch-users"
  - stepType: "node"
    type: "fetch-orders"
  - stepType: "node"
    type: "fetch-products"
# Good: Comprehensive error handling
steps:
  - stepType: "try"
    try:
      - stepType: "node"
        type: "critical-operation"
        retries: 3
        execution:
          type: "retry"
          backoff: "exponential"
    catch:
      - stepType: "node"
        type: "log-error"
      - stepType: "node"
        type: "send-alert"
      - stepType: "node"
        type: "use-fallback"
    finally:
      - stepType: "node"
        type: "cleanup"

# Bad: No error handling
steps:
  - stepType: "node"
    type: "critical-operation"
# Good: Configure timeouts and limits
config:
  timeout: 300000                 # 5 minute global timeout
  checkpointInterval: 10000       # Checkpoint every 10 seconds
steps:
  - stepType: "node"
    type: "http"
    timeout: 5000                 # 5 second timeout
    params:
      url: "https://api.example.com"

# Bad: No resource limits
steps:
  - stepType: "node"
    type: "http"
    params:
      url: "https://slow-api.example.com"
# Use conditional execution for test mode
steps:
  - stepType: "switch"
    expression: "$input.testMode"
    cases:
      - value: true
        steps:
          - stepType: "node"
            type: "mock-data"
      - value: false
        steps:
          - stepType: "node"
            type: "real-data"

name: 'ETL Pipeline'
version: '2.0.0'
description: 'Extract, Transform, Load data pipeline'
params:
  sourceDatabase: 'postgresql://source-db'
  targetDatabase: 'postgresql://target-db'
  batchSize: 1000
steps:
  # Extract
  - stepType: 'node'
    id: 'extract'
    type: 'database-query'
    params:
      connection: '$input.sourceDatabase'
      query: 'SELECT * FROM users WHERE updated_at > $1'
      params: ['$input.lastSync']
    execution:
      type: 'stream'
      batchSize: '$input.batchSize'

  # Transform each batch
  - stepType: 'loop'
    id: 'transform_batches'
    loopType: 'forEach'
    items: '$extract.batches'
    steps:
      # Clean data
      - stepType: 'node'
        id: 'clean'
        type: 'pandas'
        params:
          data: '$var.$item'
          operation: 'clean'
          removeNull: true
          deduplication: true

      # Enrich data
      - stepType: 'parallel'
        id: 'enrich'
        steps:
          - stepType: 'node'
            type: 'http'
            params:
              url: 'https://api.example.com/enrich'
              method: 'POST'
              body: '$clean'
          - stepType: 'node'
            type: 'geocode'
            params:
              addresses: '$clean.addresses'

      # Validate
      - stepType: 'node'
        id: 'validate'
        type: 'schema-validate'
        params:
          data: '$enrich'
          schema:
            type: 'object'
            required: ['id', 'email', 'name']

  # Load
  - stepType: 'node'
    id: 'load'
    type: 'database-bulk-insert'
    params:
      connection: '$input.targetDatabase'
      table: 'users_transformed'
      data: '$transform_batches'
      onConflict: 'update'

  # Update sync timestamp
  - stepType: 'node'
    type: 'set'
    params:
      name: 'lastSync'
      value: '${Date.now()}'
outputs:
  recordsProcessed: '$load.count'
  lastSyncTime: '$var.lastSync'

name: "ML Training Pipeline"
version: "1.0.0"
description: "Complete machine learning training pipeline"
inputSchema:
  type: "object"
  properties:
    datasetUrl: { type: "string" }
    modelType: { type: "string", enum: ["classification", "regression"] }
    targetColumn: { type: "string" }
  required: ["datasetUrl", "modelType", "targetColumn"]
steps:
  # Data loading
  - stepType: "node"
    id: "load_data"
    type: "http"
    params:
      url: "$input.datasetUrl"
      method: "GET"

  # Data preprocessing
  - stepType: "sequence"
    id: "preprocessing"
    name: "Data Preprocessing"
    steps:
      # Parse CSV
      - stepType: "node"
        id: "parse"
        type: "parse"
        params:
          format: "csv"
          data: "$load_data.data"

      # Split features and target
      - stepType: "node"
        id: "split"
        type: "pandas"
        params:
          data: "$parse"
          operation: "split"
          targetColumn: "$input.targetColumn"

      # Handle missing values
      - stepType: "node"
        id: "impute"
        type: "scikit"
        params:
          data: "$split.features"
          operation: "impute"
          strategy: "mean"

      # Scale features
      - stepType: "node"
        id: "scale"
        type: "scikit"
        params:
          data: "$impute"
          operation: "scale"

  # Feature engineering
  - stepType: "parallel"
    id: "feature_engineering"
    steps:
      # PCA
      - stepType: "node"
        id: "pca"
        type: "scikit"
        params:
          data: "$scale"
          operation: "pca"
          components: 10

      # Feature selection
      - stepType: "node"
        id: "select_features"
        type: "scikit"
        params:
          data: "$scale"
          operation: "select_features"
          method: "mutual_info"
          k: 20

  # Model training
  - stepType: "switch"
    id: "train_model"
    expression: "$input.modelType"
    cases:
      - value: "classification"
        steps:
          - stepType: "parallel"
            id: "train_classifiers"
            steps:
              - stepType: "node"
                id: "random_forest"
                type: "scikit"
                params:
                  operation: "train"
                  algorithm: "RandomForest"
                  features: "$pca"
                  target: "$split.target"
              - stepType: "node"
                id: "gradient_boost"
                type: "scikit"
                params:
                  operation: "train"
                  algorithm: "GradientBoosting"
                  features: "$pca"
                  target: "$split.target"
              - stepType: "node"
                id: "neural_net"
                type: "tensorflow"
                params:
                  operation: "train"
                  modelConfig:
                    layers:
                      - { type: "Dense", units: 128, activation: "relu" }
                      - { type: "Dropout", rate: 0.3 }
                      - { type: "Dense", units: 64, activation: "relu" }
                      - { type: "Dense", units: 1, activation: "sigmoid" }
                  epochs: 50
      - value: "regression"
        steps:
          - stepType: "node"
            type: "scikit"
            params:
              operation: "train"
              algorithm: "LinearRegression"

  # Model evaluation
  - stepType: "node"
    id: "evaluate"
    type: "scikit"
    params:
      operation: "evaluate"
      models: "$train_model"
      testData: "$scale"
      metrics: ["accuracy", "precision", "recall", "f1"]

  # Select best model
  - stepType: "node"
    id: "select_best"
    type: "condition"
    params:
      models: "$evaluate.results"
      criterion: "accuracy"
      operation: "max"

  # Save model
  - stepType: "node"
    id: "save_model"
    type: "model-store"
    params:
      model: "$select_best.model"
      metadata:
        accuracy: "$select_best.accuracy"
        features: "$select_features.selected"
        preprocessing: "$scale.scaler"
outputs:
  modelId: "$save_model.modelId"
  accuracy: "$select_best.accuracy"
  metrics: "$evaluate.results"

Symptoms: Step is skipped without error

Possible Causes:

  1. Condition evaluates to false
  2. Skip parameter is true
  3. Previous step failed

Solution:

steps:
  - stepType: 'node'
    type: 'debug'
    params:
      check:
        condition: '$myCondition'
        skip: '$shouldSkip'
        previousResult: '$previousStep'

Symptoms: Error: “Cannot read property of undefined”

Possible Causes:

  1. Typo in reference path
  2. Step hasn’t executed yet
  3. Step failed and has no result

Solution:

# Use optional chaining
params:
  value: "${$stepId?.data?.field || 'default'}"

# Or check existence
- stepType: "condition"
  params:
    left: "$stepId.data"
    operator: "exists"

Symptoms: Workflow fails with timeout

Solution:

# Increase timeouts at multiple levels
config:
  timeout: 600000                 # 10 minutes global
steps:
  - stepType: 'node'
    timeout: 30000                # 30 seconds for the step
    type: 'http'
    params:
      timeout: 10000              # 10 seconds for HTTP

Symptoms: Workflow slow or crashes

Solution:

# Use streaming and batching
steps:
  - stepType: 'node'
    type: 'data-process'
    execution:
      type: 'stream'
      batchSize: 100
    params:
      streaming: true

config:
  enableTracing: true
  enableProfiling: true
steps:
  - stepType: 'node'
    type: 'log'
    params:
      message: 'Debug: Current state'
      data:
        input: '$input'
        variables: '$var'
        lastResult: '$previousStep'

params:
  testMode: true
  mockData: true
steps:
  - stepType: 'switch'
    expression: '$input.testMode'
    cases:
      - value: true
        steps:
          - stepType: 'node'
            type: 'mock-response'

  1. Check Logs: Review execution logs in the UI or API
  2. Trace Execution: Use execution traces to see step-by-step flow
  3. Validate YAML: Use the validation endpoint or CLI tool
  4. Community: Visit our GitHub discussions
  5. Support: Contact support with execution ID

These words have special meaning and shouldn’t be used as step IDs:

  • input, output, var, env
  • item, index, error
  • true, false, null

Literal values can be written directly in expressions and params:

  • String: "text"
  • Number: 123, 45.67
  • Boolean: true, false
  • Array: [1, 2, 3]
  • Object: { key: "value" }
  • Null: null

Simple JavaScript expressions are supported:

"${1 + 1}" # Math: 2
"${$var.count > 10}" # Comparison: true/false
"${$input.name || 'Unknown'}" # Default values
"${['a','b'].includes($var.x)}" # Array operations

Access environment variables:

params:
  apiKey: '$env.API_KEY'
  environment: '$env.NODE_ENV'

Note: Environment variables must be configured on the server.


This completes the End User Guide for YAML Pipeline Authoring. For developer documentation on extending and integrating the engine, see the Developer Guide.