Skip to content

Workflow Engine - Developer Guide

Complete Guide for Extending, Implementing, and Integrating the Workflow Engine

Section titled “Complete Guide for Extending, Implementing, and Integrating the Workflow Engine”

  1. Architecture Overview
  2. Installation and Setup
  3. Core Concepts
  4. Plugin Development
  5. Custom Node Development
  6. Storage Adapters
  7. NestJS Integration
  8. BullMQ and Distributed Processing
  9. Python Worker Integration
  10. Testing
  11. Monitoring and Observability
  12. Performance Optimization
  13. Security
  14. Deployment
  15. API Reference
  16. Migration Guide
  17. Best Practices

┌─────────────────────────────────────────────────────────────┐
│ Client Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ REST │ │ UI │ │ CLI │ │ SDK │ │
│ │ API │ │ Builder │ │ Tool │ │ Client │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ NestJS Application │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ Workflow │ │ Auth │ │ API │ │ │
│ │ │ Service │ │ Module │ │ Gateway │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Engine Layer │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Workflow Engine Core │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Plugin │ │ Step │ │ State │ │ │
│ │ │ System │ │ Executor │ │ Manager │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Processing Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ BullMQ │ │ Node.js │ │ Python │ │ Custom │ │
│ │ Workers │ │ Workers │ │ Workers │ │ Workers │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Storage Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Redis │ │ PostgreSQL │ │ S3 │ │ MongoDB │ │
│ │ (State) │ │ (Workflows) │ │ (Files) │ │ (Logs) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
// Skeletal view of the engine's internal composition. Illustrative only:
// the real class also carries the behavior (registration, execution, events)
// that operates on these fields.
class WorkflowEngine {
// Plugin management
// Registered plugin instances (keyed by string — presumably the plugin name).
private plugins: Map<string, IPlugin>;
// Node registry
// Available node implementations (keyed by string — presumably the node type).
private nodeRegistry: Map<string, INode>;
// State management
// Pluggable persistence backend for workflow state.
private storage: IStorageAdapter;
// Execution
// Runs individual workflow steps.
private executor: StepExecutor;
// Event system
// Emits lifecycle events such as 'workflow:start' / 'workflow:complete'.
private eventEmitter: EventEmitter;
// Observability
// Execution traces collected per key (presumably per execution id).
private traces: Map<string, ExecutionTrace[]>;
}
  • Node Plugins: Add custom nodes
  • Storage Plugins: Custom state storage
  • Observer Plugins: Monitoring and analytics
  • Auth Plugins: Authentication/authorization
  • Transform Plugins: Data transformation

Handles different step types and execution strategies:

  • Sequential, Parallel, Stream
  • Switch, Loop, Try/Catch
  • Dynamic, Workflow

Manages workflow state with:

  • Checkpointing
  • State persistence
  • State recovery
  • Distributed state

Terminal window
# Node.js 18+ and npm/yarn
node --version # v18.0.0 or higher
# Redis for state management
redis-server --version
# PostgreSQL for workflow storage
psql --version
# Python 3.8+ for Python workers (optional)
python --version
Terminal window
# Using npm
npm install @your-org/workflow-engine-v3
# Using yarn
yarn add @your-org/workflow-engine-v3
# Using pnpm
pnpm add @your-org/workflow-engine-v3
Terminal window
# Core dependencies
npm install \
zod \
yaml \
eventemitter3 \
p-queue \
axios \
ws \
eventsource
# NestJS integration
npm install \
@nestjs/common \
@nestjs/core \
@nestjs/platform-express \
@nestjs/event-emitter \
bullmq \
ioredis
# Development dependencies
npm install -D \
@types/node \
@types/ws \
typescript \
tsx \
jest \
@types/jest
workflow.config.ts
import { WorkflowEngineConfig } from '@your-org/workflow-engine-v3';
/**
 * Engine configuration assembled from environment variables with
 * local-development defaults.
 */
export const engineConfig: WorkflowEngineConfig = {
  // Storage configuration
  storage: {
    type: 'redis',
    connection: {
      host: process.env.REDIS_HOST || 'localhost',
      // Explicit radix: env values are decimal strings.
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
    },
  },
  // Execution configuration
  execution: {
    maxConcurrency: 10,
    defaultTimeout: 300000, // 5 minutes
    checkpointInterval: 10000, // 10 seconds
  },
  // Monitoring
  monitoring: {
    enableTracing: true,
    enableProfiling: true,
    metricsExporter: 'prometheus',
  },
  // Security
  security: {
    enableAuth: true,
    // NOTE(review): JWT_SECRET may be undefined at runtime even though
    // enableAuth is true — consider failing fast at startup if it is missing.
    jwtSecret: process.env.JWT_SECRET,
    apiKeyHeader: 'x-api-key',
  },
};
workflow-app/
├── src/
│ ├── config/
│ │ ├── workflow.config.ts
│ │ ├── database.config.ts
│ │ └── redis.config.ts
│ ├── modules/
│ │ ├── workflow/
│ │ │ ├── workflow.module.ts
│ │ │ ├── workflow.service.ts
│ │ │ ├── workflow.controller.ts
│ │ │ └── workflow.repository.ts
│ │ ├── engine/
│ │ │ ├── engine.module.ts
│ │ │ ├── engine.service.ts
│ │ │ └── engine.factory.ts
│ │ └── workers/
│ │ ├── workers.module.ts
│ │ ├── workflow.processor.ts
│ │ └── step.processor.ts
│ ├── plugins/
│ │ ├── nodes/
│ │ │ ├── custom-http.node.ts
│ │ │ ├── database.node.ts
│ │ │ └── ai.node.ts
│ │ ├── storage/
│ │ │ └── s3-storage.plugin.ts
│ │ └── observers/
│ │ └── datadog.plugin.ts
│ ├── schemas/
│ │ ├── workflow.schema.ts
│ │ └── execution.schema.ts
│ ├── utils/
│ │ ├── validation.ts
│ │ └── logger.ts
│ └── main.ts
├── workers/
│ ├── python/
│ │ ├── requirements.txt
│ │ ├── worker.py
│ │ └── processors/
│ │ ├── pandas_processor.py
│ │ └── ml_processor.py
│ └── specialized/
│ └── gpu-worker/
├── test/
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── docker/
│ ├── Dockerfile
│ ├── docker-compose.yml
│ └── docker-compose.dev.yml
└── package.json

/**
 * Contract implemented by every engine plugin. A plugin is registered once
 * (see engine.registerPlugin usage below) and receives lifecycle callbacks.
 */
export interface IPlugin {
/** Plugin identifier — presumably the key in the engine's plugin map. */
name: string;
/** Version string of the plugin implementation. */
version: string;
// Lifecycle hooks
/** Called once at registration; wire up event handlers/clients here. */
init(engine: WorkflowEngine): void;
/** Optional teardown; release resources acquired in init(). */
destroy?(): void;
// Optional hooks
/** Invoked before a workflow run starts. */
beforeWorkflow?(state: WorkflowState): void;
/** Invoked after a workflow run finishes. */
afterWorkflow?(state: WorkflowState): void;
/** Invoked before each step executes. */
beforeStep?(step: Step): void;
/** Invoked after each step executes, with the step's raw result. */
afterStep?(step: Step, result: any): void;
}
/**
 * Contract implemented by every executable node type. The engine drives a
 * node through validate -> prepare -> execute (-> cleanup).
 */
export interface INode {
/** Optional instance identifier. */
id?: string;
/** Node type string used for registry lookup (e.g. 'email-send'). */
type: string;
/** Optional implementation version. */
version?: string;
// Metadata
/** Static description of the node: ports, parameter schema, capabilities. */
getMetadata(): NodeMetadata;
// Validation
/** Throw if the input is invalid; optional schema may refine the check. */
validateInput(input: any, schema?: any): void;
/** Throw if the output is invalid; optional schema may refine the check. */
validateOutput(output: any, schema?: any): void;
// Execution
/** Derive the concrete execution payload from workflow state and params. */
prepare(context: WorkflowState, params: any): any;
/** Run the node with the prepared payload; returns the node result. */
execute(prepared: any, context: WorkflowState): any;
/** Optional post-run cleanup (connections, temp resources). */
cleanup?(context: WorkflowState): void;
// Resource management
/** Optional cost estimate used for scheduling decisions. */
estimateResources?(params: any): ResourceRequirements;
}
/**
 * Static, declarative description of a node type — what it is called, what
 * ports it exposes, what parameters it accepts, and how it can be scheduled.
 */
export interface NodeMetadata {
/** Node type string; must match INode.type. */
type: string;
/** Implementation version. */
version: string;
/** Human-readable name shown in UIs. */
displayName: string;
/** Short description of what the node does. */
description: string;
/** Grouping category (e.g. 'Communication'). */
category: string;
/** Optional icon identifier. */
icon?: string;
// Input/Output definitions
/** Declared input ports. */
inputs: PortDefinition[];
/** Declared output ports. */
outputs: PortDefinition[];
// Parameter schema
paramSchema: any; // JSON Schema or Zod schema
// Capabilities
/** Scheduling/execution capabilities advertised to the engine. */
capabilities: {
async: boolean;
streaming: boolean;
batchable: boolean;
cacheable: boolean;
};
}
/**
 * Key/value persistence contract used by the engine for workflow state.
 * Implementations may be backed by Redis, S3, SQL, or tiered combinations.
 */
export interface IStorageAdapter {
// Basic operations
/** Fetch a value; implementations return null/undefined when absent. */
get(key: string): Promise<any>;
/** Store a value; ttl is in seconds where the backend supports expiry. */
set(key: string, value: any, ttl?: number): Promise<void>;
/** Remove a key (no-op if absent). */
delete(key: string): Promise<void>;
/** Cheap existence check. */
exists(key: string): Promise<boolean>;
// Batch operations
/** Fetch many keys; result order matches the input order. */
mget(keys: string[]): Promise<any[]>;
/** Store many key/value pairs. */
mset(items: Record<string, any>): Promise<void>;
// Scanning
/** List keys matching a pattern, up to limit. */
scan(pattern: string, limit?: number): Promise<string[]>;
// Transactions
/** Run fn atomically; backends without transactions may throw. */
transaction<T>(fn: (tx: Transaction) => Promise<T>): Promise<T>;
// Pub/Sub (optional)
publish?(channel: string, message: any): Promise<void>;
subscribe?(channel: string, handler: (message: any) => void): Promise<void>;
}
/**
 * Everything a running step can see: identity of the execution, the live
 * workflow state, engine-provided services, resource budget, and the
 * (optional) security principal the execution runs as.
 */
export interface ExecutionContext {
// Identity
/** Unique id of this execution run. */
executionId: string;
/** Id of the workflow definition being executed. */
workflowId: string;
/** Set when this execution was spawned by a parent workflow. */
parentExecutionId?: string;
// State
/** Mutable state of the in-flight workflow. */
state: WorkflowState;
// Services
storage: IStorageAdapter;
logger: Logger;
metrics: MetricsCollector;
// Resources
/** Resource budget/handles available to the step. */
resources: {
cpu: number;
memory: number;
connections: Map<string, any>;
};
// Security
/** Present when the execution is tied to an authenticated user. */
user?: User;
permissions?: Permission[];
}

plugins/nodes/slack-plugin.ts
import { INodePlugin, INode, BaseNode, AsyncNode, WorkflowEngine } from '@your-org/workflow-engine-v3';
import { WebClient } from '@slack/web-api';
/**
 * Node plugin contributing Slack node types (message/channel/file) and
 * posting a completion notice when a workflow's metadata opts in via
 * `metadata.notifySlack`.
 */
export class SlackPlugin implements INodePlugin {
  name = 'slack-plugin';
  version = '1.0.0';
  nodeTypes = ['slack-message', 'slack-channel', 'slack-file'];
  // Assigned in init(); the engine calls init() before createNode(), hence
  // the definite-assignment assertion (required under strictPropertyInitialization).
  private slackClient!: WebClient;

