# Workflow Engine v3 - Developer Guide

Complete Guide for Extending, Implementing, and Integrating the Workflow Engine

## Table of Contents

- Architecture Overview
- Installation and Setup
- Core Concepts
- Plugin Development
- Custom Node Development
- Storage Adapters
- NestJS Integration
- BullMQ and Distributed Processing
- Python Worker Integration
- Testing
- Monitoring and Observability
- Performance Optimization
- Security
- Deployment
- API Reference
- Migration Guide
- Best Practices
## Architecture Overview

### System Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                         Client Layer                        │
│   REST API   │  UI Builder  │   CLI Tool   │   SDK Client   │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                  Application Layer (NestJS)                 │
│   Workflow Service   │   Auth Module   │   API Gateway      │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                Engine Layer (Workflow Engine Core)          │
│   Plugin System   │   Step Executor   │   State Manager     │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                       Processing Layer                      │
│  BullMQ Workers │ Node.js Workers │ Python Workers │ Custom │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                        Storage Layer                        │
│  Redis (State) │ PostgreSQL (Workflows) │ S3 (Files) │      │
│  MongoDB (Logs)                                             │
└─────────────────────────────────────────────────────────────┘
```

### Core Components

#### 1. Workflow Engine Core
```typescript
class WorkflowEngine {
  // Plugin management
  private plugins: Map<string, IPlugin>;

  // Node registry
  private nodeRegistry: Map<string, INode>;

  // State management
  private storage: IStorageAdapter;

  // Execution
  private executor: StepExecutor;

  // Event system
  private eventEmitter: EventEmitter;

  // Observability
  private traces: Map<string, ExecutionTrace[]>;
}
```

#### 2. Plugin System

- Node Plugins: Add custom nodes
- Storage Plugins: Custom state storage
- Observer Plugins: Monitoring and analytics
- Auth Plugins: Authentication/authorization
- Transform Plugins: Data transformation
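The hook-based plugin shape behind these plugin types can be sketched in a few lines. This is an illustrative skeleton only: `PluginLike` mirrors the `IPlugin` interface shown later under Core Concepts, and `LoggingPlugin` is a hypothetical observer-style plugin that records lifecycle events.

```typescript
// Simplified stand-ins for the engine's state and plugin types (assumptions,
// not the package's real exports).
interface WorkflowStateLike {
  workflowId?: string;
  status?: string;
}

interface PluginLike {
  name: string;
  version: string;
  init(engine: unknown): void;
  beforeWorkflow?(state: WorkflowStateLike): void;
  afterWorkflow?(state: WorkflowStateLike): void;
}

// Hypothetical plugin: appends an entry for each lifecycle hook it receives.
class LoggingPlugin implements PluginLike {
  name = "logging-plugin";
  version = "1.0.0";
  events: string[] = [];

  init(_engine: unknown): void {
    this.events.push("init");
  }

  beforeWorkflow(state: WorkflowStateLike): void {
    this.events.push(`start:${state.workflowId}`);
  }

  afterWorkflow(state: WorkflowStateLike): void {
    this.events.push(`end:${state.workflowId}:${state.status}`);
  }
}
```

The engine calls `init` once at registration and the optional hooks around each run, so a plugin only implements the hooks it cares about.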
#### 3. Step Executor

Handles different step types and execution strategies:
- Sequential, Parallel, Stream
- Switch, Loop, Try/Catch
- Dynamic, Workflow
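As a rough sketch of how an executor might dispatch the first two strategies (the `Step` shape and `executeSteps` helper here are simplified stand-ins, not the engine's actual executor API):

```typescript
// Minimal step: an id plus an async unit of work (assumption for illustration).
type Step = { id: string; run: () => Promise<string> };

async function executeSteps(
  steps: Step[],
  strategy: "sequential" | "parallel",
): Promise<string[]> {
  if (strategy === "parallel") {
    // Parallel: start every step at once and wait for all of them.
    return Promise.all(steps.map((s) => s.run()));
  }
  // Sequential: await each step before starting the next.
  const results: string[] = [];
  for (const step of steps) {
    results.push(await step.run());
  }
  return results;
}
```

The remaining strategies (Switch, Loop, Try/Catch, Stream, Dynamic, Workflow) layer control flow on top of the same dispatch idea.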
#### 4. State Manager

Manages workflow state with:
- Checkpointing
- State persistence
- State recovery
- Distributed state
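Checkpointing can be illustrated with a minimal snapshot/restore pair: the engine periodically persists a serializable copy of the state so a crashed run can resume from the last completed step. The `CheckpointState` fields below are assumptions based on the state shape used elsewhere in this guide, not the engine's real checkpoint format.

```typescript
// Assumed minimal shape of the state worth checkpointing.
interface CheckpointState {
  executionId: string;
  currentStepId: string;
  variables: Record<string, unknown>;
}

// A checkpoint is the state at a point in time, plus when it was taken.
function makeCheckpoint(state: CheckpointState): string {
  return JSON.stringify({ ...state, checkpointedAt: Date.now() });
}

// Restore drops the timestamp and hands the state back to the executor.
function restoreCheckpoint(serialized: string): CheckpointState {
  const parsed = JSON.parse(serialized);
  delete parsed.checkpointedAt;
  return parsed as CheckpointState;
}
```

In the real engine the serialized snapshot would go through the configured `IStorageAdapter` at each `checkpointInterval` rather than being kept in memory.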
## Installation and Setup

### Prerequisites

```shell
# Node.js 18+ and npm/yarn
node --version   # v18.0.0 or higher

# Redis for state management
redis-server --version

# PostgreSQL for workflow storage
psql --version

# Python 3.8+ for Python workers (optional)
python --version
```

### Installation
#### 1. Install Core Package

```shell
# Using npm
npm install @your-org/workflow-engine-v3

# Using yarn
yarn add @your-org/workflow-engine-v3

# Using pnpm
pnpm add @your-org/workflow-engine-v3
```

#### 2. Install Dependencies

```shell
# Core dependencies
npm install \
  zod \
  yaml \
  eventemitter3 \
  p-queue \
  axios \
  ws \
  eventsource

# NestJS integration
npm install \
  @nestjs/common \
  @nestjs/core \
  @nestjs/platform-express \
  @nestjs/event-emitter \
  bullmq \
  ioredis

# Development dependencies
npm install -D \
  @types/node \
  @types/ws \
  typescript \
  tsx \
  jest \
  @types/jest
```

#### 3. Initialize Configuration
```typescript
import { WorkflowEngineConfig } from '@your-org/workflow-engine-v3';

export const engineConfig: WorkflowEngineConfig = {
  // Storage configuration
  storage: {
    type: 'redis',
    connection: {
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379'),
    },
  },

  // Execution configuration
  execution: {
    maxConcurrency: 10,
    defaultTimeout: 300000, // 5 minutes
    checkpointInterval: 10000, // 10 seconds
  },

  // Monitoring
  monitoring: {
    enableTracing: true,
    enableProfiling: true,
    metricsExporter: 'prometheus',
  },

  // Security
  security: {
    enableAuth: true,
    jwtSecret: process.env.JWT_SECRET,
    apiKeyHeader: 'x-api-key',
  },
};
```

### Project Structure

```
workflow-app/
├── src/
│   ├── config/
│   │   ├── workflow.config.ts
│   │   ├── database.config.ts
│   │   └── redis.config.ts
│   ├── modules/
│   │   ├── workflow/
│   │   │   ├── workflow.module.ts
│   │   │   ├── workflow.service.ts
│   │   │   ├── workflow.controller.ts
│   │   │   └── workflow.repository.ts
│   │   ├── engine/
│   │   │   ├── engine.module.ts
│   │   │   ├── engine.service.ts
│   │   │   └── engine.factory.ts
│   │   └── workers/
│   │       ├── workers.module.ts
│   │       ├── workflow.processor.ts
│   │       └── step.processor.ts
│   ├── plugins/
│   │   ├── nodes/
│   │   │   ├── custom-http.node.ts
│   │   │   ├── database.node.ts
│   │   │   └── ai.node.ts
│   │   ├── storage/
│   │   │   └── s3-storage.plugin.ts
│   │   └── observers/
│   │       └── datadog.plugin.ts
│   ├── schemas/
│   │   ├── workflow.schema.ts
│   │   └── execution.schema.ts
│   ├── utils/
│   │   ├── validation.ts
│   │   └── logger.ts
│   └── main.ts
├── workers/
│   ├── python/
│   │   ├── requirements.txt
│   │   ├── worker.py
│   │   └── processors/
│   │       ├── pandas_processor.py
│   │       └── ml_processor.py
│   └── specialized/
│       └── gpu-worker/
├── test/
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── docker/
│   ├── Dockerfile
│   ├── docker-compose.yml
│   └── docker-compose.dev.yml
└── package.json
```

## Core Concepts
### 1. Plugin Interface

```typescript
export interface IPlugin {
  name: string;
  version: string;

  // Lifecycle hooks
  init(engine: WorkflowEngine): void;
  destroy?(): void;

  // Optional hooks
  beforeWorkflow?(state: WorkflowState): void;
  afterWorkflow?(state: WorkflowState): void;
  beforeStep?(step: Step): void;
  afterStep?(step: Step, result: any): void;
}
```

### 2. Node Interface
```typescript
export interface INode {
  id?: string;
  type: string;
  version?: string;

  // Metadata
  getMetadata(): NodeMetadata;

  // Validation
  validateInput(input: any, schema?: any): void;
  validateOutput(output: any, schema?: any): void;

  // Execution
  prepare(context: WorkflowState, params: any): any;
  execute(prepared: any, context: WorkflowState): any;
  cleanup?(context: WorkflowState): void;

  // Resource management
  estimateResources?(params: any): ResourceRequirements;
}

export interface NodeMetadata {
  type: string;
  version: string;
  displayName: string;
  description: string;
  category: string;
  icon?: string;

  // Input/Output definitions
  inputs: PortDefinition[];
  outputs: PortDefinition[];

  // Parameter schema
  paramSchema: any; // JSON Schema or Zod schema

  // Capabilities
  capabilities: {
    async: boolean;
    streaming: boolean;
    batchable: boolean;
    cacheable: boolean;
  };
}
```

### 3. Storage Interface
```typescript
export interface IStorageAdapter {
  // Basic operations
  get(key: string): Promise<any>;
  set(key: string, value: any, ttl?: number): Promise<void>;
  delete(key: string): Promise<void>;
  exists(key: string): Promise<boolean>;

  // Batch operations
  mget(keys: string[]): Promise<any[]>;
  mset(items: Record<string, any>): Promise<void>;

  // Scanning
  scan(pattern: string, limit?: number): Promise<string[]>;

  // Transactions
  transaction<T>(fn: (tx: Transaction) => Promise<T>): Promise<T>;

  // Pub/Sub (optional)
  publish?(channel: string, message: any): Promise<void>;
  subscribe?(channel: string, handler: (message: any) => void): Promise<void>;
}
```

### 4. Execution Context
```typescript
export interface ExecutionContext {
  // Identity
  executionId: string;
  workflowId: string;
  parentExecutionId?: string;

  // State
  state: WorkflowState;

  // Services
  storage: IStorageAdapter;
  logger: Logger;
  metrics: MetricsCollector;

  // Resources
  resources: {
    cpu: number;
    memory: number;
    connections: Map<string, any>;
  };

  // Security
  user?: User;
  permissions?: Permission[];
}
```

## Plugin Development
### Creating a Node Plugin
```typescript
import {
  INodePlugin,
  INode,
  AsyncNode,
  NodeMetadata,
  WorkflowEngine,
  WorkflowState,
} from '@your-org/workflow-engine-v3';
import { WebClient } from '@slack/web-api';

export class SlackPlugin implements INodePlugin {
  name = 'slack-plugin';
  version = '1.0.0';
  nodeTypes = ['slack-message', 'slack-channel', 'slack-file'];

  private slackClient: WebClient;

  init(engine: WorkflowEngine): void {
    this.slackClient = new WebClient(process.env.SLACK_TOKEN);

    // Register event handlers
    engine.on('workflow:complete', (state) => {
      if (state.metadata?.notifySlack) {
        this.notifyCompletion(state);
      }
    });
  }

  destroy(): void {
    // Cleanup
  }

  createNode(type: string): INode {
    switch (type) {
      case 'slack-message':
        return new SlackMessageNode(this.slackClient);
      case 'slack-channel':
        // SlackChannelNode and SlackFileNode are implemented along the
        // same lines as SlackMessageNode below.
        return new SlackChannelNode(this.slackClient);
      case 'slack-file':
        return new SlackFileNode(this.slackClient);
      default:
        throw new Error(`Unknown node type: ${type}`);
    }
  }

  private async notifyCompletion(state: WorkflowState): Promise<void> {
    await this.slackClient.chat.postMessage({
      channel: '#workflows',
      text: `Workflow ${state.workflowId} completed with status: ${state.status}`,
    });
  }
}

class SlackMessageNode extends AsyncNode {
  type = 'slack-message';

  constructor(private slack: WebClient) {
    super();
  }

  getMetadata(): NodeMetadata {
    return {
      type: this.type,
      version: '1.0.0',
      displayName: 'Send Slack Message',
      description: 'Send a message to a Slack channel or user',
      category: 'Communication',
      icon: 'slack',
      inputs: [
        { name: 'message', type: 'string', required: true },
        { name: 'channel', type: 'string', required: true },
      ],
      outputs: [
        { name: 'messageId', type: 'string' },
        { name: 'timestamp', type: 'string' },
      ],
      paramSchema: {
        type: 'object',
        properties: {
          channel: { type: 'string', pattern: '^[#@].*' },
          message: { type: 'string' },
          attachments: { type: 'array' },
          threadTs: { type: 'string' },
        },
        required: ['channel', 'message'],
      },
      capabilities: {
        async: true,
        streaming: false,
        batchable: true,
        cacheable: false,
      },
    };
  }

  validateInput(input: any, schema?: any): void {
    if (!input.channel || !input.message) {
      throw new Error('Channel and message are required');
    }

    if (!input.channel.startsWith('#') && !input.channel.startsWith('@')) {
      throw new Error('Channel must start with # or @');
    }
  }

  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const { channel, message, attachments, threadTs } = prepared;

    try {
      const result = await this.slack.chat.postMessage({
        channel,
        text: this.interpolateMessage(message, context),
        attachments,
        thread_ts: threadTs,
      });

      return {
        success: result.ok,
        messageId: result.ts,
        channel: result.channel,
        timestamp: new Date().toISOString(),
      };
    } catch (error: any) {
      throw new Error(`Failed to send Slack message: ${error.message}`);
    }
  }

  private interpolateMessage(message: string, context: WorkflowState): string {
    return message.replace(/\{\{(\w+)\}\}/g, (match, key) => {
      return context.variables[key] || context.inputs[key] || match;
    });
  }
}

// Register the plugin
const engine = new WorkflowEngine();
engine.registerPlugin(new SlackPlugin());
```

### Creating a Storage Plugin
```typescript
import { IStoragePlugin, IStorageAdapter, WorkflowEngine } from '@your-org/workflow-engine-v3';
import AWS from 'aws-sdk';

export class S3StoragePlugin implements IStoragePlugin {
  name = 's3-storage';
  version = '1.0.0';

  private s3: AWS.S3;
  private bucket: string;

  constructor(config: { bucket: string; region?: string; accessKeyId?: string; secretAccessKey?: string }) {
    this.bucket = config.bucket;
    this.s3 = new AWS.S3({
      region: config.region || 'us-east-1',
      accessKeyId: config.accessKeyId,
      secretAccessKey: config.secretAccessKey,
    });
  }

  init(engine: WorkflowEngine): void {
    console.log('S3 Storage Plugin initialized');
  }

  createStorage(): IStorageAdapter {
    return new S3StorageAdapter(this.s3, this.bucket);
  }
}

class S3StorageAdapter implements IStorageAdapter {
  constructor(private s3: AWS.S3, private bucket: string) {}

  async get(key: string): Promise<any> {
    try {
      const result = await this.s3
        .getObject({ Bucket: this.bucket, Key: this.sanitizeKey(key) })
        .promise();

      return JSON.parse(result.Body?.toString() || 'null');
    } catch (error: any) {
      if (error.code === 'NoSuchKey') {
        return null;
      }
      throw error;
    }
  }

  async set(key: string, value: any, ttl?: number): Promise<void> {
    const params: AWS.S3.PutObjectRequest = {
      Bucket: this.bucket,
      Key: this.sanitizeKey(key),
      Body: JSON.stringify(value),
      ContentType: 'application/json',
    };

    if (ttl) {
      params.Expires = new Date(Date.now() + ttl * 1000);
    }

    await this.s3.putObject(params).promise();
  }

  async delete(key: string): Promise<void> {
    await this.s3
      .deleteObject({ Bucket: this.bucket, Key: this.sanitizeKey(key) })
      .promise();
  }

  async exists(key: string): Promise<boolean> {
    try {
      await this.s3
        .headObject({ Bucket: this.bucket, Key: this.sanitizeKey(key) })
        .promise();
      return true;
    } catch (error) {
      return false;
    }
  }

  async mget(keys: string[]): Promise<any[]> {
    const promises = keys.map((key) => this.get(key));
    return Promise.all(promises);
  }

  async mset(items: Record<string, any>): Promise<void> {
    const promises = Object.entries(items).map(([key, value]) => this.set(key, value));
    await Promise.all(promises);
  }

  async scan(pattern: string, limit: number = 1000): Promise<string[]> {
    const prefix = pattern.replace('*', '');
    const result = await this.s3
      .listObjectsV2({ Bucket: this.bucket, Prefix: prefix, MaxKeys: limit })
      .promise();

    return result.Contents?.map((obj) => obj.Key!) || [];
  }

  async transaction<T>(fn: (tx: any) => Promise<T>): Promise<T> {
    // S3 doesn't support transactions; implement with versioning if needed
    throw new Error('Transactions not supported in S3 storage');
  }

  private sanitizeKey(key: string): string {
    // S3 key restrictions
    return key.replace(/[^a-zA-Z0-9\-_.!*'()/]/g, '_');
  }
}

// Usage
const storagePlugin = new S3StoragePlugin({
  bucket: 'workflow-states',
  region: 'us-east-1',
});

const engine = new WorkflowEngine({
  storage: storagePlugin.createStorage(),
});
```

### Creating an Observer Plugin
```typescript
import {
  IObserverPlugin,
  WorkflowEngine,
  WorkflowState,
  ExecutionTrace,
} from '@your-org/workflow-engine-v3';
import { Registry, Counter, Histogram, Gauge } from 'prom-client';

export class PrometheusPlugin implements IObserverPlugin {
  name = 'prometheus-observer';
  version = '1.0.0';

  private registry: Registry;
  private metrics: {
    workflowsTotal: Counter;
    workflowDuration: Histogram;
    stepsTotal: Counter;
    stepDuration: Histogram;
    activeWorkflows: Gauge;
    errors: Counter;
  };

  constructor(registry?: Registry) {
    this.registry = registry || new Registry();

    // Define metrics
    this.metrics = {
      workflowsTotal: new Counter({
        name: 'workflow_executions_total',
        help: 'Total number of workflow executions',
        labelNames: ['workflow_id', 'status'],
        registers: [this.registry],
      }),

      workflowDuration: new Histogram({
        name: 'workflow_duration_seconds',
        help: 'Workflow execution duration',
        labelNames: ['workflow_id'],
        buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
        registers: [this.registry],
      }),

      stepsTotal: new Counter({
        name: 'workflow_steps_total',
        help: 'Total number of steps executed',
        labelNames: ['step_type', 'status'],
        registers: [this.registry],
      }),

      stepDuration: new Histogram({
        name: 'workflow_step_duration_seconds',
        help: 'Step execution duration',
        labelNames: ['step_type'],
        buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
        registers: [this.registry],
      }),

      activeWorkflows: new Gauge({
        name: 'workflow_active_count',
        help: 'Number of active workflows',
        registers: [this.registry],
      }),

      errors: new Counter({
        name: 'workflow_errors_total',
        help: 'Total number of workflow errors',
        labelNames: ['workflow_id', 'step_id', 'error_type'],
        registers: [this.registry],
      }),
    };
  }

  init(engine: WorkflowEngine): void {
    // Register metric endpoint
    engine.on('metrics:collect', () => {
      return this.registry.metrics();
    });
  }

  onWorkflowStart(state: WorkflowState): void {
    this.metrics.activeWorkflows.inc();
  }

  onWorkflowEnd(state: WorkflowState): void {
    this.metrics.activeWorkflows.dec();

    this.metrics.workflowsTotal.inc({
      workflow_id: state.workflowId || 'unknown',
      status: state.status,
    });

    if (state.endTime && state.startTime) {
      const duration = (state.endTime - state.startTime) / 1000;
      this.metrics.workflowDuration.observe({ workflow_id: state.workflowId || 'unknown' }, duration);
    }

    // Count errors
    state.errors.forEach((error) => {
      this.metrics.errors.inc({
        workflow_id: state.workflowId || 'unknown',
        step_id: error.stepId,
        error_type: this.classifyError(error.error),
      });
    });
  }

  onStepStart(trace: ExecutionTrace): void {
    // Could track concurrent steps
  }

  onStepEnd(trace: ExecutionTrace): void {
    this.metrics.stepsTotal.inc({
      step_type: trace.nodeType,
      status: trace.status,
    });

    this.metrics.stepDuration.observe({ step_type: trace.nodeType }, trace.duration / 1000);
  }

  getMetrics(): Promise<string> {
    return this.registry.metrics();
  }

  private classifyError(error: any): string {
    if (error.includes('timeout')) return 'timeout';
    if (error.includes('network')) return 'network';
    if (error.includes('validation')) return 'validation';
    if (error.includes('auth')) return 'auth';
    return 'unknown';
  }
}
```

Usage in NestJS:

```typescript
import { Controller, Get, Inject } from '@nestjs/common';

@Controller('metrics')
export class MetricsController {
  constructor(@Inject('PrometheusPlugin') private prometheus: PrometheusPlugin) {}

  @Get()
  async getMetrics(): Promise<string> {
    return this.prometheus.getMetrics();
  }
}
```

## Custom Node Development
### Basic Node Implementation
```typescript
import {
  AsyncNode,
  NodeMetadata,
  ResourceRequirements,
  WorkflowState,
} from '@your-org/workflow-engine-v3';
import nodemailer from 'nodemailer';

export class EmailNode extends AsyncNode {
  type = 'email-send';
  version = '1.0.0';

  private transporter: nodemailer.Transporter;

  constructor() {
    super();

    // Initialize email transporter
    this.transporter = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      port: parseInt(process.env.SMTP_PORT || '587'),
      secure: false,
      auth: {
        user: process.env.SMTP_USER,
        pass: process.env.SMTP_PASS,
      },
    });
  }

  getMetadata(): NodeMetadata {
    return {
      type: this.type,
      version: this.version,
      displayName: 'Send Email',
      description: 'Send an email via SMTP',
      category: 'Communication',
      icon: 'email',
      inputs: [
        { name: 'to', type: 'string', required: true, description: 'Recipient email address' },
        { name: 'subject', type: 'string', required: true },
        { name: 'body', type: 'string', required: true },
        { name: 'attachments', type: 'array', required: false },
      ],
      outputs: [
        { name: 'messageId', type: 'string' },
        { name: 'accepted', type: 'array' },
      ],
      paramSchema: {
        type: 'object',
        properties: {
          to: { type: 'string', format: 'email', description: 'Recipient email' },
          cc: { type: 'string', description: 'CC recipients (comma-separated)' },
          bcc: { type: 'string', description: 'BCC recipients (comma-separated)' },
          subject: { type: 'string', maxLength: 200 },
          body: { type: 'string' },
          html: { type: 'boolean', default: false, description: 'Send as HTML email' },
          attachments: {
            type: 'array',
            items: {
              type: 'object',
              properties: {
                filename: { type: 'string' },
                content: { type: 'string' },
                path: { type: 'string' },
              },
            },
          },
          priority: { type: 'string', enum: ['high', 'normal', 'low'], default: 'normal' },
        },
        required: ['to', 'subject', 'body'],
      },
      capabilities: {
        async: true,
        streaming: false,
        batchable: true,
        cacheable: false,
      },
    };
  }

  validateInput(input: any): void {
    // Validate email format
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

    if (!emailRegex.test(input.to)) {
      throw new Error(`Invalid email address: ${input.to}`);
    }

    if (input.cc) {
      const ccEmails = input.cc.split(',').map((e: string) => e.trim());
      for (const email of ccEmails) {
        if (!emailRegex.test(email)) {
          throw new Error(`Invalid CC email address: ${email}`);
        }
      }
    }

    if (!input.subject || input.subject.trim().length === 0) {
      throw new Error('Email subject cannot be empty');
    }

    if (!input.body || input.body.trim().length === 0) {
      throw new Error('Email body cannot be empty');
    }
  }

  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const { to, cc, bcc, subject, body, html = false, attachments = [], priority = 'normal' } = prepared;

    // Prepare email options
    const mailOptions: any = {
      to,
      subject: this.interpolate(subject, context),
      priority,
    };

    if (cc) mailOptions.cc = cc;
    if (bcc) mailOptions.bcc = bcc;

    if (html) {
      mailOptions.html = this.interpolate(body, context);
    } else {
      mailOptions.text = this.interpolate(body, context);
    }

    if (attachments.length > 0) {
      mailOptions.attachments = attachments;
    }

    try {
      const info = await this.transporter.sendMail(mailOptions);

      return {
        success: true,
        messageId: info.messageId,
        accepted: info.accepted,
        rejected: info.rejected,
        response: info.response,
      };
    } catch (error: any) {
      throw new Error(`Failed to send email: ${error.message}`);
    }
  }

  private interpolate(text: string, context: WorkflowState): string {
    // Replace template variables
    return text.replace(/\{\{([^}]+)\}\}/g, (match, path) => {
      const value = this.getValueByPath(context, path.trim());
      return value !== undefined ? String(value) : match;
    });
  }

  private getValueByPath(obj: any, path: string): any {
    const parts = path.split('.');
    let current = obj;

    for (const part of parts) {
      if (part.startsWith('$')) {
        // Handle special references
        switch (part) {
          case '$input':
            current = obj.inputs;
            break;
          case '$var':
            current = obj.variables;
            break;
          default:
            current = obj.results[part.slice(1)];
        }
      } else {
        current = current?.[part];
      }
    }

    return current;
  }

  async cleanup(context: WorkflowState): Promise<void> {
    // Close transporter if needed
    this.transporter.close();
  }

  estimateResources(params: any): ResourceRequirements {
    // Estimate resources based on attachments
    const attachmentSize =
      params.attachments?.reduce((sum: number, att: any) => sum + (att.content?.length || 0), 0) || 0;

    return {
      cpu: 0.1, // 10% of a CPU core
      memory: 50 + Math.ceil(attachmentSize / 1024 / 1024), // 50MB + attachment size
      network: true,
      duration: 5000, // Estimated 5 seconds
    };
  }
}

// Register the node
engine.registerNode('email-send', new EmailNode());
```

### Advanced Node with State Management
```typescript
import { AsyncNode, WorkflowState } from '@your-org/workflow-engine-v3';
import { Pool, PoolClient } from 'pg';

export class DatabaseTransactionNode extends AsyncNode {
  type = 'db-transaction';
  private pool: Pool;
  private transactions: Map<string, PoolClient> = new Map();

  constructor() {
    super();
    this.pool = new Pool({
      connectionString: process.env.DATABASE_URL,
      max: 20,
    });
  }

  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const { operation, transactionId, query, params } = prepared;

    switch (operation) {
      case 'begin':
        return this.beginTransaction(transactionId, context);
      case 'query':
        return this.executeQuery(transactionId, query, params);
      case 'commit':
        return this.commitTransaction(transactionId);
      case 'rollback':
        return this.rollbackTransaction(transactionId);
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
  }

  private async beginTransaction(transactionId: string, context: WorkflowState): Promise<any> {
    const client = await this.pool.connect();
    await client.query('BEGIN');

    this.transactions.set(transactionId, client);

    // Store transaction ID in context for cleanup
    context.variables._activeTransactions = context.variables._activeTransactions || [];
    context.variables._activeTransactions.push(transactionId);

    return { transactionId, status: 'begun' };
  }

  private async executeQuery(transactionId: string, query: string, params: any[]): Promise<any> {
    const client = this.transactions.get(transactionId);
    if (!client) {
      throw new Error(`Transaction ${transactionId} not found`);
    }

    const result = await client.query(query, params);
    return {
      rows: result.rows,
      rowCount: result.rowCount,
    };
  }

  private async commitTransaction(transactionId: string): Promise<any> {
    const client = this.transactions.get(transactionId);
    if (!client) {
      throw new Error(`Transaction ${transactionId} not found`);
    }

    await client.query('COMMIT');
    client.release();
    this.transactions.delete(transactionId);

    return { transactionId, status: 'committed' };
  }

  private async rollbackTransaction(transactionId: string): Promise<any> {
    const client = this.transactions.get(transactionId);
    if (!client) {
      throw new Error(`Transaction ${transactionId} not found`);
    }

    await client.query('ROLLBACK');
    client.release();
    this.transactions.delete(transactionId);

    return { transactionId, status: 'rolled_back' };
  }

  async cleanup(context: WorkflowState): Promise<void> {
    // Roll back any transactions still open at the end of the workflow
    const activeTransactions = context.variables._activeTransactions || [];

    for (const transactionId of activeTransactions) {
      try {
        await this.rollbackTransaction(transactionId);
      } catch (error) {
        console.error(`Failed to rollback transaction ${transactionId}:`, error);
      }
    }

    await this.pool.end();
  }
}
```

## Storage Adapters
### Multi-Tier Storage Adapter
```typescript
import { IStorageAdapter } from '@your-org/workflow-engine-v3';
import Redis from 'ioredis';
import AWS from 'aws-sdk';
import { Pool } from 'pg';

export class MultiTierStorageAdapter implements IStorageAdapter {
  private redis: Redis;
  private s3: AWS.S3;
  private pg: Pool;
  private bucket: string;

  private readonly HOT_TTL = 3600; // 1 hour in Redis
  private readonly WARM_TTL = 86400; // 1 day in PostgreSQL
  // Cold storage in S3 indefinitely

  constructor(config: {
    redis: { host: string; port: number };
    s3: { bucket: string; region: string };
    pg: { connectionString: string };
  }) {
    this.redis = new Redis(config.redis);
    this.s3 = new AWS.S3({ region: config.s3.region });
    this.pg = new Pool({ connectionString: config.pg.connectionString });
    this.bucket = config.s3.bucket;
  }

  async get(key: string): Promise<any> {
    // Try hot storage (Redis)
    const hot = await this.redis.get(key);
    if (hot) {
      return JSON.parse(hot);
    }

    // Try warm storage (PostgreSQL)
    const pgResult = await this.pg.query(
      'SELECT value FROM workflow_cache WHERE key = $1 AND expires_at > NOW()',
      [key],
    );

    if (pgResult.rows.length > 0) {
      const value = pgResult.rows[0].value;
      // Promote to hot storage
      await this.redis.set(key, JSON.stringify(value), 'EX', this.HOT_TTL);
      return value;
    }

    // Try cold storage (S3)
    try {
      const s3Result = await this.s3
        .getObject({ Bucket: this.bucket, Key: `cold/${key}` })
        .promise();

      const value = JSON.parse(s3Result.Body?.toString() || 'null');

      // Promote to warm and hot storage
      await this.pg.query(
        'INSERT INTO workflow_cache (key, value, expires_at) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = $2, expires_at = $3',
        [key, value, new Date(Date.now() + this.WARM_TTL * 1000)],
      );
      await this.redis.set(key, JSON.stringify(value), 'EX', this.HOT_TTL);

      return value;
    } catch (error: any) {
      if (error.code === 'NoSuchKey') {
        return null;
      }
      throw error;
    }
  }

  async set(key: string, value: any, ttl?: number): Promise<void> {
    const serialized = JSON.stringify(value);

    // Determine storage tier based on TTL
    if (!ttl || ttl <= this.HOT_TTL) {
      // Hot storage only
      await this.redis.set(key, serialized, 'EX', ttl || this.HOT_TTL);
    } else if (ttl <= this.WARM_TTL) {
      // Hot + warm storage
      await Promise.all([
        this.redis.set(key, serialized, 'EX', this.HOT_TTL),
        this.pg.query(
          'INSERT INTO workflow_cache (key, value, expires_at) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = $2, expires_at = $3',
          [key, value, new Date(Date.now() + ttl * 1000)],
        ),
      ]);
    } else {
      // All tiers
      await Promise.all([
        this.redis.set(key, serialized, 'EX', this.HOT_TTL),
        this.pg.query(
          'INSERT INTO workflow_cache (key, value, expires_at) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = $2, expires_at = $3',
          [key, value, new Date(Date.now() + this.WARM_TTL * 1000)],
        ),
        this.s3
          .putObject({
            Bucket: this.bucket,
            Key: `cold/${key}`,
            Body: serialized,
            ContentType: 'application/json',
          })
          .promise(),
      ]);
    }
  }

  async delete(key: string): Promise<void> {
    await Promise.all([
      this.redis.del(key),
      this.pg.query('DELETE FROM workflow_cache WHERE key = $1', [key]),
      this.s3
        .deleteObject({ Bucket: this.bucket, Key: `cold/${key}` })
        .promise()
        .catch(() => {}), // Ignore if not exists
    ]);
  }

  async exists(key: string): Promise<boolean> {
    // Check hot storage first (fastest)
    if (await this.redis.exists(key)) {
      return true;
    }

    // Check warm storage
    const pgResult = await this.pg.query(
      'SELECT 1 FROM workflow_cache WHERE key = $1 AND expires_at > NOW()',
      [key],
    );

    if (pgResult.rows.length > 0) {
      return true;
    }

    // Check cold storage
    try {
      await this.s3
        .headObject({ Bucket: this.bucket, Key: `cold/${key}` })
        .promise();
      return true;
    } catch {
      return false;
    }
  }

  // Implement other methods (mget, mset, scan, transaction)...
}
```

## NestJS Integration
### Workflow Module
```typescript
import { Module, Global } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { TypeOrmModule } from '@nestjs/typeorm';
import { WorkflowService } from './workflow.service';
import { WorkflowController } from './workflow.controller';
import { WorkflowRepository } from './workflow.repository';
import { WorkflowProcessor } from './workflow.processor';
import { EngineModule } from '../engine/engine.module';
// Entity imports (adjust the path to wherever your entities live)
import { WorkflowEntity, ExecutionEntity } from './workflow.entity';

@Global()
@Module({
  imports: [
    EventEmitterModule.forRoot(),
    BullModule.registerQueue({ name: 'workflows' }, { name: 'steps' }, { name: 'python-nodes' }),
    TypeOrmModule.forFeature([WorkflowEntity, ExecutionEntity]),
    EngineModule,
  ],
  controllers: [WorkflowController],
  providers: [WorkflowService, WorkflowRepository, WorkflowProcessor],
  exports: [WorkflowService],
})
export class WorkflowModule {}
```

### Workflow Service
```typescript
import { BadRequestException, Injectable, Logger, NotFoundException } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { WorkflowEngine, WorkflowState } from '@your-org/workflow-engine-v3';
import { WorkflowRepository } from './workflow.repository';

@Injectable()
export class WorkflowService {
  private readonly logger = new Logger(WorkflowService.name);
  private engine: WorkflowEngine;

  constructor(
    @InjectQueue('workflows') private workflowQueue: Queue,
    @InjectQueue('steps') private stepQueue: Queue,
    private repository: WorkflowRepository,
    private eventEmitter: EventEmitter2,
  ) {
    this.initializeEngine();
  }

  private initializeEngine(): void {
    this.engine = new WorkflowEngine({
      storage: new RedisStorageAdapter(/* config */),
      enableTracing: true,
      enableProfiling: true,
    });

    // Setup event forwarding
    this.engine.on('workflow:start', (state) => {
      this.eventEmitter.emit('workflow.started', state);
    });

    this.engine.on('workflow:complete', (state) => {
      this.eventEmitter.emit('workflow.completed', state);
    });

    this.engine.on('workflow:error', ({ state, error }) => {
      this.eventEmitter.emit('workflow.failed', { state, error });
    });

    this.logger.log('Workflow engine initialized');
  }

  async createWorkflow(dto: CreateWorkflowDto): Promise<WorkflowEntity> {
    const workflow = await this.repository.create(dto);

    // Validate workflow definition
    try {
      this.engine.validateWorkflow(workflow.definition);
    } catch (error: any) {
      throw new BadRequestException(`Invalid workflow: ${error.message}`);
    }

    return workflow;
  }

  async executeWorkflow(workflowId: string, inputs: Record<string, any> = {}): Promise<ExecutionResponseDto> {
    const workflow = await this.repository.findById(workflowId);

    if (!workflow) {
      throw new NotFoundException(`Workflow ${workflowId} not found`);
    }

    // Create execution record
    const execution = await this.repository.createExecution({
      workflowId,
      inputs,
      status: 'pending',
    });

    // Queue for async execution
    const job = await this.workflowQueue.add('execute', {
      workflowId,
      executionId: execution.id,
      definition: workflow.definition,
      inputs,
    });

    // Emit event
    this.eventEmitter.emit('workflow.queued', {
      workflowId,
      executionId: execution.id,
      jobId: job.id,
    });

    return {
      executionId: execution.id,
      status: 'queued',
      jobId: job.id,
    };
  }

  async getExecutionStatus(executionId: string): Promise<WorkflowState | null> {
    return this.engine.loadState(executionId);
  }

  async getExecutionHistory(
    workflowId: string,
    options: PaginationOptions,
  ): Promise<PaginatedResult<ExecutionEntity>> {
    return this.repository.findExecutions(workflowId, options);
  }

  async pauseExecution(executionId: string): Promise<void> {
    await this.engine.pause(executionId);

    await this.repository.updateExecution(executionId, {
      status: 'paused',
      pausedAt: new Date(),
    });

    this.eventEmitter.emit('workflow.paused', { executionId });
  }

  async resumeExecution(executionId: string): Promise<void> {
    const execution = await this.repository.findExecution(executionId);

    if (!execution) {
      throw new NotFoundException(`Execution ${executionId} not found`);
    }
```
if (execution.status !== 'paused') { throw new BadRequestException(`Execution ${executionId} is not paused`); }
// Queue resume job await this.workflowQueue.add('resume', { executionId, workflowId: execution.workflowId, });
await this.repository.updateExecution(executionId, { status: 'resuming', resumedAt: new Date(), });
this.eventEmitter.emit('workflow.resumed', { executionId }); }
async cancelExecution(executionId: string): Promise<void> { await this.engine.cancel(executionId);
// Remove pending jobs const jobs = await this.workflowQueue.getJobs(['active', 'waiting', 'delayed']);
for (const job of jobs) { if (job.data.executionId === executionId) { await job.remove(); } }
await this.repository.updateExecution(executionId, { status: 'cancelled', cancelledAt: new Date(), });
this.eventEmitter.emit('workflow.cancelled', { executionId }); }
async retryExecution(executionId: string): Promise<void> { const execution = await this.repository.findExecution(executionId);
if (!execution) { throw new NotFoundException(`Execution ${executionId} not found`); }
if (execution.status !== 'failed') { throw new BadRequestException(`Execution ${executionId} has not failed`); }
// Load last state const state = await this.engine.loadState(executionId);
if (!state) { throw new NotFoundException(`State for execution ${executionId} not found`); }
// Queue retry from last checkpoint await this.workflowQueue.add('retry', { executionId, workflowId: execution.workflowId, fromCheckpoint: state.lastCheckpoint, });
await this.repository.updateExecution(executionId, { status: 'retrying', retriedAt: new Date(), retryCount: (execution.retryCount || 0) + 1, });
this.eventEmitter.emit('workflow.retried', { executionId }); }
async getExecutionTraces(executionId: string): Promise<ExecutionTrace[]> { const traces = this.engine.getTraces(executionId);
// Enrich traces with additional metadata return traces.map((trace) => ({ ...trace, executionId, timestamp: new Date(trace.startTime), })); }
async validateWorkflow(definition: any): Promise<ValidationResult> { try { this.engine.validateWorkflow(definition); return { valid: true }; } catch (error: any) { return { valid: false, errors: [error.message], }; } }}Workflow Processor (BullMQ Worker)
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
import { WorkflowEngine } from '@your-org/workflow-engine-v3';
import { WorkflowRepository } from './workflow.repository';
@Processor('workflows')export class WorkflowProcessor { private readonly logger = new Logger(WorkflowProcessor.name); private engine: WorkflowEngine;
constructor(private repository: WorkflowRepository) { this.initializeEngine(); }
private initializeEngine(): void { this.engine = new WorkflowEngine({ // Configuration }); }
@Process('execute') async handleExecute(job: Job<WorkflowJobData>): Promise<any> { const { workflowId, executionId, definition, inputs } = job.data;
this.logger.log(`Executing workflow ${workflowId} - ${executionId}`);
try { // Update status await this.repository.updateExecution(executionId, { status: 'running', startedAt: new Date(), });
// Update job progress await job.updateProgress(10);
// Execute workflow const result = await this.engine.execute(definition, inputs, executionId);
// Update execution record await this.repository.updateExecution(executionId, { status: 'completed', completedAt: new Date(), outputs: result.outputs, });
await job.updateProgress(100);
this.logger.log(`Workflow ${executionId} completed successfully`);
return result; } catch (error: any) { this.logger.error(`Workflow ${executionId} failed:`, error);
// Update execution record await this.repository.updateExecution(executionId, { status: 'failed', failedAt: new Date(), error: error.message, });
throw error; } }
@Process('resume') async handleResume(job: Job<ResumeJobData>): Promise<any> { const { executionId, workflowId } = job.data;
this.logger.log(`Resuming workflow ${executionId}`);
try { const result = await this.engine.resume(executionId);
await this.repository.updateExecution(executionId, { status: 'completed', completedAt: new Date(), outputs: result.outputs, });
return result; } catch (error: any) { this.logger.error(`Failed to resume workflow ${executionId}:`, error);
await this.repository.updateExecution(executionId, { status: 'failed', error: error.message, });
throw error; } }
@Process('retry') async handleRetry(job: Job<RetryJobData>): Promise<any> { const { executionId, fromCheckpoint } = job.data;
this.logger.log(`Retrying workflow ${executionId} from checkpoint ${fromCheckpoint}`);
try { // Load checkpoint state const state = await this.engine.loadCheckpoint(executionId, fromCheckpoint);
if (!state) { throw new Error(`Checkpoint ${fromCheckpoint} not found`); }
// Resume from checkpoint const result = await this.engine.execute(state.workflowId!, state.inputs, executionId);
await this.repository.updateExecution(executionId, { status: 'completed', completedAt: new Date(), outputs: result.outputs, });
return result; } catch (error: any) { this.logger.error(`Failed to retry workflow ${executionId}:`, error);
await this.repository.updateExecution(executionId, { status: 'failed', error: error.message, });
throw error; } }}BullMQ and Distributed Processing
Queue Configuration
import { BullModuleOptions } from '@nestjs/bull';
import { Redis } from 'ioredis';
export const queueConfig: BullModuleOptions = {
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '0'),
  },
  defaultJobOptions: {
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 500, // Keep last 500 failed jobs
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
  },
};
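With `attempts: 3` and an exponential backoff starting at 2000 ms, retries are spaced out progressively. A minimal sketch of that schedule, assuming BullMQ's built-in exponential strategy of `delay * 2^(attemptsMade - 1)` (the helper name `backoffDelayMs` is illustrative, not part of the BullMQ API):

```typescript
// Sketch of the retry delay schedule the exponential backoff above produces.
// Assumption: BullMQ's built-in strategy is delay * 2^(attemptsMade - 1).
function backoffDelayMs(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1));
}

const schedule = [1, 2, 3].map((attempt) => backoffDelayMs(2000, attempt));
console.log(schedule); // [2000, 4000, 8000]
```

So a job that keeps failing is retried after roughly 2 s, 4 s, and 8 s before landing in the failed set.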
// Queue definitions
export const QUEUES = {
  WORKFLOWS: 'workflows',
  STEPS: 'steps',
  PYTHON_NODES: 'python-nodes',
  NOTIFICATIONS: 'notifications',
  CLEANUP: 'cleanup',
} as const;

// Job priorities (in BullMQ, lower numbers are dequeued first, so 1 is the highest priority)
export const PRIORITIES = {
  CRITICAL: 1,
  HIGH: 5,
  NORMAL: 10,
  LOW: 20,
} as const;

Distributed Step Processor
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
@Processor('steps')export class StepProcessor { private readonly logger = new Logger(StepProcessor.name);
// Track active jobs for resource management private activeJobs = new Map<string, Job>();
@OnQueueActive() onActive(job: Job): void { this.activeJobs.set(job.id!, job); this.logger.debug(`Processing step job ${job.id} - ${job.name}`); }
@OnQueueCompleted() onCompleted(job: Job, result: any): void { this.activeJobs.delete(job.id!); this.logger.debug(`Step job ${job.id} completed`); }
@OnQueueFailed() onFailed(job: Job, error: Error): void { this.activeJobs.delete(job.id!); this.logger.error(`Step job ${job.id} failed:`, error); }
@Process('data-transform') async processDataTransform(job: Job<DataTransformJobData>): Promise<any> { const { data, operations } = job.data;
let result = data;
for (const [index, operation] of operations.entries()) { await job.updateProgress((index / operations.length) * 100);
result = await this.applyOperation(result, operation);
    // Check whether the workflow was cancelled and abort this job if so
    if (await this.shouldCancel(job)) {
      throw new Error('Job cancelled');
    }
  }
return result; }
@Process('batch-process') async processBatch(job: Job<BatchJobData>): Promise<any> { const { items, processor } = job.data; const results = [];
// Process in chunks const chunkSize = 10; const chunks = this.chunk(items, chunkSize);
for (const [index, chunk] of chunks.entries()) { await job.updateProgress((index / chunks.length) * 100);
const chunkResults = await Promise.all(chunk.map((item) => this.processItem(item, processor)));
results.push(...chunkResults); }
return results; }
private async applyOperation(data: any, operation: any): Promise<any> { switch (operation.type) { case 'filter': return data.filter(operation.predicate); case 'map': return data.map(operation.mapper); case 'reduce': return data.reduce(operation.reducer, operation.initialValue); case 'sort': return data.sort(operation.comparator); default: throw new Error(`Unknown operation: ${operation.type}`); } }
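For orientation, here is a stripped-down, in-process sketch of that operation pipeline. Note that real queue payloads must be JSON-serializable, so predicates and mappers would normally travel as data (e.g. field/operator descriptors) rather than closures; this sketch uses functions purely for brevity, and `applyOperations` is an illustrative name:

```typescript
// Minimal in-process version of the data-transform pipeline above.
type Operation =
  | { type: 'filter'; predicate: (x: number) => boolean }
  | { type: 'map'; mapper: (x: number) => number }
  | { type: 'sort'; comparator: (a: number, b: number) => number };

function applyOperations(data: number[], ops: Operation[]): number[] {
  return ops.reduce<number[]>((acc, op) => {
    switch (op.type) {
      case 'filter':
        return acc.filter(op.predicate);
      case 'map':
        return acc.map(op.mapper);
      case 'sort':
        return [...acc].sort(op.comparator);
      default:
        return acc;
    }
  }, data);
}

const result = applyOperations([5, 2, 4, 1], [
  { type: 'filter', predicate: (x) => x % 2 === 0 }, // keep evens → [2, 4]
  { type: 'map', mapper: (x) => x * 10 },            // scale → [20, 40]
  { type: 'sort', comparator: (a, b) => a - b },     // ascending
]);
console.log(result); // [20, 40]
```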
private chunk<T>(array: T[], size: number): T[][] { const chunks: T[][] = []; for (let i = 0; i < array.length; i += size) { chunks.push(array.slice(i, i + size)); } return chunks; }
private async shouldCancel(job: Job): Promise<boolean> { // Check if workflow was cancelled const state = await this.getWorkflowState(job.data.executionId); return state?.status === 'cancelled'; }
private async processItem(item: any, processor: string): Promise<any> { // Process individual item return item; }
private async getWorkflowState(executionId: string): Promise<any> { // Get workflow state from storage return null; }}Worker Pool Management
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Worker, Queue, Job } from 'bullmq';
@Injectable()export class WorkerPoolService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(WorkerPoolService.name); private workers: Worker[] = []; private queues: Map<string, Queue> = new Map();
async onModuleInit(): Promise<void> { await this.initializeWorkerPool(); }
async onModuleDestroy(): Promise<void> { await this.shutdownWorkerPool(); }
private async initializeWorkerPool(): Promise<void> { const workerConfig = { connection: { host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), }, concurrency: parseInt(process.env.WORKER_CONCURRENCY || '5'), };
// Create workers for different queues const queues = ['workflows', 'steps', 'python-nodes'];
for (const queueName of queues) { const queue = new Queue(queueName, { connection: workerConfig.connection }); this.queues.set(queueName, queue);
// Create multiple workers per queue const workerCount = this.getWorkerCount(queueName);
for (let i = 0; i < workerCount; i++) { const worker = new Worker(queueName, this.getProcessor(queueName), workerConfig);
worker.on('completed', (job) => { this.logger.debug(`Job ${job.id} completed in queue ${queueName}`); });
worker.on('failed', (job, error) => { this.logger.error(`Job ${job?.id} failed in queue ${queueName}:`, error); });
this.workers.push(worker); }
this.logger.log(`Initialized ${workerCount} workers for queue ${queueName}`); } }
private getWorkerCount(queueName: string): number { const counts: Record<string, number> = { workflows: parseInt(process.env.WORKFLOW_WORKERS || '2'), steps: parseInt(process.env.STEP_WORKERS || '5'), 'python-nodes': parseInt(process.env.PYTHON_WORKERS || '3'), };
return counts[queueName] || 1; }
private getProcessor(queueName: string): any { // Return the appropriate processor function for each queue const processors: Record<string, any> = { workflows: this.processWorkflow.bind(this), steps: this.processStep.bind(this), 'python-nodes': this.processPythonNode.bind(this), };
return processors[queueName]; }
private async processWorkflow(job: Job): Promise<any> { // Workflow processing logic return null; }
private async processStep(job: Job): Promise<any> { // Step processing logic return null; }
private async processPythonNode(job: Job): Promise<any> { // Python node processing logic return null; }
private async shutdownWorkerPool(): Promise<void> { this.logger.log('Shutting down worker pool...');
// Close all workers gracefully await Promise.all(this.workers.map((worker) => worker.close()));
// Close all queues await Promise.all(Array.from(this.queues.values()).map((queue) => queue.close()));
this.logger.log('Worker pool shutdown complete'); }
// Dynamic scaling async scaleWorkers(queueName: string, count: number): Promise<void> { const currentWorkers = this.workers.filter((w) => w.name === queueName); const currentCount = currentWorkers.length;
if (count > currentCount) { // Add workers for (let i = 0; i < count - currentCount; i++) { const worker = new Worker(queueName, this.getProcessor(queueName), { /* config */ }); this.workers.push(worker); }
this.logger.log(`Scaled up ${queueName} to ${count} workers`); } else if (count < currentCount) { // Remove workers const workersToRemove = currentWorkers.slice(count);
for (const worker of workersToRemove) { await worker.close(); const index = this.workers.indexOf(worker); this.workers.splice(index, 1); }
this.logger.log(`Scaled down ${queueName} to ${count} workers`); } }
// Metrics async getQueueMetrics(): Promise<QueueMetrics[]> { const metrics: QueueMetrics[] = [];
for (const [name, queue] of this.queues) { const counts = await queue.getJobCounts(); const workers = this.workers.filter((w) => w.name === name);
metrics.push({ name, counts, workerCount: workers.length, isPaused: await queue.isPaused(), }); }
return metrics; }}Python Worker Integration
Python Worker Implementation
Section titled “Python Worker Implementation”"""Python Worker for Workflow Engine v3Handles data-intensive operations using Python libraries"""
import asyncioimport jsonimport loggingimport osimport sysfrom typing import Any, Dict, Listfrom dataclasses import dataclass
import redisfrom bullmq import Worker, Jobimport pandas as pdimport numpy as npfrom sklearn import preprocessing, model_selection, ensembleimport tensorflow as tfimport torch
# Configure logginglogging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)
# Configuration@dataclassclass WorkerConfig: redis_host: str = os.getenv('REDIS_HOST', 'localhost') redis_port: int = int(os.getenv('REDIS_PORT', 6379)) redis_password: str = os.getenv('REDIS_PASSWORD', '') queue_name: str = 'python-nodes' concurrency: int = int(os.getenv('PYTHON_WORKER_CONCURRENCY', 4)) max_jobs_per_worker: int = int(os.getenv('MAX_JOBS_PER_WORKER', 10))
config = WorkerConfig()
# Redis connectionredis_client = redis.Redis( host=config.redis_host, port=config.redis_port, password=config.redis_password, decode_responses=True)
class NodeProcessor: """Base class for node processors"""
def __init__(self): self.cache = {}
async def process(self, node_type: str, params: Dict, context: Dict) -> Any: """Route to specific processor"""
processors = { 'pandas': self.process_pandas, 'numpy': self.process_numpy, 'scikit': self.process_scikit, 'tensorflow': self.process_tensorflow, 'torch': self.process_torch, 'custom': self.process_custom, }
processor = processors.get(node_type) if not processor: raise ValueError(f"Unknown node type: {node_type}")
return await processor(params, context)
async def process_pandas(self, params: Dict, context: Dict) -> Any: """Pandas operations"""
operation = params.get('operation') data = params.get('data')
# Convert to DataFrame if isinstance(data, list): df = pd.DataFrame(data) elif isinstance(data, dict): df = pd.DataFrame.from_dict(data) else: df = data
operations = { 'describe': lambda: df.describe().to_dict(), 'info': lambda: self._df_info(df), 'head': lambda: df.head(params.get('n', 5)).to_dict(), 'tail': lambda: df.tail(params.get('n', 5)).to_dict(),
'groupby': lambda: self._groupby(df, params), 'aggregate': lambda: self._aggregate(df, params), 'pivot': lambda: self._pivot(df, params), 'melt': lambda: self._melt(df, params),
'merge': lambda: self._merge(df, params), 'concat': lambda: self._concat(df, params), 'join': lambda: self._join(df, params),
'fillna': lambda: df.fillna(params.get('value', 0)).to_dict(), 'dropna': lambda: df.dropna().to_dict(), 'drop_duplicates': lambda: df.drop_duplicates().to_dict(),
'sort_values': lambda: df.sort_values( by=params.get('by'), ascending=params.get('ascending', True) ).to_dict(),
'query': lambda: df.query(params.get('expression')).to_dict(), 'filter': lambda: self._filter(df, params),
'to_csv': lambda: df.to_csv(index=False), 'to_json': lambda: df.to_json(), 'to_parquet': lambda: self._to_parquet(df, params), }
handler = operations.get(operation) if not handler: raise ValueError(f"Unknown pandas operation: {operation}")
return handler()
def _groupby(self, df: pd.DataFrame, params: Dict) -> Dict: """Group by operation"""
group_by = params.get('groupBy') agg_col = params.get('aggregateColumn') agg_func = params.get('aggregateFunction', 'mean')
grouped = df.groupby(group_by)[agg_col]
agg_functions = { 'mean': grouped.mean, 'sum': grouped.sum, 'count': grouped.count, 'min': grouped.min, 'max': grouped.max, 'std': grouped.std, 'var': grouped.var, 'median': grouped.median, }
result = agg_functions[agg_func]() return result.to_dict()
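A worked example of the groupby path on a toy frame (assumes pandas is installed; the data is illustrative):

```python
import pandas as pd

df = pd.DataFrame({"cat": ["a", "a", "b"], "val": [1, 3, 4]})

# Equivalent to params: {groupBy: "cat", aggregateColumn: "val", aggregateFunction: "mean"}
result = df.groupby("cat")["val"].mean().to_dict()
print(result)  # {'a': 2.0, 'b': 4.0}
```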
def _pivot(self, df: pd.DataFrame, params: Dict) -> Dict: """Pivot table operation"""
return df.pivot_table( index=params.get('index'), columns=params.get('columns'), values=params.get('values'), aggfunc=params.get('aggfunc', 'mean'), fill_value=params.get('fillValue', 0) ).to_dict()
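And a matching `pivot_table` example on sample data (assumes pandas; data is illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "region": ["east", "east", "west", "west"],
    "quarter": ["Q1", "Q2", "Q1", "Q2"],
    "sales": [10, 20, 30, 40],
})

# Equivalent to params: {index: "region", columns: "quarter", values: "sales"}
result = df.pivot_table(index="region", columns="quarter", values="sales",
                        aggfunc="mean", fill_value=0).to_dict()
# Outer keys are the pivoted quarters: {'Q1': {...}, 'Q2': {...}}
```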
async def process_numpy(self, params: Dict, context: Dict) -> Any: """NumPy operations"""
operation = params.get('operation') data = np.array(params.get('data', []))
operations = { # Statistical 'mean': lambda: float(np.mean(data)), 'median': lambda: float(np.median(data)), 'std': lambda: float(np.std(data)), 'var': lambda: float(np.var(data)), 'min': lambda: float(np.min(data)), 'max': lambda: float(np.max(data)), 'sum': lambda: float(np.sum(data)), 'prod': lambda: float(np.prod(data)),
# Linear algebra 'dot': lambda: np.dot(data, np.array(params.get('other'))).tolist(), 'matmul': lambda: np.matmul(data, np.array(params.get('other'))).tolist(), 'transpose': lambda: data.T.tolist(), 'inverse': lambda: np.linalg.inv(data).tolist(), 'determinant': lambda: float(np.linalg.det(data)), 'eigenvalues': lambda: np.linalg.eigvals(data).tolist(),
# Array operations 'reshape': lambda: data.reshape(params.get('shape')).tolist(), 'flatten': lambda: data.flatten().tolist(), 'unique': lambda: np.unique(data).tolist(), 'sort': lambda: np.sort(data).tolist(), 'argsort': lambda: np.argsort(data).tolist(),
# Mathematical 'fft': lambda: np.fft.fft(data).tolist(), 'ifft': lambda: np.fft.ifft(data).tolist(), 'convolve': lambda: np.convolve( data, np.array(params.get('kernel')), mode=params.get('mode', 'valid') ).tolist(), 'correlate': lambda: np.correlate( data, np.array(params.get('other', data)), mode=params.get('mode', 'valid') ).tolist(), }
handler = operations.get(operation) if not handler: raise ValueError(f"Unknown numpy operation: {operation}")
return handler()
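Two of the operations above, worked on a small array (assumes NumPy):

```python
import numpy as np

data = np.array([1.0, 2.0, 3.0, 4.0])

# 'mean' operation
mean = float(np.mean(data))  # 2.5

# 'convolve' operation with a 2-tap averaging kernel and mode='valid'
kernel = np.array([0.5, 0.5])
smoothed = np.convolve(data, kernel, mode="valid").tolist()  # [1.5, 2.5, 3.5]
```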
async def process_scikit(self, params: Dict, context: Dict) -> Any: """Scikit-learn operations"""
operation = params.get('operation') data = params.get('data')
if operation == 'train': return await self._train_model(params, context) elif operation == 'predict': return await self._predict(params, context) elif operation == 'evaluate': return await self._evaluate(params, context) elif operation == 'preprocess': return await self._preprocess(data, params) else: raise ValueError(f"Unknown scikit operation: {operation}")
async def _train_model(self, params: Dict, context: Dict) -> Dict: """Train a scikit-learn model"""
algorithm = params.get('algorithm', 'RandomForest') X = np.array(params.get('features')) y = np.array(params.get('target')) test_size = params.get('testSize', 0.2)
# Split data X_train, X_test, y_train, y_test = model_selection.train_test_split( X, y, test_size=test_size, random_state=42 )
# Select model models = { 'RandomForest': ensemble.RandomForestClassifier( **params.get('modelParams', {}) ), 'GradientBoosting': ensemble.GradientBoostingClassifier( **params.get('modelParams', {}) ), 'RandomForestRegressor': ensemble.RandomForestRegressor( **params.get('modelParams', {}) ), 'GradientBoostingRegressor': ensemble.GradientBoostingRegressor( **params.get('modelParams', {}) ), }
model = models.get(algorithm) if not model: raise ValueError(f"Unknown algorithm: {algorithm}")
# Train model model.fit(X_train, y_train)
# Evaluate train_score = model.score(X_train, y_train) test_score = model.score(X_test, y_test)
# Store model in cache model_id = f"{context.get('executionId')}_model_{algorithm}" self.cache[model_id] = model
# Also persist to disk if needed import joblib model_path = f"/tmp/{model_id}.pkl" joblib.dump(model, model_path)
return { 'modelId': model_id, 'modelPath': model_path, 'algorithm': algorithm, 'trainScore': float(train_score), 'testScore': float(test_score), 'feature_importances': model.feature_importances_.tolist() if hasattr(model, 'feature_importances_') else None, }
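The split step above can be seen in isolation on a toy dataset; with 10 samples and `test_size=0.2`, scikit-learn holds out 2 samples for testing (assumes scikit-learn is installed):

```python
import numpy as np
from sklearn import model_selection

X = np.arange(20).reshape(10, 2)  # 10 samples, 2 features
y = np.array([0, 1] * 5)

X_train, X_test, y_train, y_test = model_selection.train_test_split(
    X, y, test_size=0.2, random_state=42
)
print(len(X_train), len(X_test))  # 8 2
```

Fixing `random_state` makes the split reproducible across worker runs, which matters when train/test scores are compared between executions.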
async def _preprocess(self, data: Any, params: Dict) -> Any: """Preprocessing operations"""
operation = params.get('preprocessOperation', 'scale')
if operation == 'scale': scaler = preprocessing.StandardScaler() scaled = scaler.fit_transform(data) return scaled.tolist()
elif operation == 'normalize': normalized = preprocessing.normalize(data, norm=params.get('norm', 'l2')) return normalized.tolist()
elif operation == 'encode': encoder = preprocessing.LabelEncoder() encoded = encoder.fit_transform(data) return encoded.tolist()
    elif operation == 'onehot':
        # scikit-learn >= 1.2 renamed `sparse` to `sparse_output`
        encoder = preprocessing.OneHotEncoder(sparse_output=False)
        encoded = encoder.fit_transform(np.array(data).reshape(-1, 1))
        return encoded.tolist()
else: raise ValueError(f"Unknown preprocessing operation: {operation}")
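The 'scale' branch in action, showing what `StandardScaler` guarantees (assumes scikit-learn; data is illustrative):

```python
import numpy as np
from sklearn import preprocessing

data = np.array([[1.0], [2.0], [3.0]])
scaled = preprocessing.StandardScaler().fit_transform(data)

# After standard scaling the column has mean ~0 and (population) std ~1
print(scaled.ravel().tolist())
```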
async def process_tensorflow(self, params: Dict, context: Dict) -> Any: """TensorFlow operations"""
operation = params.get('operation')
if operation == 'train': return await self._train_tf_model(params, context) elif operation == 'predict': return await self._predict_tf(params, context) elif operation == 'evaluate': return await self._evaluate_tf(params, context) else: raise ValueError(f"Unknown TensorFlow operation: {operation}")
async def _train_tf_model(self, params: Dict, context: Dict) -> Dict: """Train a TensorFlow model"""
model_config = params.get('modelConfig', {}) X = np.array(params.get('features')) y = np.array(params.get('target'))
# Build model model = tf.keras.Sequential()
for layer_config in model_config.get('layers', []): layer_type = layer_config.get('type')
if layer_type == 'Dense': model.add(tf.keras.layers.Dense( units=layer_config.get('units'), activation=layer_config.get('activation') )) elif layer_type == 'Dropout': model.add(tf.keras.layers.Dropout( rate=layer_config.get('rate') )) elif layer_type == 'Conv2D': model.add(tf.keras.layers.Conv2D( filters=layer_config.get('filters'), kernel_size=layer_config.get('kernel_size'), activation=layer_config.get('activation') )) # Add more layer types as needed
# Compile model compile_config = model_config.get('compile', {}) model.compile( optimizer=compile_config.get('optimizer', 'adam'), loss=compile_config.get('loss', 'mse'), metrics=compile_config.get('metrics', ['accuracy']) )
# Train model history = model.fit( X, y, epochs=model_config.get('epochs', 10), batch_size=model_config.get('batchSize', 32), validation_split=model_config.get('validationSplit', 0.2), verbose=0 )
# Save model model_id = f"{context.get('executionId')}_tfmodel" model_path = f"/tmp/{model_id}" model.save(model_path)
return { 'modelId': model_id, 'modelPath': model_path, 'history': { 'loss': history.history['loss'], 'val_loss': history.history.get('val_loss', []), 'metrics': {k: v for k, v in history.history.items() if k not in ['loss', 'val_loss']} } }
async def process_torch(self, params: Dict, context: Dict) -> Any: """PyTorch operations"""
operation = params.get('operation')
# Similar to TensorFlow but using PyTorch # Implementation depends on specific requirements
return {'message': 'PyTorch operation completed'}
async def process_custom(self, params: Dict, context: Dict) -> Any: """Execute custom Python code"""
code = params.get('code') if not code: raise ValueError("No code provided for custom execution")
        # Build a restricted execution environment. Note: exec() with trimmed
        # globals is NOT a real sandbox -- untrusted code should run in an
        # isolated process or container with OS-level controls.
        safe_globals = {
            'pd': pd,
            'np': np,
            'params': params,
            'context': context,
        }
safe_locals = {}
# Execute code exec(code, safe_globals, safe_locals)
# Return result if specified return safe_locals.get('result', None)
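A self-contained sketch of that custom-code convention (the helper name `run_custom` is illustrative). Stripping `__builtins__` makes trivial mistakes fail fast, but it is not a security boundary; `exec()` can still be escaped, so untrusted code belongs in a separate process or container:

```python
def run_custom(code: str, params: dict):
    # Restricted globals: not a sandbox, just a smaller blast radius.
    safe_globals = {"__builtins__": {}, "params": params}
    safe_locals: dict = {}
    exec(code, safe_globals, safe_locals)
    # By convention, the snippet assigns its output to `result`
    return safe_locals.get("result")

print(run_custom("result = params['x'] * 2", {"x": 5}))  # 10
```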
async def process_job(job: Job) -> Any: """Main job processor"""
logger.info(f"Processing job {job.id}: {job.name}")
try: # Extract job data node_type = job.data.get('nodeType') params = job.data.get('params', {}) context = job.data.get('context', {})
# Create processor processor = NodeProcessor()
# Process node result = await processor.process(node_type, params, context)
# Store result in Redis if large if sys.getsizeof(result) > 1024 * 1024: # 1MB result_key = f"result:{job.id}" redis_client.set(result_key, json.dumps(result), ex=3600) return {'resultKey': result_key, 'size': sys.getsizeof(result)}
logger.info(f"Job {job.id} completed successfully") return result
except Exception as e: logger.error(f"Job {job.id} failed: {e}") raise
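The large-result offload above can be sketched as a pure decision function. One caveat worth noting: `sys.getsizeof()` only measures the top-level container, so serialized byte length is a more reliable threshold. This sketch (helper name `payload_for` is illustrative) uses a dict as a stand-in for the Redis client:

```python
import json

def payload_for(job_id: str, result, store: dict, limit: int = 1024 * 1024) -> dict:
    # Measure the serialized size, not sys.getsizeof(), which ignores nested data
    encoded = json.dumps(result).encode("utf-8")
    if len(encoded) > limit:
        key = f"result:{job_id}"
        store[key] = encoded  # stand-in for redis_client.set(key, ..., ex=3600)
        return {"resultKey": key, "size": len(encoded)}
    return {"result": result}

print(payload_for("j1", {"ok": True}, {}))  # {'result': {'ok': True}}
```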
async def health_check(): """Health check endpoint"""
try: # Check Redis connection redis_client.ping()
        # Check library imports
        import pandas
        import numpy
        import sklearn
        import tensorflow
return { 'status': 'healthy', 'redis': 'connected', 'libraries': { 'pandas': pandas.__version__, 'numpy': numpy.__version__, 'sklearn': sklearn.__version__, 'tensorflow': tensorflow.__version__, } } except Exception as e: return { 'status': 'unhealthy', 'error': str(e) }
async def main(): """Main worker loop"""
logger.info("Starting Python worker...")
# Create worker worker = Worker( config.queue_name, process_job, { 'connection': { 'host': config.redis_host, 'port': config.redis_port, 'password': config.redis_password, }, 'concurrency': config.concurrency, 'maxJobsPerWorker': config.max_jobs_per_worker, } )
# Setup signal handlers import signal
def signal_handler(signum, frame): logger.info(f"Received signal {signum}, shutting down...") worker.close() sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler)
# Run worker logger.info(f"Python worker started on queue '{config.queue_name}'")
try: await worker.run() except Exception as e: logger.error(f"Worker error: {e}") raise
if __name__ == '__main__': asyncio.run(main())Python Worker Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy worker code
COPY workers/python/ .

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import asyncio; import worker; print(asyncio.run(worker.health_check()))"

# Run worker
CMD ["python", "worker.py"]

Testing
Unit Testing Nodes
import { EmailNode } from '../../src/nodes/email-node';
import { WorkflowState } from '@your-org/workflow-engine-v3';
describe('EmailNode', () => { let node: EmailNode; let mockState: WorkflowState;
beforeEach(() => { node = new EmailNode(); mockState = { executionId: 'test-exec-1', status: 'running', inputs: { userId: '123' }, variables: { userName: 'John Doe' }, results: {}, // ... other required fields } as WorkflowState; });
describe('validateInput', () => { it('should validate email format', () => { expect(() => { node.validateInput({ to: 'invalid-email', subject: 'Test', body: 'Test body', }); }).toThrow('Invalid email address');
expect(() => { node.validateInput({ subject: 'Test', body: 'Test body', }); }).not.toThrow(); });
it('should validate required fields', () => { expect(() => { node.validateInput({ subject: '', body: 'Body', }); }).toThrow('Email subject cannot be empty'); }); });
describe('executeAsync', () => { it('should send email successfully', async () => { const params = { subject: 'Hello {{userName}}', body: 'Welcome, {{userName}}!', };
const result = await node.executeAsync(node.prepare(mockState, params), mockState);
expect(result.success).toBe(true); expect(result.messageId).toBeDefined(); });
it('should interpolate variables', async () => { const params = { subject: 'Hello {{var.userName}}', body: 'User ID: {{input.userId}}', };
const prepared = node.prepare(mockState, params);
expect(prepared.subject).toContain('John Doe'); expect(prepared.body).toContain('123'); }); });});Integration Testing
import { Test, TestingModule } from '@nestjs/testing';
import { WorkflowService } from '../../src/modules/workflow/workflow.service';
import { WorkflowEngine } from '@your-org/workflow-engine-v3';
describe('WorkflowService Integration', () => { let service: WorkflowService; let engine: WorkflowEngine;
beforeAll(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [WorkflowService], }).compile();
service = module.get<WorkflowService>(WorkflowService); engine = service['engine']; });
it('should execute a simple workflow', async () => { const workflow = { name: 'Test Workflow', version: '1.0.0', steps: [ { stepType: 'node', type: 'log', params: { message: 'Test' }, }, ], };
    // Execute the locally defined workflow directly through the engine,
    // rather than relying on a seeded workflow record and a fixed sleep
    await engine.execute(workflow, {}, 'test-exec-simple');

    const state = await service.getExecutionStatus('test-exec-simple');

    expect(state?.status).toBe('completed');
  });
it('should handle parallel execution', async () => { const workflow = { name: 'Parallel Test', version: '1.0.0', steps: [ { stepType: 'parallel', steps: [ { stepType: 'node', type: 'delay', params: { milliseconds: 100 } }, { stepType: 'node', type: 'delay', params: { milliseconds: 100 } }, { stepType: 'node', type: 'delay', params: { milliseconds: 100 } }, ], }, ], };
    const start = Date.now();
    await engine.execute(workflow, {}, 'test-exec-parallel');
    const duration = Date.now() - start;

    // Three 100ms delays run concurrently, so the workflow should finish in
    // roughly 100ms -- well under the ~300ms a sequential run would take
    expect(duration).toBeLessThan(300);
  });
});

E2E Testing
import { INestApplication } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import * as request from 'supertest';
import { AppModule } from '../../src/app.module';

describe('WorkflowController E2E', () => {
  let app: INestApplication;

  beforeAll(async () => {
    const moduleFixture: TestingModule = await Test.createTestingModule({
      imports: [AppModule],
    }).compile();

    app = moduleFixture.createNestApplication();
    await app.init();
  });

  afterAll(async () => {
    await app.close();
  });
  it('/workflows (POST) - Create workflow', async () => {
    const workflow = {
      name: 'E2E Test Workflow',
      description: 'Test workflow',
      definition: {
        version: '1.0.0',
        steps: [{ stepType: 'node', type: 'log', params: { message: 'E2E Test' } }],
      },
    };

    const response = await request(app.getHttpServer())
      .post('/workflows')
      .send(workflow)
      .expect(201);

    expect(response.body).toHaveProperty('id');
    expect(response.body.name).toBe(workflow.name);
  });
  it('/workflows/:id/execute (POST) - Execute workflow', async () => {
    // First create a workflow
    const createResponse = await request(app.getHttpServer())
      .post('/workflows')
      .send({
        name: 'Execute Test',
        definition: {
          version: '1.0.0',
          steps: [{ stepType: 'node', type: 'set', params: { name: 'test', value: 123 } }],
        },
      });

    const workflowId = createResponse.body.id;

    // Execute the workflow
    const executeResponse = await request(app.getHttpServer())
      .post(`/workflows/${workflowId}/execute`)
      .send({ input: 'test' })
      .expect(200);

    expect(executeResponse.body).toHaveProperty('executionId');
    expect(executeResponse.body.status).toBe('queued');
  });
  it('/workflows/executions/:id (GET) - Get execution status', async () => {
    // Create and execute a workflow
    const createResponse = await request(app.getHttpServer())
      .post('/workflows')
      .send({
        name: 'Status Test',
        definition: {
          version: '1.0.0',
          steps: [{ stepType: 'node', type: 'log', params: { message: 'Status test' } }],
        },
      });

    const executeResponse = await request(app.getHttpServer())
      .post(`/workflows/${createResponse.body.id}/execute`)
      .send({});

    const executionId = executeResponse.body.executionId;

    // Wait a bit for execution
    await new Promise((resolve) => setTimeout(resolve, 1000));

    // Get status
    const statusResponse = await request(app.getHttpServer())
      .get(`/workflows/executions/${executionId}`)
      .expect(200);

    expect(statusResponse.body).toHaveProperty('status');
    expect(['running', 'completed']).toContain(statusResponse.body.status);
  });
});

Best Practices
1. Node Development

// DO: Make nodes focused and reusable
export class SinglePurposeNode extends BaseNode {
  // One clear responsibility
}
// DON'T: Create mega-nodes
export class DoEverythingNode extends BaseNode {
  // Too many responsibilities
}
// DO: Validate inputs properly
validateInput(input: any): void {
  const schema = z.object({
    required: z.string(),
    optional: z.string().optional(),
  });

  schema.parse(input);
}
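Validation failures like these are permanent and should fail fast, but many runtime errors (network timeouts, rate limits) are transient and worth retrying before surfacing an error. A small retry helper with exponential backoff complements the error-handling pattern that follows; this is an illustrative sketch, not part of the engine API:

```typescript
// Hypothetical helper: retry an async operation with exponential backoff.
// The last error is rethrown once the attempt budget is exhausted.
async function withRetry<T>(
  op: () => Promise<T>,
  { attempts = 3, baseDelayMs = 100 }: { attempts?: number; baseDelayMs?: number } = {},
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await op();
    } catch (error) {
      lastError = error;
      if (attempt < attempts - 1) {
        // Backoff doubles each attempt: 100ms, 200ms, 400ms, ...
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}
```

In a node this would wrap only the calls that are safe to repeat, e.g. `await withRetry(() => this.httpClient.get(url))`; non-idempotent operations need care before retrying.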
// DO: Handle errors gracefully
async executeAsync(prepared: any): Promise<any> {
  try {
    return await this.doWork(prepared);
  } catch (error) {
    // Log for debugging
    this.logger.error('Node execution failed:', error);

    // Throw a meaningful, contextual error
    throw new NodeExecutionError(
      `Failed to execute ${this.type}: ${error.message}`,
      { originalError: error, params: prepared }
    );
  }
}

2. Storage and State Management
// DO: Use appropriate storage tiers
class OptimizedStorage {
  async get(key: string): Promise<any> {
    // Try cache first
    const cached = await this.cache.get(key);
    if (cached) return cached;

    // Then persistent storage
    const stored = await this.persistent.get(key);
    if (stored) {
      // Update cache
      await this.cache.set(key, stored, 300); // 5 min cache
      return stored;
    }

    return null;
  }
}
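The cache tier in `OptimizedStorage` is left abstract. A minimal in-memory cache matching the `get(key)` / `set(key, value, ttlSeconds)` calls used above might look like this (a sketch only; in a distributed deployment a shared cache such as Redis is the more typical choice):

```typescript
// Minimal in-memory cache with per-entry TTL. Expired entries are
// dropped lazily when read.
class TtlCache {
  private entries = new Map<string, { value: unknown; expiresAt: number }>();

  get(key: string): unknown | null {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (Date.now() >= entry.expiresAt) {
      this.entries.delete(key); // expired: evict on read
      return null;
    }
    return entry.value;
  }

  set(key: string, value: unknown, ttlSeconds: number): void {
    this.entries.set(key, { value, expiresAt: Date.now() + ttlSeconds * 1000 });
  }

  clear(): void {
    this.entries.clear();
  }
}
```

Because entries are evicted lazily, a long-lived process with many distinct keys would also want a periodic sweep or a size cap.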
// DO: Clean up resources
async cleanup(): Promise<void> {
  // Close connections
  await this.db?.close();
  await this.redis?.quit();

  // Clear caches
  this.cache.clear();

  // Cancel pending operations
  this.pendingOps.forEach(op => op.cancel());
}

3. Performance Optimization
// DO: Use connection pooling
const pool = new Pool({
  max: 20,
  min: 5,
  idle: 10000,
});
// DO: Batch operations
async processBatch(items: any[]): Promise<any[]> {
  const BATCH_SIZE = 100;
  const results = [];

  for (let i = 0; i < items.length; i += BATCH_SIZE) {
    const batch = items.slice(i, i + BATCH_SIZE);
    const batchResults = await Promise.all(
      batch.map(item => this.processItem(item))
    );
    results.push(...batchResults);
  }

  return results;
}
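Sequential batches like the above bound memory use, but every batch waits for its slowest item. A fixed-size worker pool keeps all concurrency slots busy instead. This sketch is illustrative (the names are not engine APIs); note that `Promise.all` rejects on the first failure, so wrap the callback if partial results matter:

```typescript
// Process items with at most `concurrency` operations in flight at once.
// Results are returned in input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index until
  // the input is exhausted.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index]);
    }
  }

  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    () => worker(),
  );
  await Promise.all(workers);
  return results;
}
```

Usage would look like `await mapWithConcurrency(items, item => this.processItem(item), 20)`, replacing both the batching loop and the unbounded `Promise.all`.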
// DO: Implement caching
@Cacheable({ ttl: 300 })
async expensiveOperation(params: any): Promise<any> {
  // This result will be cached for 5 minutes
  return await this.compute(params);
}

4. Security
// DO: Validate all inputs
const InputSchema = z.object({
  userId: z.string().uuid(),
  email: z.string().email(),
  role: z.enum(['admin', 'user']),
});
// DO: Use parameterized queries
async query(sql: string, params: any[]): Promise<any> {
  // Safe from SQL injection
  return await this.pool.query(sql, params);
}
// DON'T: Build queries with string concatenation
// const query = `SELECT * FROM users WHERE id = '${userId}'`; // BAD!
// DO: Implement rate limiting
@Throttle(10, 60) // 10 requests per minute
async publicEndpoint(): Promise<any> {
  // Rate limited endpoint
}
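The `SecureStorage` pattern shown next assumes `encrypt`/`decrypt` helpers. One way to implement them is with Node's built-in `crypto` module and AES-256-GCM; this is a sketch only, and key management (loading the 32-byte key from a KMS or secret manager rather than source code) is out of scope:

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto';

// AES-256-GCM helpers. GCM provides both confidentiality and integrity:
// decryption throws if the ciphertext or auth tag has been tampered with.
function encrypt(plaintext: string, key: Buffer): string {
  const iv = randomBytes(12); // standard 96-bit GCM IV, fresh per message
  const cipher = createCipheriv('aes-256-gcm', key, iv);
  const ciphertext = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
  const tag = cipher.getAuthTag();
  // Pack iv + auth tag + ciphertext so decrypt needs only the key.
  return Buffer.concat([iv, tag, ciphertext]).toString('base64');
}

function decrypt(payload: string, key: Buffer): string {
  const raw = Buffer.from(payload, 'base64');
  const iv = raw.subarray(0, 12);
  const tag = raw.subarray(12, 28);
  const ciphertext = raw.subarray(28);
  const decipher = createDecipheriv('aes-256-gcm', key, iv);
  decipher.setAuthTag(tag); // throws on tampering
  return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf8');
}
```

Because the IV is random per call, encrypting the same value twice yields different ciphertexts, which prevents equality leaks between stored entries.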
// DO: Encrypt sensitive data
class SecureStorage {
  async set(key: string, value: any): Promise<void> {
    const encrypted = await this.encrypt(JSON.stringify(value));
    await this.storage.set(key, encrypted);
  }

  async get(key: string): Promise<any> {
    const encrypted = await this.storage.get(key);
    if (!encrypted) return null;

    const decrypted = await this.decrypt(encrypted);
    return JSON.parse(decrypted);
  }
}

5. Monitoring and Observability
// DO: Add comprehensive logging
class ObservableNode extends BaseNode {
  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const startTime = Date.now();
    const traceId = context.executionId;

    this.logger.info('Node execution started', {
      nodeType: this.type,
      traceId,
      params: this.sanitizeParams(prepared),
    });

    try {
      const result = await this.doWork(prepared);

      this.metrics.recordSuccess(this.type, Date.now() - startTime);

      this.logger.info('Node execution completed', {
        nodeType: this.type,
        traceId,
        duration: Date.now() - startTime,
      });

      return result;
    } catch (error) {
      this.metrics.recordFailure(this.type, error);

      this.logger.error('Node execution failed', {
        nodeType: this.type,
        traceId,
        error: error.message,
        stack: error.stack,
        duration: Date.now() - startTime,
      });

      throw error;
    }
  }
}

This completes the Developer Guide for Workflow Engine v3. The guide spans architecture and setup through advanced topics such as distributed processing and Python integration, giving developers everything needed to extend, implement, and integrate the workflow engine in their applications.