  init(engine: WorkflowEngine): void {
    this.slackClient = new WebClient(process.env.SLACK_TOKEN);
    // Register event handlers. notifyCompletion is async — attach a catch
    // handler so a Slack API failure becomes a logged error instead of an
    // unhandled promise rejection inside the event listener.
    engine.on('workflow:complete', (state) => {
      if (state.metadata?.notifySlack) {
        this.notifyCompletion(state).catch((err) => {
          console.error('Slack completion notification failed:', err);
        });
      }
    });
  }

  destroy(): void {
    // Cleanup
  }

  /** Instantiate one of this plugin's node types, or throw for unknown types. */
  createNode(type: string): INode {
    switch (type) {
      case 'slack-message':
        return new SlackMessageNode(this.slackClient);
      case 'slack-channel':
        return new SlackChannelNode(this.slackClient);
      case 'slack-file':
        return new SlackFileNode(this.slackClient);
      default:
        throw new Error(`Unknown node type: ${type}`);
    }
  }

  /** Post a one-line status summary to the #workflows channel. */
  private async notifyCompletion(state: WorkflowState): Promise<void> {
    await this.slackClient.chat.postMessage({
      channel: '#workflows',
      text: `Workflow ${state.workflowId} completed with status: ${state.status}`,
    });
  }
}
/**
 * Node that sends a message to a Slack channel or user via chat.postMessage.
 * {{name}} placeholders in the message are resolved against workflow
 * variables, then inputs, before sending.
 */
class SlackMessageNode extends AsyncNode {
  type = 'slack-message';

  constructor(private slack: WebClient) {
    super();
  }

  getMetadata(): NodeMetadata {
    return {
      type: this.type,
      version: '1.0.0',
      displayName: 'Send Slack Message',
      description: 'Send a message to a Slack channel or user',
      category: 'Communication',
      icon: 'slack',
      inputs: [
        { name: 'message', type: 'string', required: true },
        { name: 'channel', type: 'string', required: true },
      ],
      outputs: [
        { name: 'messageId', type: 'string' },
        { name: 'timestamp', type: 'string' },
      ],
      paramSchema: {
        type: 'object',
        properties: {
          channel: { type: 'string', pattern: '^[#@].*' },
          message: { type: 'string' },
          attachments: { type: 'array' },
          threadTs: { type: 'string' },
        },
        required: ['channel', 'message'],
      },
      capabilities: {
        async: true,
        streaming: false,
        batchable: true,
        cacheable: false,
      },
    };
  }

  /**
   * Ensure channel and message are present and that the channel has the
   * expected '#channel' or '@user' prefix.
   * @throws Error on any violation.
   */
  validateInput(input: any, schema?: any): void {
    if (!input.channel || !input.message) {
      throw new Error('Channel and message are required');
    }
    if (!input.channel.startsWith('#') && !input.channel.startsWith('@')) {
      throw new Error('Channel must start with # or @');
    }
  }

  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const { channel, message, attachments, threadTs } = prepared;
    try {
      const result = await this.slack.chat.postMessage({
        channel,
        text: this.interpolateMessage(message, context),
        attachments,
        thread_ts: threadTs,
      });
      return {
        success: result.ok,
        messageId: result.ts,
        channel: result.channel,
        timestamp: new Date().toISOString(),
      };
    } catch (error: any) {
      throw new Error(`Failed to send Slack message: ${error.message}`);
    }
  }

  /**
   * Replace {{name}} placeholders with workflow variables, falling back to
   * workflow inputs, then to the literal placeholder text.
   */
  private interpolateMessage(message: string, context: WorkflowState): string {
    return message.replace(/\{\{(\w+)\}\}/g, (match, key) => {
      // BUG FIX: use ?? instead of || so legitimate falsy values (0, '',
      // false) stored in variables/inputs are still substituted rather
      // than silently falling through to the raw placeholder.
      return context.variables[key] ?? context.inputs[key] ?? match;
    });
  }
}
// Register the plugin
const engine = new WorkflowEngine();
engine.registerPlugin(new SlackPlugin());
plugins/storage/s3-storage-plugin.ts
import { IStoragePlugin, IStorageAdapter } from '@your-org/workflow-engine-v3';
import AWS from 'aws-sdk';
/**
 * Storage plugin backed by Amazon S3. Builds the AWS client once from the
 * supplied configuration and hands out adapters bound to the target bucket.
 */
export class S3StoragePlugin implements IStoragePlugin {
  name = 's3-storage';
  version = '1.0.0';
  private readonly client: AWS.S3;
  private readonly bucketName: string;

  constructor(config: { bucket: string; region?: string; accessKeyId?: string; secretAccessKey?: string }) {
    const { bucket, region, accessKeyId, secretAccessKey } = config;
    this.bucketName = bucket;
    this.client = new AWS.S3({
      region: region || 'us-east-1',
      accessKeyId,
      secretAccessKey,
    });
  }

  init(engine: WorkflowEngine): void {
    console.log('S3 Storage Plugin initialized');
  }

  /** Create a storage adapter bound to the configured bucket. */
  createStorage(): IStorageAdapter {
    return new S3StorageAdapter(this.client, this.bucketName);
  }
}
/**
 * IStorageAdapter backed by a single S3 bucket; values are stored as JSON
 * objects. Keys are sanitized to S3-safe characters before every call.
 */
class S3StorageAdapter implements IStorageAdapter {
constructor(private s3: AWS.S3, private bucket: string) {}
/** Fetch and JSON-parse an object; returns null when the key is absent. */
async get(key: string): Promise<any> {
try {
const result = await this.s3
.getObject({
Bucket: this.bucket,
Key: this.sanitizeKey(key),
})
.promise();
return JSON.parse(result.Body?.toString() || 'null');
} catch (error: any) {
// Absent keys are not an error for a cache-style adapter.
if (error.code === 'NoSuchKey') {
return null;
}
throw error;
}
}
/** Store a JSON-serialized value. */
async set(key: string, value: any, ttl?: number): Promise<void> {
const params: AWS.S3.PutObjectRequest = {
Bucket: this.bucket,
Key: this.sanitizeKey(key),
Body: JSON.stringify(value),
ContentType: 'application/json',
};
if (ttl) {
// NOTE(review): S3's `Expires` is an HTTP caching header, not object
// expiry — the object is NOT deleted after ttl seconds. Actual expiry
// would need a bucket lifecycle rule. Confirm whether ttl is expected
// to be honored here.
params.Expires = new Date(Date.now() + ttl * 1000);
}
await this.s3.putObject(params).promise();
}
/** Delete the object (S3 delete is idempotent; absent keys succeed). */
async delete(key: string): Promise<void> {
await this.s3
.deleteObject({
Bucket: this.bucket,
Key: this.sanitizeKey(key),
})
.promise();
}
/** Existence check via headObject; any error is treated as "absent". */
async exists(key: string): Promise<boolean> {
try {
await this.s3
.headObject({
Bucket: this.bucket,
Key: this.sanitizeKey(key),
})
.promise();
return true;
} catch (error) {
return false;
}
}
/** Parallel get() per key; result order matches input order. */
async mget(keys: string[]): Promise<any[]> {
const promises = keys.map((key) => this.get(key));
return Promise.all(promises);
}
/** Parallel set() per entry (no TTL). */
async mset(items: Record<string, any>): Promise<void> {
const promises = Object.entries(items).map(([key, value]) => this.set(key, value));
await Promise.all(promises);
}
/**
 * Prefix listing: '*' is stripped and the remainder used as an S3 prefix.
 * NOTE(review): only the first page is returned (no ContinuationToken
 * follow-up), so results are capped at min(limit, 1000) per S3 semantics.
 */
async scan(pattern: string, limit: number = 1000): Promise<string[]> {
const prefix = pattern.replace('*', '');
const result = await this.s3
.listObjectsV2({
Bucket: this.bucket,
Prefix: prefix,
MaxKeys: limit,
})
.promise();
return result.Contents?.map((obj) => obj.Key!) || [];
}
async transaction<T>(fn: (tx: any) => Promise<T>): Promise<T> {
// S3 doesn't support transactions, implement with versioning
throw new Error('Transactions not supported in S3 storage');
}
/**
 * Replace characters outside S3's safe set with '_'.
 * NOTE(review): this mapping is lossy — distinct keys can collide after
 * sanitization (e.g. 'a:b' and 'a;b' both become 'a_b').
 */
private sanitizeKey(key: string): string {
// S3 key restrictions
return key.replace(/[^a-zA-Z0-9\-_.!*'()/]/g, '_');
}
}
// Usage
const storagePlugin = new S3StoragePlugin({
bucket: 'workflow-states',
region: 'us-east-1',
});
const engine = new WorkflowEngine({
storage: storagePlugin.createStorage(),
});
plugins/observers/prometheus-plugin.ts
import { IObserverPlugin, WorkflowState, ExecutionTrace } from '@your-org/workflow-engine-v3';
import { Registry, Counter, Histogram, Gauge } from 'prom-client';
/**
 * Observer plugin that records workflow/step throughput, latency, active
 * count, and error totals into a prom-client Registry.
 */
export class PrometheusPlugin implements IObserverPlugin {
  name = 'prometheus-observer';
  version = '1.0.0';
  private registry: Registry;
  private metrics: {
    workflowsTotal: Counter;
    workflowDuration: Histogram;
    stepsTotal: Counter;
    stepDuration: Histogram;
    activeWorkflows: Gauge;
    errors: Counter;
  };

  constructor(registry?: Registry) {
    // Accept a shared registry (e.g. the app default) or create a private one.
    this.registry = registry || new Registry();
    // Define metrics
    this.metrics = {
      workflowsTotal: new Counter({
        name: 'workflow_executions_total',
        help: 'Total number of workflow executions',
        labelNames: ['workflow_id', 'status'],
        registers: [this.registry],
      }),
      workflowDuration: new Histogram({
        name: 'workflow_duration_seconds',
        help: 'Workflow execution duration',
        labelNames: ['workflow_id'],
        buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
        registers: [this.registry],
      }),
      stepsTotal: new Counter({
        name: 'workflow_steps_total',
        help: 'Total number of steps executed',
        labelNames: ['step_type', 'status'],
        registers: [this.registry],
      }),
      stepDuration: new Histogram({
        name: 'workflow_step_duration_seconds',
        help: 'Step execution duration',
        labelNames: ['step_type'],
        buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
        registers: [this.registry],
      }),
      activeWorkflows: new Gauge({
        name: 'workflow_active_count',
        help: 'Number of active workflows',
        registers: [this.registry],
      }),
      errors: new Counter({
        name: 'workflow_errors_total',
        help: 'Total number of workflow errors',
        labelNames: ['workflow_id', 'step_id', 'error_type'],
        registers: [this.registry],
      }),
    };
  }

  init(engine: WorkflowEngine): void {
    // Register metric endpoint.
    // NOTE(review): EventEmitter listeners' return values are normally
    // discarded — confirm the engine's 'metrics:collect' event actually
    // consumes this returned promise.
    engine.on('metrics:collect', () => {
      return this.registry.metrics();
    });
  }

  onWorkflowStart(state: WorkflowState): void {
    this.metrics.activeWorkflows.inc();
  }

  onWorkflowEnd(state: WorkflowState): void {
    this.metrics.activeWorkflows.dec();
    this.metrics.workflowsTotal.inc({
      workflow_id: state.workflowId || 'unknown',
      status: state.status,
    });
    if (state.endTime && state.startTime) {
      const duration = (state.endTime - state.startTime) / 1000;
      this.metrics.workflowDuration.observe({ workflow_id: state.workflowId || 'unknown' }, duration);
    }
    // Count errors. Guard against a missing errors array so a state object
    // without it doesn't crash the observer.
    (state.errors ?? []).forEach((error) => {
      this.metrics.errors.inc({
        workflow_id: state.workflowId || 'unknown',
        step_id: error.stepId,
        error_type: this.classifyError(error.error),
      });
    });
  }

  onStepStart(trace: ExecutionTrace): void {
    // Could track concurrent steps
  }

  onStepEnd(trace: ExecutionTrace): void {
    this.metrics.stepsTotal.inc({
      step_type: trace.nodeType,
      status: trace.status,
    });
    this.metrics.stepDuration.observe({ step_type: trace.nodeType }, trace.duration / 1000);
  }

  /** Render all registered metrics in Prometheus text format. */
  getMetrics(): Promise<string> {
    return this.registry.metrics();
  }

  /**
   * Bucket an error into a coarse label value for the errors counter.
   * BUG FIX: the previous version called .includes directly on the error,
   * which threw when the error was an Error object rather than a string.
   */
  private classifyError(error: any): string {
    const msg =
      typeof error === 'string' ? error : String(error?.message ?? error ?? '');
    const lower = msg.toLowerCase();
    if (lower.includes('timeout')) return 'timeout';
    if (lower.includes('network')) return 'network';
    if (lower.includes('validation')) return 'validation';
    if (lower.includes('auth')) return 'auth';
    return 'unknown';
  }
}
// Usage in NestJS
// Exposes the Prometheus registry in text format at GET /metrics so a
// Prometheus server can scrape it.
@Controller('metrics')
export class MetricsController {
// NOTE(review): the string token 'PrometheusPlugin' must match a provider
// registered in some module — the provider registration is not shown here.
constructor(@Inject('PrometheusPlugin') private prometheus: PrometheusPlugin) {}
@Get()
async getMetrics(): Promise<string> {
return this.prometheus.getMetrics();
}
}

nodes/email-node.ts
import { BaseNode, AsyncNode, NodeMetadata } from '@your-org/workflow-engine-v3';
import nodemailer from 'nodemailer';
/**
 * Workflow node that sends an email over SMTP via nodemailer.
 * {{path}} placeholders in the subject and body are resolved against the
 * workflow state before sending (see interpolate/getValueByPath).
 */
export class EmailNode extends AsyncNode {
  type = 'email-send';
  version = '1.0.0';
  private transporter: nodemailer.Transporter;

  constructor() {
    super();
    // Initialize email transporter from environment configuration.
    this.transporter = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      // Explicit radix: SMTP_PORT is a decimal string.
      port: parseInt(process.env.SMTP_PORT || '587', 10),
      secure: false,
      auth: {
        user: process.env.SMTP_USER,
        pass: process.env.SMTP_PASS,
      },
    });
  }

  getMetadata(): NodeMetadata {
    return {
      type: this.type,
      version: this.version,
      displayName: 'Send Email',
      description: 'Send an email via SMTP',
      category: 'Communication',
      icon: 'email',
      inputs: [
        {
          name: 'to',
          type: 'string',
          required: true,
          description: 'Recipient email address',
        },
        {
          name: 'subject',
          type: 'string',
          required: true,
        },
        {
          name: 'body',
          type: 'string',
          required: true,
        },
        {
          name: 'attachments',
          type: 'array',
          required: false,
        },
      ],
      outputs: [
        {
          name: 'messageId',
          type: 'string',
        },
        {
          name: 'accepted',
          type: 'array',
        },
      ],
      paramSchema: {
        type: 'object',
        properties: {
          to: {
            type: 'string',
            format: 'email',
            description: 'Recipient email',
          },
          cc: {
            type: 'string',
            description: 'CC recipients (comma-separated)',
          },
          bcc: {
            type: 'string',
            description: 'BCC recipients (comma-separated)',
          },
          subject: {
            type: 'string',
            maxLength: 200,
          },
          body: {
            type: 'string',
          },
          html: {
            type: 'boolean',
            default: false,
            description: 'Send as HTML email',
          },
          attachments: {
            type: 'array',
            items: {
              type: 'object',
              properties: {
                filename: { type: 'string' },
                content: { type: 'string' },
                path: { type: 'string' },
              },
            },
          },
          priority: {
            type: 'string',
            enum: ['high', 'normal', 'low'],
            default: 'normal',
          },
        },
        required: ['to', 'subject', 'body'],
      },
      capabilities: {
        async: true,
        streaming: false,
        batchable: true,
        cacheable: false,
      },
    };
  }

  /**
   * Validate recipient addresses and required text fields.
   * @throws Error when `to`, any cc/bcc entry, subject, or body is invalid.
   */
  validateInput(input: any): void {
    // Validate email format
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
    if (!emailRegex.test(input.to)) {
      throw new Error(`Invalid email address: ${input.to}`);
    }
    if (input.cc) {
      this.validateEmailList(input.cc, 'CC');
    }
    // BUG FIX: bcc was previously accepted unvalidated — apply the same
    // comma-separated-list validation used for cc.
    if (input.bcc) {
      this.validateEmailList(input.bcc, 'BCC');
    }
    if (!input.subject || input.subject.trim().length === 0) {
      throw new Error('Email subject cannot be empty');
    }
    if (!input.body || input.body.trim().length === 0) {
      throw new Error('Email body cannot be empty');
    }
  }

  /** Validate every address in a comma-separated list; label names the field. */
  private validateEmailList(list: string, label: string): void {
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
    for (const email of list.split(',').map((e: string) => e.trim())) {
      if (!emailRegex.test(email)) {
        throw new Error(`Invalid ${label} email address: ${email}`);
      }
    }
  }

  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const { to, cc, bcc, subject, body, html = false, attachments = [], priority = 'normal' } = prepared;
    // Prepare email options
    const mailOptions: any = {
      from: process.env.SMTP_FROM || '[email protected]',
      to,
      subject: this.interpolate(subject, context),
      priority,
    };
    if (cc) mailOptions.cc = cc;
    if (bcc) mailOptions.bcc = bcc;
    // Body goes into either `html` or `text`, never both.
    if (html) {
      mailOptions.html = this.interpolate(body, context);
    } else {
      mailOptions.text = this.interpolate(body, context);
    }
    if (attachments.length > 0) {
      mailOptions.attachments = attachments;
    }
    try {
      const info = await this.transporter.sendMail(mailOptions);
      return {
        success: true,
        messageId: info.messageId,
        accepted: info.accepted,
        rejected: info.rejected,
        response: info.response,
      };
    } catch (error: any) {
      throw new Error(`Failed to send email: ${error.message}`);
    }
  }

  /**
   * Replace {{dotted.path}} template variables with values resolved from the
   * workflow state; unresolved placeholders are left intact.
   */
  private interpolate(text: string, context: WorkflowState): string {
    return text.replace(/\{\{([^}]+)\}\}/g, (match, path) => {
      const value = this.getValueByPath(context, path.trim());
      return value !== undefined ? String(value) : match;
    });
  }

  /**
   * Walk a dot-separated path through an object. Path segments starting
   * with '$' are special: $input -> inputs, $var -> variables, $name ->
   * results[name]. Special segments are always resolved against the root
   * object, not the current cursor (preserves existing behavior).
   */
  private getValueByPath(obj: any, path: string): any {
    const parts = path.split('.');
    let current = obj;
    for (const part of parts) {
      if (part.startsWith('$')) {
        // Handle special references
        switch (part) {
          case '$input':
            current = obj.inputs;
            break;
          case '$var':
            current = obj.variables;
            break;
          default:
            current = obj.results[part.slice(1)];
        }
      } else {
        current = current?.[part];
      }
    }
    return current;
  }

  async cleanup(context: WorkflowState): Promise<void> {
    // NOTE(review): this closes the transporter owned by the node instance;
    // if the same EmailNode instance is reused across workflow runs,
    // subsequent sends will fail — confirm the node lifecycle.
    this.transporter.close();
  }

  /**
   * Rough cost estimate: fixed CPU share plus memory scaled by the total
   * inline attachment size.
   */
  estimateResources(params: any): ResourceRequirements {
    // Estimate resources based on attachments
    const attachmentSize =
      params.attachments?.reduce((sum: number, att: any) => sum + (att.content?.length || 0), 0) || 0;
    return {
      cpu: 0.1, // 10% of a CPU core
      memory: 50 + Math.ceil(attachmentSize / 1024 / 1024), // 50MB + attachment size
      network: true,
      duration: 5000, // Estimated 5 seconds
    };
  }
}
// Register the node
engine.registerNode('email-send', new EmailNode());
nodes/database-transaction-node.ts
import { AsyncNode, WorkflowState } from '@your-org/workflow-engine-v3';
import { Pool, PoolClient } from 'pg';
/**
 * Node that lets a workflow span a PostgreSQL transaction across multiple
 * steps: begin/query/commit/rollback are separate operations keyed by a
 * caller-supplied transactionId.
 *
 * NOTE(review): open transactions are held in an in-process Map, so every
 * step of a given transaction presumably must run in the same worker
 * process — verify this holds under distributed (BullMQ) execution.
 */
export class DatabaseTransactionNode extends AsyncNode {
type = 'db-transaction';
// Shared connection pool for all transactions handled by this instance.
private pool: Pool;
// Checked-out clients for in-flight transactions, keyed by transactionId.
private transactions: Map<string, PoolClient> = new Map();
constructor() {
super();
this.pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20,
});
}
/** Dispatch on `operation`: begin | query | commit | rollback. */
async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
const { operation, transactionId, query, params } = prepared;
switch (operation) {
case 'begin':
return this.beginTransaction(transactionId, context);
case 'query':
return this.executeQuery(transactionId, query, params);
case 'commit':
return this.commitTransaction(transactionId);
case 'rollback':
return this.rollbackTransaction(transactionId);
default:
throw new Error(`Unknown operation: ${operation}`);
}
}
/** Check out a client, BEGIN, and record the transaction for later steps. */
private async beginTransaction(transactionId: string, context: WorkflowState): Promise<any> {
const client = await this.pool.connect();
await client.query('BEGIN');
this.transactions.set(transactionId, client);
// Store transaction ID in context for cleanup
context.variables._activeTransactions = context.variables._activeTransactions || [];
context.variables._activeTransactions.push(transactionId);
return { transactionId, status: 'begun' };
}
/** Run a parameterized query on the transaction's dedicated client. */
private async executeQuery(transactionId: string, query: string, params: any[]): Promise<any> {
const client = this.transactions.get(transactionId);
if (!client) {
throw new Error(`Transaction ${transactionId} not found`);
}
const result = await client.query(query, params);
return {
rows: result.rows,
rowCount: result.rowCount,
};
}
/** COMMIT, return the client to the pool, and forget the transaction. */
private async commitTransaction(transactionId: string): Promise<any> {
const client = this.transactions.get(transactionId);
if (!client) {
throw new Error(`Transaction ${transactionId} not found`);
}
await client.query('COMMIT');
client.release();
this.transactions.delete(transactionId);
return { transactionId, status: 'committed' };
}
/** ROLLBACK, return the client to the pool, and forget the transaction. */
private async rollbackTransaction(transactionId: string): Promise<any> {
const client = this.transactions.get(transactionId);
if (!client) {
throw new Error(`Transaction ${transactionId} not found`);
}
await client.query('ROLLBACK');
client.release();
this.transactions.delete(transactionId);
return { transactionId, status: 'rolled_back' };
}
/**
 * Safety net: roll back any transactions the workflow left open.
 * NOTE(review): pool.end() tears down the pool shared by ALL workflows on
 * this node instance — if cleanup runs per-workflow rather than at process
 * shutdown, later executions would fail. Confirm when cleanup is invoked.
 */
async cleanup(context: WorkflowState): Promise<void> {
// Rollback any open transactions
const activeTransactions = context.variables._activeTransactions || [];
for (const transactionId of activeTransactions) {
try {
await this.rollbackTransaction(transactionId);
} catch (error) {
console.error(`Failed to rollback transaction ${transactionId}:`, error);
}
}
await this.pool.end();
}
}

storage/multi-tier-storage.ts
import { IStorageAdapter } from '@your-org/workflow-engine-v3';
import Redis from 'ioredis';
import AWS from 'aws-sdk';
import { Pool } from 'pg';
/**
 * Three-tier storage: Redis (hot), PostgreSQL (warm), S3 (cold).
 * Reads probe tiers fastest-first and promote hits upward; writes fan out
 * to the tiers implied by the requested TTL.
 */
export class MultiTierStorageAdapter implements IStorageAdapter {
  private redis: Redis;
  private s3: AWS.S3;
  private pg: Pool;
  // BUG FIX: the cold-tier bucket was hardcoded as 'workflow-storage' in
  // every S3 call, silently ignoring config.s3.bucket. Keep it from config.
  private readonly coldBucket: string;
  private readonly HOT_TTL = 3600; // 1 hour in Redis
  private readonly WARM_TTL = 86400; // 1 day in PostgreSQL
  // Cold storage in S3 indefinitely

  constructor(config: {
    redis: { host: string; port: number };
    s3: { bucket: string; region: string };
    pg: { connectionString: string };
  }) {
    this.redis = new Redis(config.redis);
    this.s3 = new AWS.S3({ region: config.s3.region });
    this.pg = new Pool({ connectionString: config.pg.connectionString });
    this.coldBucket = config.s3.bucket;
  }

  /** Read a value, promoting it into faster tiers on a hit. */
  async get(key: string): Promise<any> {
    // Try hot storage (Redis)
    const hot = await this.redis.get(key);
    if (hot) {
      return JSON.parse(hot);
    }
    // Try warm storage (PostgreSQL)
    const pgResult = await this.pg.query(
      'SELECT value FROM workflow_cache WHERE key = $1 AND expires_at > NOW()',
      [key],
    );
    if (pgResult.rows.length > 0) {
      // The column may come back as a parsed jsonb object or as text
      // depending on the schema — normalize to a parsed value either way
      // so the Redis promotion below never double-stringifies.
      const raw = pgResult.rows[0].value;
      const value = typeof raw === 'string' ? JSON.parse(raw) : raw;
      // Promote to hot storage
      await this.redis.set(key, JSON.stringify(value), 'EX', this.HOT_TTL);
      return value;
    }
    // Try cold storage (S3)
    try {
      const s3Result = await this.s3
        .getObject({
          Bucket: this.coldBucket,
          Key: `cold/${key}`,
        })
        .promise();
      const value = JSON.parse(s3Result.Body?.toString() || 'null');
      // Promote to warm and hot storage
      await this.pg.query(
        'INSERT INTO workflow_cache (key, value, expires_at) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = $2, expires_at = $3',
        [key, value, new Date(Date.now() + this.WARM_TTL * 1000)],
      );
      await this.redis.set(key, JSON.stringify(value), 'EX', this.HOT_TTL);
      return value;
    } catch (error: any) {
      if (error.code === 'NoSuchKey') {
        return null;
      }
      throw error;
    }
  }

  /** Write a value into the tiers implied by ttl (seconds). */
  async set(key: string, value: any, ttl?: number): Promise<void> {
    const serialized = JSON.stringify(value);
    // Determine storage tier based on TTL
    if (!ttl || ttl <= this.HOT_TTL) {
      // Hot storage only
      await this.redis.set(key, serialized, 'EX', ttl || this.HOT_TTL);
    } else if (ttl <= this.WARM_TTL) {
      // Hot + Warm storage
      await Promise.all([
        this.redis.set(key, serialized, 'EX', this.HOT_TTL),
        this.pg.query(
          'INSERT INTO workflow_cache (key, value, expires_at) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = $2, expires_at = $3',
          [key, value, new Date(Date.now() + ttl * 1000)],
        ),
      ]);
    } else {
      // All tiers
      await Promise.all([
        this.redis.set(key, serialized, 'EX', this.HOT_TTL),
        this.pg.query(
          'INSERT INTO workflow_cache (key, value, expires_at) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = $2, expires_at = $3',
          [key, value, new Date(Date.now() + this.WARM_TTL * 1000)],
        ),
        this.s3
          .putObject({
            Bucket: this.coldBucket,
            Key: `cold/${key}`,
            Body: serialized,
            ContentType: 'application/json',
          })
          .promise(),
      ]);
    }
  }

  /** Remove the key from every tier. */
  async delete(key: string): Promise<void> {
    await Promise.all([
      this.redis.del(key),
      this.pg.query('DELETE FROM workflow_cache WHERE key = $1', [key]),
      this.s3
        .deleteObject({
          Bucket: this.coldBucket,
          Key: `cold/${key}`,
        })
        .promise()
        .catch(() => {}), // Ignore if not exists
    ]);
  }

  /** Probe tiers fastest-first for existence. */
  async exists(key: string): Promise<boolean> {
    // Check hot storage first (fastest)
    if (await this.redis.exists(key)) {
      return true;
    }
    // Check warm storage
    const pgResult = await this.pg.query(
      'SELECT 1 FROM workflow_cache WHERE key = $1 AND expires_at > NOW()',
      [key],
    );
    if (pgResult.rows.length > 0) {
      return true;
    }
    // Check cold storage
    try {
      await this.s3
        .headObject({
          Bucket: this.coldBucket,
          Key: `cold/${key}`,
        })
        .promise();
      return true;
    } catch {
      return false;
    }
  }
  // Implement other methods...
}

modules/workflow/workflow.module.ts
import { Module, Global } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { TypeOrmModule } from '@nestjs/typeorm';
import { WorkflowService } from './workflow.service';
import { WorkflowController } from './workflow.controller';
import { WorkflowRepository } from './workflow.repository';
import { WorkflowProcessor } from './workflow.processor';
import { EngineModule } from '../engine/engine.module';
// Global module wiring the workflow feature: queues, entities, engine, and
// the service/controller/processor trio. Marked @Global so WorkflowService
// can be injected anywhere without re-importing this module.
//
// NOTE(review): WorkflowEntity and ExecutionEntity are referenced below but
// not imported in this snippet — confirm the import exists in the real file.
// NOTE(review): this module uses @nestjs/bull while the service imports
// Queue from 'bullmq' — confirm the two are consistent (@nestjs/bullmq vs
// @nestjs/bull expose different Queue types).
@Global()
@Module({
imports: [
EventEmitterModule.forRoot(),
BullModule.registerQueue({ name: 'workflows' }, { name: 'steps' }, { name: 'python-nodes' }),
TypeOrmModule.forFeature([WorkflowEntity, ExecutionEntity]),
EngineModule,
],
controllers: [WorkflowController],
providers: [WorkflowService, WorkflowRepository, WorkflowProcessor],
exports: [WorkflowService],
})
export class WorkflowModule {}
modules/workflow/workflow.service.ts
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bullmq';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { WorkflowEngine, WorkflowState } from '@your-org/workflow-engine-v3';
import { WorkflowRepository } from './workflow.repository';
@Injectable()
export class WorkflowService {
  private readonly logger = new Logger(WorkflowService.name);
  private engine: WorkflowEngine;

  constructor(
    @InjectQueue('workflows') private workflowQueue: Queue,
    @InjectQueue('steps') private stepQueue: Queue,
    private repository: WorkflowRepository,
    private eventEmitter: EventEmitter2,
  ) {
    this.initializeEngine();
  }

  /** Creates the engine instance and forwards its events onto the app-wide emitter. */
  private initializeEngine(): void {
    this.engine = new WorkflowEngine({
      storage: new RedisStorageAdapter(/* config */),
      enableTracing: true,
      enableProfiling: true,
    });
    // Setup event forwarding: engine-internal events become application events
    // so other modules can subscribe without a direct engine dependency.
    this.engine.on('workflow:start', (state) => {
      this.eventEmitter.emit('workflow.started', state);
    });
    this.engine.on('workflow:complete', (state) => {
      this.eventEmitter.emit('workflow.completed', state);
    });
    this.engine.on('workflow:error', ({ state, error }) => {
      this.eventEmitter.emit('workflow.failed', { state, error });
    });
    this.logger.log('Workflow engine initialized');
  }

  /**
   * Validates and persists a new workflow.
   *
   * Validation runs BEFORE the repository write so an invalid definition is
   * never persisted. (The previous implementation saved first and then
   * validated, leaving an orphaned invalid record behind on failure.)
   *
   * @throws BadRequestException when the definition fails engine validation.
   */
  async createWorkflow(dto: CreateWorkflowDto): Promise<WorkflowEntity> {
    try {
      this.engine.validateWorkflow(dto.definition);
    } catch (error: any) {
      throw new BadRequestException(`Invalid workflow: ${error.message}`);
    }
    return this.repository.create(dto);
  }

  /**
   * Queues a workflow for asynchronous execution.
   *
   * Creates an execution record first so status is queryable even before a
   * worker picks up the job, then enqueues and announces the job.
   *
   * @throws NotFoundException when the workflow does not exist.
   */
  async executeWorkflow(workflowId: string, inputs: Record<string, any> = {}): Promise<ExecutionResponseDto> {
    const workflow = await this.repository.findById(workflowId);
    if (!workflow) {
      throw new NotFoundException(`Workflow ${workflowId} not found`);
    }
    // Create execution record
    const execution = await this.repository.createExecution({
      workflowId,
      inputs,
      status: 'pending',
    });
    // Queue for async execution
    const job = await this.workflowQueue.add('execute', {
      workflowId,
      executionId: execution.id,
      definition: workflow.definition,
      inputs,
    });
    // Emit event
    this.eventEmitter.emit('workflow.queued', {
      workflowId,
      executionId: execution.id,
      jobId: job.id,
    });
    return {
      executionId: execution.id,
      status: 'queued',
      jobId: job.id,
    };
  }

  /** Returns the engine's persisted state for an execution, or null if unknown. */
  async getExecutionStatus(executionId: string): Promise<WorkflowState | null> {
    return this.engine.loadState(executionId);
  }

  /** Paginated list of past executions for a workflow. */
  async getExecutionHistory(
    workflowId: string,
    options: PaginationOptions,
  ): Promise<PaginatedResult<ExecutionEntity>> {
    return this.repository.findExecutions(workflowId, options);
  }

  /** Pauses a running execution and records the pause in the repository. */
  async pauseExecution(executionId: string): Promise<void> {
    await this.engine.pause(executionId);
    await this.repository.updateExecution(executionId, {
      status: 'paused',
      pausedAt: new Date(),
    });
    this.eventEmitter.emit('workflow.paused', { executionId });
  }

  /**
   * Resumes a paused execution by queueing a resume job.
   *
   * @throws NotFoundException when the execution does not exist.
   * @throws BadRequestException when the execution is not currently paused.
   */
  async resumeExecution(executionId: string): Promise<void> {
    const execution = await this.repository.findExecution(executionId);
    if (!execution) {
      throw new NotFoundException(`Execution ${executionId} not found`);
    }
    if (execution.status !== 'paused') {
      throw new BadRequestException(`Execution ${executionId} is not paused`);
    }
    // Queue resume job
    await this.workflowQueue.add('resume', {
      executionId,
      workflowId: execution.workflowId,
    });
    await this.repository.updateExecution(executionId, {
      status: 'resuming',
      resumedAt: new Date(),
    });
    this.eventEmitter.emit('workflow.resumed', { executionId });
  }

  /**
   * Cancels an execution in the engine and removes its queued jobs.
   *
   * NOTE(review): this scans 'active' jobs too — BullMQ may refuse to remove
   * a job that is actively being processed; confirm the desired semantics.
   */
  async cancelExecution(executionId: string): Promise<void> {
    await this.engine.cancel(executionId);
    // Remove pending jobs belonging to this execution.
    const jobs = await this.workflowQueue.getJobs(['active', 'waiting', 'delayed']);
    for (const job of jobs) {
      if (job.data.executionId === executionId) {
        await job.remove();
      }
    }
    await this.repository.updateExecution(executionId, {
      status: 'cancelled',
      cancelledAt: new Date(),
    });
    this.eventEmitter.emit('workflow.cancelled', { executionId });
  }

  /**
   * Retries a failed execution from its last checkpoint.
   *
   * @throws NotFoundException when the execution or its state is missing.
   * @throws BadRequestException when the execution has not failed.
   */
  async retryExecution(executionId: string): Promise<void> {
    const execution = await this.repository.findExecution(executionId);
    if (!execution) {
      throw new NotFoundException(`Execution ${executionId} not found`);
    }
    if (execution.status !== 'failed') {
      throw new BadRequestException(`Execution ${executionId} has not failed`);
    }
    // Load last state
    const state = await this.engine.loadState(executionId);
    if (!state) {
      throw new NotFoundException(`State for execution ${executionId} not found`);
    }
    // Queue retry from last checkpoint
    await this.workflowQueue.add('retry', {
      executionId,
      workflowId: execution.workflowId,
      fromCheckpoint: state.lastCheckpoint,
    });
    await this.repository.updateExecution(executionId, {
      status: 'retrying',
      retriedAt: new Date(),
      retryCount: (execution.retryCount || 0) + 1,
    });
    this.eventEmitter.emit('workflow.retried', { executionId });
  }

  /** Returns the engine's traces for an execution, stamped with id and timestamp. */
  async getExecutionTraces(executionId: string): Promise<ExecutionTrace[]> {
    const traces = this.engine.getTraces(executionId);
    // Enrich traces with additional metadata
    return traces.map((trace) => ({
      ...trace,
      executionId,
      timestamp: new Date(trace.startTime),
    }));
  }

  /** Non-throwing validation wrapper: converts engine errors into a result object. */
  async validateWorkflow(definition: any): Promise<ValidationResult> {
    try {
      this.engine.validateWorkflow(definition);
      return { valid: true };
    } catch (error: any) {
      return {
        valid: false,
        errors: [error.message],
      };
    }
  }
}
modules/workflow/workflow.processor.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
import { WorkflowEngine } from '@your-org/workflow-engine-v3';
import { WorkflowRepository } from './workflow.repository';
@Processor('workflows')
export class WorkflowProcessor {
  private readonly logger = new Logger(WorkflowProcessor.name);
  private engine: WorkflowEngine;

  constructor(private repository: WorkflowRepository) {
    this.initializeEngine();
  }

  /** Builds this processor's own engine instance (workers run in separate processes). */
  private initializeEngine(): void {
    this.engine = new WorkflowEngine({
      // Configuration
    });
  }

  /**
   * Handles 'execute' jobs: runs the workflow definition end-to-end and
   * mirrors the outcome (running → completed/failed) into the repository.
   * Rethrows on failure so BullMQ applies its retry/backoff policy.
   */
  @Process('execute')
  async handleExecute(job: Job<WorkflowJobData>): Promise<any> {
    const { workflowId, executionId, definition, inputs } = job.data;
    this.logger.log(`Executing workflow ${workflowId} - ${executionId}`);
    try {
      // Update status
      await this.repository.updateExecution(executionId, {
        status: 'running',
        startedAt: new Date(),
      });
      // Update job progress
      await job.updateProgress(10);
      // Execute workflow
      const result = await this.engine.execute(definition, inputs, executionId);
      // Update execution record
      await this.repository.updateExecution(executionId, {
        status: 'completed',
        completedAt: new Date(),
        outputs: result.outputs,
      });
      await job.updateProgress(100);
      this.logger.log(`Workflow ${executionId} completed successfully`);
      return result;
    } catch (error: any) {
      this.logger.error(`Workflow ${executionId} failed:`, error);
      // Update execution record
      await this.repository.updateExecution(executionId, {
        status: 'failed',
        failedAt: new Date(),
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Handles 'resume' jobs: asks the engine to continue a paused execution
   * from its persisted state, then records completion or failure.
   */
  @Process('resume')
  async handleResume(job: Job<ResumeJobData>): Promise<any> {
    const { executionId, workflowId } = job.data;
    this.logger.log(`Resuming workflow ${executionId}`);
    try {
      const result = await this.engine.resume(executionId);
      await this.repository.updateExecution(executionId, {
        status: 'completed',
        completedAt: new Date(),
        outputs: result.outputs,
      });
      return result;
    } catch (error: any) {
      this.logger.error(`Failed to resume workflow ${executionId}:`, error);
      await this.repository.updateExecution(executionId, {
        status: 'failed',
        error: error.message,
      });
      throw error;
    }
  }

  /**
   * Handles 'retry' jobs: reloads the named checkpoint and re-runs from it.
   */
  @Process('retry')
  async handleRetry(job: Job<RetryJobData>): Promise<any> {
    const { executionId, fromCheckpoint } = job.data;
    this.logger.log(`Retrying workflow ${executionId} from checkpoint ${fromCheckpoint}`);
    try {
      // Load checkpoint state
      const state = await this.engine.loadCheckpoint(executionId, fromCheckpoint);
      if (!state) {
        throw new Error(`Checkpoint ${fromCheckpoint} not found`);
      }
      // Resume from checkpoint
      // NOTE(review): handleExecute passes a *definition* as the first
      // argument to engine.execute, but here a workflowId is passed (with a
      // non-null assertion). Confirm engine.execute accepts an id overload,
      // otherwise this retry path is broken.
      const result = await this.engine.execute(state.workflowId!, state.inputs, executionId);
      await this.repository.updateExecution(executionId, {
        status: 'completed',
        completedAt: new Date(),
        outputs: result.outputs,
      });
      return result;
    } catch (error: any) {
      this.logger.error(`Failed to retry workflow ${executionId}:`, error);
      await this.repository.updateExecution(executionId, {
        status: 'failed',
        error: error.message,
      });
      throw error;
    }
  }
}

config/queue.config.ts
import { BullModuleOptions } from '@nestjs/bull';
import { Redis } from 'ioredis';
// Shared Bull module configuration: Redis connection plus default job options
// applied to every queue registered with this config.
export const queueConfig: BullModuleOptions = {
  redis: {
    // All connection settings are environment-driven with local-dev defaults.
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '0'),
  },
  defaultJobOptions: {
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 500, // Keep last 500 failed jobs
    attempts: 3, // Retry each job up to 3 times before marking it failed
    backoff: {
      type: 'exponential',
      delay: 2000, // Base delay: 2s, then 4s, 8s between attempts
    },
  },
};
// Queue definitions — canonical queue names shared by producers and processors.
export const QUEUES = {
  WORKFLOWS: 'workflows',
  STEPS: 'steps',
  PYTHON_NODES: 'python-nodes',
  NOTIFICATIONS: 'notifications',
  CLEANUP: 'cleanup',
} as const;
// Job priorities — lower number means higher priority (Bull convention).
export const PRIORITIES = {
  CRITICAL: 1,
  HIGH: 5,
  NORMAL: 10,
  LOW: 20,
} as const;
workers/step.processor.ts
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
@Processor('steps')
export class StepProcessor {
  private readonly logger = new Logger(StepProcessor.name);
  // Track active jobs for resource management.
  private activeJobs = new Map<string, Job>();

  @OnQueueActive()
  onActive(job: Job): void {
    this.activeJobs.set(job.id!, job);
    this.logger.debug(`Processing step job ${job.id} - ${job.name}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job, result: any): void {
    this.activeJobs.delete(job.id!);
    this.logger.debug(`Step job ${job.id} completed`);
  }

  @OnQueueFailed()
  onFailed(job: Job, error: Error): void {
    this.activeJobs.delete(job.id!);
    this.logger.error(`Step job ${job.id} failed:`, error);
  }

  /**
   * Applies a sequence of transform operations to the job's data.
   *
   * Progress is reported AFTER each operation with `(index + 1) / length`,
   * so it reaches 100% on the last operation (the previous `index / length`
   * formula started at 0 and topped out below 100%).
   */
  @Process('data-transform')
  async processDataTransform(job: Job<DataTransformJobData>): Promise<any> {
    const { data, operations } = job.data;
    let result = data;
    for (const [index, operation] of operations.entries()) {
      result = await this.applyOperation(result, operation);
      await job.updateProgress(((index + 1) / operations.length) * 100);
      // NOTE(review): cancellation is only checked for repeatable jobs
      // (`job.opts.repeat`); confirm whether one-shot jobs should also
      // observe workflow cancellation.
      if (job.opts.repeat && (await this.shouldCancel(job))) {
        throw new Error('Job cancelled');
      }
    }
    return result;
  }

  /**
   * Processes a batch of items in fixed-size chunks, running each chunk's
   * items concurrently. Progress reaches 100% after the final chunk.
   */
  @Process('batch-process')
  async processBatch(job: Job<BatchJobData>): Promise<any> {
    const { items, processor } = job.data;
    const results = [];
    // Process in chunks of 10 so progress is reported incrementally and we
    // never fan out more than chunkSize concurrent item operations.
    const chunkSize = 10;
    const chunks = this.chunk(items, chunkSize);
    for (const [index, chunk] of chunks.entries()) {
      const chunkResults = await Promise.all(chunk.map((item) => this.processItem(item, processor)));
      results.push(...chunkResults);
      await job.updateProgress(((index + 1) / chunks.length) * 100);
    }
    return results;
  }

  /**
   * Dispatches a single transform operation.
   *
   * NOTE(review): `predicate`/`mapper`/`reducer`/`comparator` are used as
   * callables, but BullMQ job data is JSON-serialized — functions do not
   * survive the queue. Confirm these arrive as callables (e.g. rebuilt by a
   * deserialization layer) or this will throw at runtime.
   */
  private async applyOperation(data: any, operation: any): Promise<any> {
    switch (operation.type) {
      case 'filter':
        return data.filter(operation.predicate);
      case 'map':
        return data.map(operation.mapper);
      case 'reduce':
        return data.reduce(operation.reducer, operation.initialValue);
      case 'sort':
        return data.sort(operation.comparator);
      default:
        throw new Error(`Unknown operation: ${operation.type}`);
    }
  }

  /** Splits an array into consecutive slices of at most `size` elements. */
  private chunk<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  /** True when the owning workflow execution has been cancelled. */
  private async shouldCancel(job: Job): Promise<boolean> {
    // Check if workflow was cancelled
    const state = await this.getWorkflowState(job.data.executionId);
    return state?.status === 'cancelled';
  }

  /** Processes a single batch item. Placeholder: currently passes the item through. */
  private async processItem(item: any, processor: string): Promise<any> {
    // Process individual item
    return item;
  }

  /** Loads workflow state from storage. Placeholder: not yet implemented. */
  private async getWorkflowState(executionId: string): Promise<any> {
    // Get workflow state from storage
    return null;
  }
}
workers/worker-pool.service.ts
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Job, Queue, Worker } from 'bullmq';
@Injectable()
export class WorkerPoolService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(WorkerPoolService.name);
  private workers: Worker[] = [];
  private queues: Map<string, Queue> = new Map();
  // Shared worker options, built once in initializeWorkerPool so that
  // dynamically-scaled workers get the SAME connection/concurrency settings
  // as the initial pool. (Previously scaleWorkers created workers with an
  // empty config and no event handlers.)
  private workerConfig: {
    connection: { host: string; port: number };
    concurrency: number;
  };

  async onModuleInit(): Promise<void> {
    await this.initializeWorkerPool();
  }

  async onModuleDestroy(): Promise<void> {
    await this.shutdownWorkerPool();
  }

  /** Creates the queues and the initial set of workers for each queue. */
  private async initializeWorkerPool(): Promise<void> {
    this.workerConfig = {
      connection: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
      },
      concurrency: parseInt(process.env.WORKER_CONCURRENCY || '5'),
    };
    // Create workers for different queues
    const queues = ['workflows', 'steps', 'python-nodes'];
    for (const queueName of queues) {
      const queue = new Queue(queueName, { connection: this.workerConfig.connection });
      this.queues.set(queueName, queue);
      // Create multiple workers per queue
      const workerCount = this.getWorkerCount(queueName);
      for (let i = 0; i < workerCount; i++) {
        this.workers.push(this.createWorker(queueName));
      }
      this.logger.log(`Initialized ${workerCount} workers for queue ${queueName}`);
    }
  }

  /**
   * Builds a fully-configured worker with the shared config and standard
   * completion/failure logging. Used by both the initial pool and scaling.
   */
  private createWorker(queueName: string): Worker {
    const worker = new Worker(queueName, this.getProcessor(queueName), this.workerConfig);
    worker.on('completed', (job) => {
      this.logger.debug(`Job ${job.id} completed in queue ${queueName}`);
    });
    worker.on('failed', (job, error) => {
      this.logger.error(`Job ${job?.id} failed in queue ${queueName}:`, error);
    });
    return worker;
  }

  /** Per-queue worker counts, environment-tunable with sensible defaults. */
  private getWorkerCount(queueName: string): number {
    const counts: Record<string, number> = {
      workflows: parseInt(process.env.WORKFLOW_WORKERS || '2'),
      steps: parseInt(process.env.STEP_WORKERS || '5'),
      'python-nodes': parseInt(process.env.PYTHON_WORKERS || '3'),
    };
    return counts[queueName] || 1;
  }

  /** Maps a queue name to its bound processor function. */
  private getProcessor(queueName: string): any {
    // Return the appropriate processor function for each queue
    const processors: Record<string, any> = {
      workflows: this.processWorkflow.bind(this),
      steps: this.processStep.bind(this),
      'python-nodes': this.processPythonNode.bind(this),
    };
    return processors[queueName];
  }

  /** Workflow processing logic. Placeholder: not yet implemented. */
  private async processWorkflow(job: Job): Promise<any> {
    return null;
  }

  /** Step processing logic. Placeholder: not yet implemented. */
  private async processStep(job: Job): Promise<any> {
    return null;
  }

  /** Python node processing logic. Placeholder: not yet implemented. */
  private async processPythonNode(job: Job): Promise<any> {
    return null;
  }

  /** Gracefully closes all workers, then all queue connections. */
  private async shutdownWorkerPool(): Promise<void> {
    this.logger.log('Shutting down worker pool...');
    // Close all workers gracefully
    await Promise.all(this.workers.map((worker) => worker.close()));
    // Close all queues
    await Promise.all(Array.from(this.queues.values()).map((queue) => queue.close()));
    this.logger.log('Worker pool shutdown complete');
  }

  /**
   * Dynamically scales the worker count for a queue up or down.
   * New workers are built via createWorker so they carry the shared config
   * and logging handlers; surplus workers are closed and removed.
   */
  async scaleWorkers(queueName: string, count: number): Promise<void> {
    const currentWorkers = this.workers.filter((w) => w.name === queueName);
    const currentCount = currentWorkers.length;
    if (count > currentCount) {
      // Add workers
      for (let i = 0; i < count - currentCount; i++) {
        this.workers.push(this.createWorker(queueName));
      }
      this.logger.log(`Scaled up ${queueName} to ${count} workers`);
    } else if (count < currentCount) {
      // Remove workers beyond the desired count
      const workersToRemove = currentWorkers.slice(count);
      for (const worker of workersToRemove) {
        await worker.close();
        const index = this.workers.indexOf(worker);
        this.workers.splice(index, 1);
      }
      this.logger.log(`Scaled down ${queueName} to ${count} workers`);
    }
  }

  /** Snapshot of job counts, worker counts, and paused state per queue. */
  async getQueueMetrics(): Promise<QueueMetrics[]> {
    const metrics: QueueMetrics[] = [];
    for (const [name, queue] of this.queues) {
      const counts = await queue.getJobCounts();
      const workers = this.workers.filter((w) => w.name === name);
      metrics.push({
        name,
        counts,
        workerCount: workers.length,
        isPaused: await queue.isPaused(),
      });
    }
    return metrics;
  }
}

workers/python/worker.py
"""
Python Worker for Workflow Engine v3
Handles data-intensive operations using Python libraries
"""
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List
from dataclasses import dataclass
import redis
from bullmq import Worker, Job
import pandas as pd
import numpy as np
from sklearn import preprocessing, model_selection, ensemble
import tensorflow as tf
import torch
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration: all knobs are environment-driven with local-dev defaults.
@dataclass
class WorkerConfig:
    redis_host: str = os.getenv('REDIS_HOST', 'localhost')
    redis_port: int = int(os.getenv('REDIS_PORT', 6379))
    # NOTE(review): when REDIS_PASSWORD is unset an empty string is passed to
    # redis-py — confirm the server treats '' as "no auth".
    redis_password: str = os.getenv('REDIS_PASSWORD', '')
    queue_name: str = 'python-nodes'
    concurrency: int = int(os.getenv('PYTHON_WORKER_CONCURRENCY', 4))
    max_jobs_per_worker: int = int(os.getenv('MAX_JOBS_PER_WORKER', 10))
config = WorkerConfig()
# Redis connection — module-level client shared by process_job (large-result
# offloading) and health_check (ping).
redis_client = redis.Redis(
    host=config.redis_host,
    port=config.redis_port,
    password=config.redis_password,
    decode_responses=True
)
class NodeProcessor:
    """Base class for node processors.

    Routes a node type ('pandas', 'numpy', 'scikit', 'tensorflow', 'torch',
    'custom') to the matching processor coroutine. Trained models are kept in
    ``self.cache`` — note that process_job creates a fresh NodeProcessor per
    job, so the cache does not persist across jobs; only the on-disk copies do.
    """
    def __init__(self):
        # In-memory model cache keyed by "<executionId>_model_<algorithm>".
        self.cache = {}
    async def process(self, node_type: str, params: Dict, context: Dict) -> Any:
        """Route to specific processor.

        Raises ValueError for an unrecognized node_type.
        """
        processors = {
            'pandas': self.process_pandas,
            'numpy': self.process_numpy,
            'scikit': self.process_scikit,
            'tensorflow': self.process_tensorflow,
            'torch': self.process_torch,
            'custom': self.process_custom,
        }
        processor = processors.get(node_type)
        if not processor:
            raise ValueError(f"Unknown node type: {node_type}")
        return await processor(params, context)
    async def process_pandas(self, params: Dict, context: Dict) -> Any:
        """Pandas operations.

        Accepts ``data`` as a list of records, a dict of columns, or an
        already-built DataFrame, then dispatches on ``operation``.

        NOTE(review): several entries below call helpers that are NOT defined
        on this class (_df_info, _aggregate, _melt, _merge, _concat, _join,
        _filter, _to_parquet) — selecting those operations will raise
        AttributeError at runtime. Implement them or remove the entries.
        """
        operation = params.get('operation')
        data = params.get('data')
        # Convert to DataFrame
        if isinstance(data, list):
            df = pd.DataFrame(data)
        elif isinstance(data, dict):
            df = pd.DataFrame.from_dict(data)
        else:
            df = data
        # Lazy dispatch table: each value is a thunk so only the selected
        # operation executes.
        operations = {
            'describe': lambda: df.describe().to_dict(),
            'info': lambda: self._df_info(df),
            'head': lambda: df.head(params.get('n', 5)).to_dict(),
            'tail': lambda: df.tail(params.get('n', 5)).to_dict(),
            'groupby': lambda: self._groupby(df, params),
            'aggregate': lambda: self._aggregate(df, params),
            'pivot': lambda: self._pivot(df, params),
            'melt': lambda: self._melt(df, params),
            'merge': lambda: self._merge(df, params),
            'concat': lambda: self._concat(df, params),
            'join': lambda: self._join(df, params),
            'fillna': lambda: df.fillna(params.get('value', 0)).to_dict(),
            'dropna': lambda: df.dropna().to_dict(),
            'drop_duplicates': lambda: df.drop_duplicates().to_dict(),
            'sort_values': lambda: df.sort_values(
                by=params.get('by'),
                ascending=params.get('ascending', True)
            ).to_dict(),
            'query': lambda: df.query(params.get('expression')).to_dict(),
            'filter': lambda: self._filter(df, params),
            'to_csv': lambda: df.to_csv(index=False),
            'to_json': lambda: df.to_json(),
            'to_parquet': lambda: self._to_parquet(df, params),
        }
        handler = operations.get(operation)
        if not handler:
            raise ValueError(f"Unknown pandas operation: {operation}")
        return handler()
    def _groupby(self, df: pd.DataFrame, params: Dict) -> Dict:
        """Group by operation.

        Groups by ``groupBy``, selects ``aggregateColumn``, and applies
        ``aggregateFunction`` (default 'mean'). Raises KeyError for an
        unsupported aggregate function name.
        """
        group_by = params.get('groupBy')
        agg_col = params.get('aggregateColumn')
        agg_func = params.get('aggregateFunction', 'mean')
        grouped = df.groupby(group_by)[agg_col]
        agg_functions = {
            'mean': grouped.mean,
            'sum': grouped.sum,
            'count': grouped.count,
            'min': grouped.min,
            'max': grouped.max,
            'std': grouped.std,
            'var': grouped.var,
            'median': grouped.median,
        }
        result = agg_functions[agg_func]()
        return result.to_dict()
    def _pivot(self, df: pd.DataFrame, params: Dict) -> Dict:
        """Pivot table operation (mean aggregation and 0 fill by default)."""
        return df.pivot_table(
            index=params.get('index'),
            columns=params.get('columns'),
            values=params.get('values'),
            aggfunc=params.get('aggfunc', 'mean'),
            fill_value=params.get('fillValue', 0)
        ).to_dict()
    async def process_numpy(self, params: Dict, context: Dict) -> Any:
        """NumPy operations.

        Builds an ndarray from ``data`` and dispatches on ``operation``.
        Scalars are cast to float and arrays to nested lists so results are
        JSON-friendly.

        NOTE(review): 'fft'/'ifft'/'eigenvalues' can produce complex values;
        ``tolist()`` keeps them as Python complex, which json.dumps cannot
        serialize — confirm how these results are transported.
        """
        operation = params.get('operation')
        data = np.array(params.get('data', []))
        operations = {
            # Statistical
            'mean': lambda: float(np.mean(data)),
            'median': lambda: float(np.median(data)),
            'std': lambda: float(np.std(data)),
            'var': lambda: float(np.var(data)),
            'min': lambda: float(np.min(data)),
            'max': lambda: float(np.max(data)),
            'sum': lambda: float(np.sum(data)),
            'prod': lambda: float(np.prod(data)),
            # Linear algebra
            'dot': lambda: np.dot(data, np.array(params.get('other'))).tolist(),
            'matmul': lambda: np.matmul(data, np.array(params.get('other'))).tolist(),
            'transpose': lambda: data.T.tolist(),
            'inverse': lambda: np.linalg.inv(data).tolist(),
            'determinant': lambda: float(np.linalg.det(data)),
            'eigenvalues': lambda: np.linalg.eigvals(data).tolist(),
            # Array operations
            'reshape': lambda: data.reshape(params.get('shape')).tolist(),
            'flatten': lambda: data.flatten().tolist(),
            'unique': lambda: np.unique(data).tolist(),
            'sort': lambda: np.sort(data).tolist(),
            'argsort': lambda: np.argsort(data).tolist(),
            # Mathematical
            'fft': lambda: np.fft.fft(data).tolist(),
            'ifft': lambda: np.fft.ifft(data).tolist(),
            'convolve': lambda: np.convolve(
                data,
                np.array(params.get('kernel')),
                mode=params.get('mode', 'valid')
            ).tolist(),
            'correlate': lambda: np.correlate(
                data,
                np.array(params.get('other', data)),
                mode=params.get('mode', 'valid')
            ).tolist(),
        }
        handler = operations.get(operation)
        if not handler:
            raise ValueError(f"Unknown numpy operation: {operation}")
        return handler()
    async def process_scikit(self, params: Dict, context: Dict) -> Any:
        """Scikit-learn operations.

        NOTE(review): the 'predict' and 'evaluate' branches call _predict and
        _evaluate, which are not defined in this class — those operations will
        raise AttributeError until implemented.
        """
        operation = params.get('operation')
        data = params.get('data')
        if operation == 'train':
            return await self._train_model(params, context)
        elif operation == 'predict':
            return await self._predict(params, context)
        elif operation == 'evaluate':
            return await self._evaluate(params, context)
        elif operation == 'preprocess':
            return await self._preprocess(data, params)
        else:
            raise ValueError(f"Unknown scikit operation: {operation}")
    async def _train_model(self, params: Dict, context: Dict) -> Dict:
        """Train a scikit-learn model.

        Splits features/target 80/20 (configurable via ``testSize``), fits
        the chosen ensemble model, scores train and test sets, caches the
        model in memory, and persists it to /tmp via joblib.

        Returns a dict with the model id/path, scores, and feature
        importances when the model exposes them.
        """
        algorithm = params.get('algorithm', 'RandomForest')
        X = np.array(params.get('features'))
        y = np.array(params.get('target'))
        test_size = params.get('testSize', 0.2)
        # Split data (fixed random_state=42 for reproducible splits)
        X_train, X_test, y_train, y_test = model_selection.train_test_split(
            X, y, test_size=test_size, random_state=42
        )
        # Select model
        models = {
            'RandomForest': ensemble.RandomForestClassifier(
                **params.get('modelParams', {})
            ),
            'GradientBoosting': ensemble.GradientBoostingClassifier(
                **params.get('modelParams', {})
            ),
            'RandomForestRegressor': ensemble.RandomForestRegressor(
                **params.get('modelParams', {})
            ),
            'GradientBoostingRegressor': ensemble.GradientBoostingRegressor(
                **params.get('modelParams', {})
            ),
        }
        model = models.get(algorithm)
        if not model:
            raise ValueError(f"Unknown algorithm: {algorithm}")
        # Train model
        model.fit(X_train, y_train)
        # Evaluate
        train_score = model.score(X_train, y_train)
        test_score = model.score(X_test, y_test)
        # Store model in cache
        model_id = f"{context.get('executionId')}_model_{algorithm}"
        self.cache[model_id] = model
        # Also persist to disk if needed
        import joblib
        model_path = f"/tmp/{model_id}.pkl"
        joblib.dump(model, model_path)
        return {
            'modelId': model_id,
            'modelPath': model_path,
            'algorithm': algorithm,
            'trainScore': float(train_score),
            'testScore': float(test_score),
            'feature_importances': model.feature_importances_.tolist()
            if hasattr(model, 'feature_importances_') else None,
        }
    async def _preprocess(self, data: Any, params: Dict) -> Any:
        """Preprocessing operations: scale, normalize, label-encode, one-hot.

        NOTE(review): ``OneHotEncoder(sparse=False)`` — the ``sparse``
        keyword was renamed to ``sparse_output`` in scikit-learn 1.2 and
        removed in 1.4; confirm the pinned sklearn version.
        """
        operation = params.get('preprocessOperation', 'scale')
        if operation == 'scale':
            scaler = preprocessing.StandardScaler()
            scaled = scaler.fit_transform(data)
            return scaled.tolist()
        elif operation == 'normalize':
            normalized = preprocessing.normalize(data, norm=params.get('norm', 'l2'))
            return normalized.tolist()
        elif operation == 'encode':
            encoder = preprocessing.LabelEncoder()
            encoded = encoder.fit_transform(data)
            return encoded.tolist()
        elif operation == 'onehot':
            encoder = preprocessing.OneHotEncoder(sparse=False)
            encoded = encoder.fit_transform(np.array(data).reshape(-1, 1))
            return encoded.tolist()
        else:
            raise ValueError(f"Unknown preprocessing operation: {operation}")
    async def process_tensorflow(self, params: Dict, context: Dict) -> Any:
        """TensorFlow operations.

        NOTE(review): _predict_tf and _evaluate_tf are not defined in this
        class — those operations will raise AttributeError until implemented.
        """
        operation = params.get('operation')
        if operation == 'train':
            return await self._train_tf_model(params, context)
        elif operation == 'predict':
            return await self._predict_tf(params, context)
        elif operation == 'evaluate':
            return await self._evaluate_tf(params, context)
        else:
            raise ValueError(f"Unknown TensorFlow operation: {operation}")
    async def _train_tf_model(self, params: Dict, context: Dict) -> Dict:
        """Train a TensorFlow model.

        Builds a Sequential model from a declarative ``modelConfig`` layer
        list (Dense/Dropout/Conv2D supported), compiles and fits it, saves
        the model under /tmp, and returns the training history.
        """
        model_config = params.get('modelConfig', {})
        X = np.array(params.get('features'))
        y = np.array(params.get('target'))
        # Build model
        model = tf.keras.Sequential()
        for layer_config in model_config.get('layers', []):
            layer_type = layer_config.get('type')
            if layer_type == 'Dense':
                model.add(tf.keras.layers.Dense(
                    units=layer_config.get('units'),
                    activation=layer_config.get('activation')
                ))
            elif layer_type == 'Dropout':
                model.add(tf.keras.layers.Dropout(
                    rate=layer_config.get('rate')
                ))
            elif layer_type == 'Conv2D':
                model.add(tf.keras.layers.Conv2D(
                    filters=layer_config.get('filters'),
                    kernel_size=layer_config.get('kernel_size'),
                    activation=layer_config.get('activation')
                ))
            # Add more layer types as needed
        # Compile model
        compile_config = model_config.get('compile', {})
        model.compile(
            optimizer=compile_config.get('optimizer', 'adam'),
            loss=compile_config.get('loss', 'mse'),
            metrics=compile_config.get('metrics', ['accuracy'])
        )
        # Train model (verbose=0: keep worker logs clean)
        history = model.fit(
            X, y,
            epochs=model_config.get('epochs', 10),
            batch_size=model_config.get('batchSize', 32),
            validation_split=model_config.get('validationSplit', 0.2),
            verbose=0
        )
        # Save model
        model_id = f"{context.get('executionId')}_tfmodel"
        model_path = f"/tmp/{model_id}"
        model.save(model_path)
        return {
            'modelId': model_id,
            'modelPath': model_path,
            'history': {
                'loss': history.history['loss'],
                'val_loss': history.history.get('val_loss', []),
                'metrics': {k: v for k, v in history.history.items()
                            if k not in ['loss', 'val_loss']}
            }
        }
    async def process_torch(self, params: Dict, context: Dict) -> Any:
        """PyTorch operations (stub — always returns a fixed message)."""
        operation = params.get('operation')
        # Similar to TensorFlow but using PyTorch
        # Implementation depends on specific requirements
        return {'message': 'PyTorch operation completed'}
    async def process_custom(self, params: Dict, context: Dict) -> Any:
        """Execute custom Python code.

        SECURITY(review): despite the "safe" naming, exec() with pd/np in
        globals is NOT a sandbox — code can reach builtins and the filesystem.
        Never route untrusted input here; consider a real sandbox or an
        allow-listed expression evaluator instead.
        """
        code = params.get('code')
        if not code:
            raise ValueError("No code provided for custom execution")
        # Create safe execution environment
        safe_globals = {
            'pd': pd,
            'np': np,
            'params': params,
            'context': context,
        }
        safe_locals = {}
        # Execute code
        exec(code, safe_globals, safe_locals)
        # Return result if specified: the snippet communicates its output by
        # assigning to a local named `result`.
        return safe_locals.get('result', None)
async def process_job(job: Job) -> Any:
    """Main job processor.

    Routes the job to a NodeProcessor by ``nodeType``. Large results are
    offloaded to Redis (1h TTL) and replaced by a small reference envelope.

    The payload is serialized ONCE with json.dumps and its length used as the
    size threshold — the previous ``sys.getsizeof(result)`` only measured the
    top-level container, badly underestimating nested results, and the data
    was serialized a second time for storage.

    NOTE(review): json.dumps will raise on non-JSON-serializable results
    (e.g. complex values from numpy fft) — confirm upstream processors only
    emit JSON-friendly types.
    """
    logger.info(f"Processing job {job.id}: {job.name}")
    try:
        # Extract job data
        node_type = job.data.get('nodeType')
        params = job.data.get('params', {})
        context = job.data.get('context', {})
        # Create processor (fresh per job; its model cache is job-local)
        processor = NodeProcessor()
        # Process node
        result = await processor.process(node_type, params, context)
        # Store result in Redis if the serialized payload is large
        payload = json.dumps(result)
        if len(payload) > 1024 * 1024:  # 1MB
            result_key = f"result:{job.id}"
            redis_client.set(result_key, payload, ex=3600)
            return {'resultKey': result_key, 'size': len(payload)}
        logger.info(f"Job {job.id} completed successfully")
        return result
    except Exception as e:
        logger.error(f"Job {job.id} failed: {e}")
        raise
async def health_check():
    """Health check endpoint.

    Verifies the Redis connection and that all heavyweight libraries import,
    returning a status payload either way (never raises).
    """
    try:
        # A failing ping or import drops us into the unhealthy branch.
        redis_client.ping()
        import pandas
        import numpy
        import sklearn
        import tensorflow
        library_versions = {
            'pandas': pandas.__version__,
            'numpy': numpy.__version__,
            'sklearn': sklearn.__version__,
            'tensorflow': tensorflow.__version__,
        }
        return {
            'status': 'healthy',
            'redis': 'connected',
            'libraries': library_versions,
        }
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}
async def main():
    """Main worker loop: builds the BullMQ worker, wires signal handlers,
    and runs until the process is terminated."""
    logger.info("Starting Python worker...")
    # Create worker
    worker = Worker(
        config.queue_name,
        process_job,
        {
            'connection': {
                'host': config.redis_host,
                'port': config.redis_port,
                'password': config.redis_password,
            },
            'concurrency': config.concurrency,
            'maxJobsPerWorker': config.max_jobs_per_worker,
        }
    )
    # Setup signal handlers for graceful shutdown on SIGTERM/SIGINT.
    import signal
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        # NOTE(review): Worker.close() is a coroutine in the bullmq Python
        # client — calling it without awaiting may not actually close the
        # worker before sys.exit(); confirm graceful-shutdown behavior.
        worker.close()
        sys.exit(0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # Run worker
    logger.info(f"Python worker started on queue '{config.queue_name}'")
    try:
        await worker.run()
    except Exception as e:
        logger.error(f"Worker error: {e}")
        raise
if __name__ == '__main__':
    asyncio.run(main())
Dockerfile.python-worker
# Python worker image: slim base + compilers needed to build native wheels
# (numpy/pandas/sklearn extensions).
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies (gcc/g++ for native extensions); clean the apt
# cache in the same layer to keep the image small.
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies first so this layer is cached across code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy worker code
COPY workers/python/ .
# Health check: runs worker.health_check() in-process; a non-importable worker
# module or Redis failure surfaces as an unhealthy container.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import asyncio; import worker; print(asyncio.run(worker.health_check()))"
# Run worker
CMD ["python", "worker.py"]

test/nodes/email-node.spec.ts
import { EmailNode } from '../../src/nodes/email-node';
import { WorkflowState } from '@your-org/workflow-engine-v3';
// Unit tests for EmailNode: input validation and template interpolation.
describe('EmailNode', () => {
  let node: EmailNode;
  let mockState: WorkflowState;

  beforeEach(() => {
    // Fresh node and a minimal running-state fixture per test.
    node = new EmailNode();
    mockState = {
      executionId: 'test-exec-1',
      status: 'running',
      inputs: { userId: '123' },
      variables: { userName: 'John Doe' },
      results: {},
      // ... other required fields
    } as WorkflowState;
  });

  describe('validateInput', () => {
    it('should validate email format', () => {
      // A malformed 'to' address must be rejected...
      expect(() => {
        node.validateInput({
          to: 'invalid-email',
          subject: 'Test',
          body: 'Test body',
        });
      }).toThrow('Invalid email address');
      // ...but 'to' itself is optional, so omitting it is allowed.
      expect(() => {
        node.validateInput({
          subject: 'Test',
          body: 'Test body',
        });
      }).not.toThrow();
    });

    it('should validate required fields', () => {
      expect(() => {
        node.validateInput({
          subject: '',
          body: 'Body',
        });
      }).toThrow('Email subject cannot be empty');
    });
  });

  describe('executeAsync', () => {
    it('should send email successfully', async () => {
      const params = {
        subject: 'Hello {{userName}}',
        body: 'Welcome, {{userName}}!',
      };
      // prepare() resolves templates against state before executeAsync sends.
      const result = await node.executeAsync(node.prepare(mockState, params), mockState);
      expect(result.success).toBe(true);
      expect(result.messageId).toBeDefined();
    });

    it('should interpolate variables', async () => {
      // {{var.*}} reads state.variables; {{input.*}} reads state.inputs.
      const params = {
        subject: 'Hello {{var.userName}}',
        body: 'User ID: {{input.userId}}',
      };
      const prepared = node.prepare(mockState, params);
      expect(prepared.subject).toContain('John Doe');
      expect(prepared.body).toContain('123');
    });
  });
});
test/integration/workflow.integration.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { WorkflowService } from '../../src/modules/workflow/workflow.service';
import { WorkflowEngine } from '@your-org/workflow-engine-v3';
// Integration tests driving WorkflowService with its real engine instance.
describe('WorkflowService Integration', () => {
  let service: WorkflowService;
  let engine: WorkflowEngine;

  beforeAll(async () => {
    const module: TestingModule = await Test.createTestingModule({
      providers: [WorkflowService],
    }).compile();
    service = module.get<WorkflowService>(WorkflowService);
    // Reach into the private engine for direct assertions.
    engine = service['engine'];
  });

  it('should execute a simple workflow', async () => {
    // NOTE(review): this local definition is never registered or used —
    // executeWorkflow('test-1') looks the workflow up by id, so 'test-1'
    // must already exist in the repository for this test to pass.
    const workflow = {
      name: 'Test Workflow',
      version: '1.0.0',
      steps: [
        {
          stepType: 'node',
          type: 'log',
          params: { message: 'Test' },
        },
      ],
    };
    const execution = await service.executeWorkflow('test-1', {});
    // Wait for completion (fixed sleep; polling would be less flaky)
    await new Promise((resolve) => setTimeout(resolve, 1000));
    const state = await service.getExecutionStatus(execution.executionId);
    expect(state?.status).toBe('completed');
  });

  it('should handle parallel execution', async () => {
    // NOTE(review): `workflow` is unused here as well, and `duration`
    // includes the fixed 500ms sleep below, so `toBeLessThan(200)` can
    // never pass as written — measure the engine's own execution time
    // (e.g. from state timestamps) instead of wall clock around a sleep.
    const workflow = {
      name: 'Parallel Test',
      version: '1.0.0',
      steps: [
        {
          stepType: 'parallel',
          steps: [
            { stepType: 'node', type: 'delay', params: { milliseconds: 100 } },
            { stepType: 'node', type: 'delay', params: { milliseconds: 100 } },
            { stepType: 'node', type: 'delay', params: { milliseconds: 100 } },
          ],
        },
      ],
    };
    const start = Date.now();
    const execution = await service.executeWorkflow('test-2', {});
    await new Promise((resolve) => setTimeout(resolve, 500));
    const duration = Date.now() - start;
    // Should complete in ~100ms, not 300ms
    expect(duration).toBeLessThan(200);
  });
});
test/e2e/workflow.e2e.spec.ts
import { INestApplication } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import * as request from 'supertest';
import { AppModule } from '../../src/app.module';
// End-to-end tests hitting the HTTP API through the full Nest application.
describe('WorkflowController E2E', () => {
  let app: INestApplication;

  beforeAll(async () => {
    const moduleFixture: TestingModule = await Test.createTestingModule({
      imports: [AppModule],
    }).compile();
    app = moduleFixture.createNestApplication();
    await app.init();
  });

  afterAll(async () => {
    await app.close();
  });

  it('/workflows (POST) - Create workflow', async () => {
    const workflow = {
      name: 'E2E Test Workflow',
      description: 'Test workflow',
      definition: {
        version: '1.0.0',
        steps: [{ stepType: 'node', type: 'log', params: { message: 'E2E Test' } }],
      },
    };
    const response = await request(app.getHttpServer()).post('/workflows').send(workflow).expect(201);
    expect(response.body).toHaveProperty('id');
    expect(response.body.name).toBe(workflow.name);
  });

  it('/workflows/:id/execute (POST) - Execute workflow', async () => {
    // First create a workflow
    const createResponse = await request(app.getHttpServer())
      .post('/workflows')
      .send({
        name: 'Execute Test',
        definition: {
          version: '1.0.0',
          steps: [{ stepType: 'node', type: 'set', params: { name: 'test', value: 123 } }],
        },
      });
    const workflowId = createResponse.body.id;
    // Execute the workflow
    // NOTE(review): expects 200, but Nest returns 201 for POST by default —
    // the controller must override with @HttpCode(200) for this to pass.
    const executeResponse = await request(app.getHttpServer())
      .post(`/workflows/${workflowId}/execute`)
      .send({ input: 'test' })
      .expect(200);
    expect(executeResponse.body).toHaveProperty('executionId');
    expect(executeResponse.body.status).toBe('queued');
  });

  it('/workflows/executions/:id (GET) - Get execution status', async () => {
    // Create and execute a workflow
    const createResponse = await request(app.getHttpServer())
      .post('/workflows')
      .send({
        name: 'Status Test',
        definition: {
          version: '1.0.0',
          steps: [{ stepType: 'node', type: 'log', params: { message: 'Status test' } }],
        },
      });
    const executeResponse = await request(app.getHttpServer())
      .post(`/workflows/${createResponse.body.id}/execute`)
      .send({});
    const executionId = executeResponse.body.executionId;
    // Wait a bit for execution (fixed sleep; polling would be less flaky)
    await new Promise((resolve) => setTimeout(resolve, 1000));
    // Get status
    const statusResponse = await request(app.getHttpServer())
      .get(`/workflows/executions/${executionId}`)
      .expect(200);
    expect(statusResponse.body).toHaveProperty('status');
    // Accept either state since completion timing is nondeterministic.
    expect(['running', 'completed']).toContain(statusResponse.body.status);
  });
});

// DO: Make nodes focused and reusable — a node that does exactly one thing
// can be composed freely into many different workflows.
export class SinglePurposeNode extends BaseNode {
  // One clear responsibility
}
// DON'T: Create mega-nodes — a node that fetches, transforms, validates, and
// persists is hard to test, hard to reuse, and hard to retry safely.
export class DoEverythingNode extends BaseNode {
  // Too many responsibilities
}
// DO: Validate inputs properly — accept `unknown` (not `any`) so the compiler
// forces a runtime check before the value is used; zod performs that check.
// Throws a ZodError with a per-field report when validation fails.
validateInput(input: unknown): void {
  const schema = z.object({
    required: z.string(),
    optional: z.string().optional(),
  });
  schema.parse(input);
}
// DO: Handle errors gracefully — log with context, then rethrow a
// domain-specific error so callers can react to the failure type.
async executeAsync(prepared: any): Promise<any> {
  try {
    return await this.doWork(prepared);
  } catch (error) {
    // Under strict mode (`useUnknownInCatchVariables`) the catch binding is
    // `unknown`; narrow it before reading `.message`.
    const err = error instanceof Error ? error : new Error(String(error));
    // Log for debugging
    this.logger.error('Node execution failed:', err);
    // Surface a meaningful, typed error with the original attached
    throw new NodeExecutionError(
      `Failed to execute ${this.type}: ${err.message}`,
      { originalError: err, params: prepared }
    );
  }
}
// DO: Use appropriate storage tiers — a read-through cache in front of the
// persistent store.
class OptimizedStorage {
  async get(key: string): Promise<any> {
    // Try cache first. Compare against null/undefined explicitly (`!= null`)
    // so legitimately falsy stored values (0, '', false) still count as hits;
    // a plain truthiness check would treat them as misses.
    const cached = await this.cache.get(key);
    if (cached != null) return cached;
    // Fall back to persistent storage
    const stored = await this.persistent.get(key);
    if (stored != null) {
      // Populate the cache on the way out (read-through)
      await this.cache.set(key, stored, 300); // 5 min TTL
      return stored;
    }
    return null;
  }
}
// DO: Clean up resources
async cleanup(): Promise<void> {
// Close connections
await this.db?.close();
await this.redis?.quit();
// Clear caches
this.cache.clear();
// Cancel pending operations
this.pendingOps.forEach(op => op.cancel());
}
// DO: Use connection pooling — reuse connections instead of opening one per query.
const pool = new Pool({
  max: 20,     // upper bound on concurrent connections
  min: 5,      // warm floor of connections kept open
  idle: 10000, // ms a connection may sit idle before being reaped
});
// DO: Batch operations
async processBatch(items: any[]): Promise<any[]> {
const BATCH_SIZE = 100;
const results = [];
for (let i = 0; i < items.length; i += BATCH_SIZE) {
const batch = items.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.all(
batch.map(item => this.processItem(item))
);
results.push(...batchResults);
}
return results;
}
// DO: Implement caching for expensive, repeatable computations.
// NOTE(review): presumably @Cacheable memoizes by arguments with a TTL in
// seconds — confirm against the decorator's implementation.
@Cacheable({ ttl: 300 })
async expensiveOperation(params: any): Promise<any> {
  // This result will be cached for 5 minutes
  return await this.compute(params);
}
// DO: Validate all inputs at the boundary with a schema.
const InputSchema = z.object({
  userId: z.string().uuid(),       // must be a well-formed UUID
  email: z.string().email(),       // syntactic e-mail check only
  role: z.enum(['admin', 'user']), // closed set of allowed roles
});
// DO: Use parameterized queries — values travel separately from the SQL text,
// so user input can never alter the statement's structure.
async query(sql: string, params: any[]): Promise<any> {
  // Safe from SQL injection: the driver binds `params` to placeholders
  return await this.pool.query(sql, params);
}
// DON'T: Build queries with string concatenation
// const query = `SELECT * FROM users WHERE id = '${userId}'`; // BAD!
// DO: Implement rate limiting on publicly reachable endpoints.
@Throttle(10, 60) // 10 requests per 60-second window
async publicEndpoint(): Promise<any> {
  // Rate limited endpoint
}
// DO: Encrypt sensitive data before it reaches the backing store, and
// decrypt transparently on the way back out.
class SecureStorage {
  async set(key: string, value: any): Promise<void> {
    const plaintext = JSON.stringify(value);
    const ciphertext = await this.encrypt(plaintext);
    await this.storage.set(key, ciphertext);
  }

  async get(key: string): Promise<any> {
    const ciphertext = await this.storage.get(key);
    if (!ciphertext) return null;
    return JSON.parse(await this.decrypt(ciphertext));
  }
}
// DO: Add comprehensive logging — structured entries at start/success/failure,
// correlated by execution id, plus success/failure metrics.
class ObservableNode extends BaseNode {
  async executeAsync(prepared: any, context: WorkflowState): Promise<any> {
    const startTime = Date.now();
    // Reuse the workflow execution id as the trace id so log lines from the
    // same run can be correlated.
    const traceId = context.executionId;
    this.logger.info('Node execution started', {
      nodeType: this.type,
      traceId,
      // Redact secrets before they reach the logs
      params: this.sanitizeParams(prepared),
    });
    try {
      const result = await this.doWork(prepared);
      this.metrics.recordSuccess(this.type, Date.now() - startTime);
      this.logger.info('Node execution completed', {
        nodeType: this.type,
        traceId,
        duration: Date.now() - startTime,
      });
      return result;
    } catch (error) {
      // Under strict mode (`useUnknownInCatchVariables`) the catch binding is
      // `unknown`; narrow before reading `.message` / `.stack`.
      const err = error instanceof Error ? error : new Error(String(error));
      this.metrics.recordFailure(this.type, err);
      this.logger.error('Node execution failed', {
        nodeType: this.type,
        traceId,
        error: err.message,
        stack: err.stack,
        duration: Date.now() - startTime,
      });
      // Rethrow the original value so the engine's failure handling sees it
      throw error;
    }
  }
}

This completes the comprehensive Developer Guide for the Workflow Engine v3. The guide covers everything from architecture and setup through advanced topics like distributed processing and Python integration, providing developers with all the information needed to extend, implement, and integrate the workflow engine into their applications.