🚀 Complex Multi-Step Workflows
🎯 Learning Objectives
After this lesson, you will be able to:
✅ Apply error handling and recovery patterns
✅ Build sub-workflows and modularize complex workflows
✅ Implement parallel processing and batch operations
✅ Design state machines, queues, and event-driven architectures
Combine everything you've learned into powerful, production-ready automations. This lesson covers advanced patterns for complex, reliable workflows.
🏗️ Advanced Workflow Architecture
Workflow Complexity Levels:
Checkpoint
How many levels of workflow complexity are there?
⚡ Pattern 1: Error Handling & Recovery
Robust Error Management:
| Stage | Component | Details |
|---|---|---|
| 1. Trigger | Webhook / Schedule | Starts the workflow |
| 2. TRY Block | Main Logic | Step 1 → Step 2 → Step 3 |
| 3a. Success ✅ | Continue | Continues normal processing |
| 3b. Error ❌ | Error Handler | Log Error → Retry Logic → Fallback Action → Alert Team |
Flow: Trigger → TRY (Main Logic) → Success: Continue / Error: Error Handler (Log → Retry → Fallback → Alert)
Error Handling Code:
```javascript
// Wrap operations in try-catch
async function safeExecute(operation, context) {
  const maxRetries = 3;
  const retryDelay = 1000; // ms

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const result = await operation();
      return { success: true, result, attempt };
    } catch (error) {
      console.log(`Attempt ${attempt} failed: ${error.message}`);

      // Check if retryable
      const retryable = isRetryableError(error);

      if (retryable && attempt < maxRetries) {
        // Exponential backoff
        const delay = retryDelay * Math.pow(2, attempt - 1);
        await sleep(delay);
        continue;
      }

      // Final failure
      return {
        success: false,
        error: {
          message: error.message,
          code: error.code,
          attempt,
          context,
          timestamp: new Date().toISOString()
        }
      };
    }
  }
}

function isRetryableError(error) {
  const retryableCodes = [429, 500, 502, 503, 504]; // Rate limit, server errors
  const retryableMessages = ['ECONNRESET', 'ETIMEDOUT', 'socket hang up'];

  if (retryableCodes.includes(error.statusCode)) return true;
  if (retryableMessages.some(m => error.message?.includes(m))) return true;

  return false;
}

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Usage
const result = await safeExecute(
  () => callExternalAPI(data),
  { operation: 'api_call', itemId: item.id }
);

return result;
```
Error Notification:
```javascript
const errors = $input.all().filter(item => !item.json.success);

if (errors.length === 0) {
  return []; // No errors to report
}

// Group errors by type
const groupedErrors = errors.reduce((acc, err) => {
  const type = err.json.error?.code || 'unknown';
  if (!acc[type]) acc[type] = [];
  acc[type].push(err.json);
  return acc;
}, {});

// Format error report
const report = {
  workflow: 'Data Processing Pipeline',
  timestamp: new Date().toISOString(),
  totalErrors: errors.length,
  errorTypes: Object.entries(groupedErrors).map(([type, errs]) => ({
    type,
    count: errs.length,
    samples: errs.slice(0, 3).map(e => ({
      message: e.error?.message,
      context: e.error?.context
    }))
  })),

  // Slack format
  slackMessage: {
    text: `⚠️ Workflow Errors: ${errors.length} failures`,
    blocks: [
      {
        type: 'header',
        text: { type: 'plain_text', text: '⚠️ Workflow Error Report' }
      },
      {
        type: 'section',
        text: {
          type: 'mrkdwn',
          text: `*Workflow:* Data Processing Pipeline\n*Time:* ${new Date().toLocaleString()}\n*Total Errors:* ${errors.length}`
        }
      },
      ...Object.entries(groupedErrors).map(([type, errs]) => ({
        type: 'section',
        text: {
          type: 'mrkdwn',
          text: `*${type}:* ${errs.length} error(s)\n\`\`\`${errs[0]?.error?.message}\`\`\``
        }
      }))
    ]
  }
};

return { report };
```
Checkpoint
How does the error handling use exponential backoff?
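To make the backoff schedule concrete, the sketch below lists the wait applied after each failed attempt, using the same formula as `safeExecute` (`retryDelay * 2^(attempt - 1)`). `backoffDelays` is a hypothetical helper name introduced for illustration; it is not part of n8n.

```javascript
// Hypothetical helper: lists the delay (ms) applied after each failed attempt
// that still has a retry left, using delay = baseDelayMs * 2^(attempt - 1).
function backoffDelays(maxRetries, baseDelayMs) {
  const delays = [];
  // The final attempt is not followed by a wait, so stop at maxRetries - 1.
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(baseDelayMs * Math.pow(2, attempt - 1));
  }
  return delays;
}

console.log(backoffDelays(3, 1000)); // [ 1000, 2000 ]
console.log(backoffDelays(5, 1000)); // [ 1000, 2000, 4000, 8000 ]
```

With the defaults shown earlier (3 attempts, 1000 ms base), a persistently failing call waits 1 s, then 2 s, and then reports the final failure.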
🔧 Pattern 2: Sub-Workflows & Modularization
Breaking Down Complex Workflows:
Main Workflow: Order Processing
Sub-Workflow Caller:
```javascript
// Main workflow calls sub-workflow via webhook
const order = $input.item.json;

// Prepare sub-workflow calls
const subWorkflows = {
  validation: {
    url: $env.VALIDATION_WORKFLOW_URL,
    data: { orderId: order.id, items: order.items }
  },
  payment: {
    url: $env.PAYMENT_WORKFLOW_URL,
    data: {
      orderId: order.id,
      amount: order.total,
      paymentMethod: order.paymentMethod
    }
  },
  fulfillment: {
    url: $env.FULFILLMENT_WORKFLOW_URL,
    data: {
      orderId: order.id,
      items: order.items,
      shippingAddress: order.shippingAddress
    }
  },
  notification: {
    url: $env.NOTIFICATION_WORKFLOW_URL,
    data: {
      orderId: order.id,
      customer: order.customer,
      type: 'order_confirmation'
    }
  }
};

// Execute validation first (blocking)
const validationResult = await callSubWorkflow(subWorkflows.validation);

if (!validationResult.success) {
  return {
    success: false,
    stage: 'validation',
    error: validationResult.error
  };
}

// Continue with payment
const paymentResult = await callSubWorkflow(subWorkflows.payment);

if (!paymentResult.success) {
  // Trigger compensation (rollback)
  await callSubWorkflow({
    url: $env.ROLLBACK_WORKFLOW_URL,
    data: { orderId: order.id, stage: 'payment' }
  });

  return {
    success: false,
    stage: 'payment',
    error: paymentResult.error
  };
}

// Fulfillment and notification can run in parallel
return {
  orderId: order.id,
  validationResult,
  paymentResult,
  pendingWorkflows: ['fulfillment', 'notification']
};

async function callSubWorkflow({ url, data }) {
  try {
    const response = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data)
    });
    return await response.json();
  } catch (error) {
    return { success: false, error: error.message };
  }
}
```
Checkpoint
How do sub-workflows help modularize complex workflows?
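The caller above leaves fulfillment and notification as pending workflows that "can run in parallel". One way to run them concurrently is sketched below with `Promise.allSettled`, so a failure in one sub-workflow does not hide the outcome of the other. `runInParallel` and the two demo tasks are hypothetical stand-ins; in a real workflow each task would wrap a `callSubWorkflow(...)` call.

```javascript
// Runs named async tasks concurrently and reports each outcome separately,
// so one failing sub-workflow does not hide the result of the other.
async function runInParallel(tasks) {
  const names = Object.keys(tasks);
  const settled = await Promise.allSettled(names.map(name => tasks[name]()));
  return names.map((name, i) => ({
    workflow: name,
    success: settled[i].status === 'fulfilled',
    result: settled[i].status === 'fulfilled' ? settled[i].value : undefined,
    error: settled[i].status === 'rejected' ? settled[i].reason.message : undefined
  }));
}

// Stand-ins for callSubWorkflow(subWorkflows.fulfillment) and
// callSubWorkflow(subWorkflows.notification); their behavior is made up.
const demoTasks = {
  fulfillment: async () => ({ shipmentId: 'demo-1' }),
  notification: async () => { throw new Error('mail server unavailable'); }
};

runInParallel(demoTasks).then(outcomes => console.log(outcomes));
```

Each entry in the returned array carries its own `success` flag, which lets the main workflow retry or alert per sub-workflow instead of failing the whole order.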
🏗️ Pattern 3: Parallel Processing
Process Items in Parallel:
Batch Processor
Batch Processing Logic:
```javascript
const allItems = $input.item.json.items;
const batchSize = 50;
const maxParallel = 5;

// Split into batches
function createBatches(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push({
      batchId: Math.floor(i / size) + 1,
      items: items.slice(i, i + size),
      startIndex: i,
      endIndex: Math.min(i + size, items.length)
    });
  }
  return batches;
}

// Process with concurrency limit
async function processWithLimit(batches, processor, limit) {
  const results = [];
  const executing = [];

  for (const batch of batches) {
    const promise = processor(batch).then(result => {
      executing.splice(executing.indexOf(promise), 1);
      return result;
    });

    results.push(promise);
    executing.push(promise);

    if (executing.length >= limit) {
      await Promise.race(executing);
    }
  }

  return Promise.all(results);
}

// Example processor
async function processBatch(batch) {
  const results = {
    batchId: batch.batchId,
    processed: 0,
    errors: []
  };

  for (const item of batch.items) {
    try {
      await processItem(item);
      results.processed++;
    } catch (error) {
      results.errors.push({ itemId: item.id, error: error.message });
    }
  }

  return results;
}

const batches = createBatches(allItems, batchSize);
const batchResults = await processWithLimit(batches, processBatch, maxParallel);

// Aggregate results
const summary = {
  totalItems: allItems.length,
  totalBatches: batches.length,
  processedItems: batchResults.reduce((sum, b) => sum + b.processed, 0),
  totalErrors: batchResults.reduce((sum, b) => sum + b.errors.length, 0),
  errorDetails: batchResults.flatMap(b => b.errors)
};

return { summary };
```
Checkpoint
Parallel processing splits items into batches of what size?
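As a quick check of the batch arithmetic, the sketch below mirrors the slicing done by `createBatches` and reports only the size of each batch. `batchSizes` is a hypothetical helper introduced for illustration, and the item count of 230 is an arbitrary example.

```javascript
// Hypothetical helper mirroring the slicing in createBatches:
// every batch is full except possibly the last one.
function batchSizes(totalItems, batchSize) {
  const sizes = [];
  for (let start = 0; start < totalItems; start += batchSize) {
    sizes.push(Math.min(batchSize, totalItems - start));
  }
  return sizes;
}

console.log(batchSizes(230, 50)); // [ 50, 50, 50, 50, 30 ]
```

With `maxParallel = 5`, all five of these batches would be in flight at once; a larger input would start a new batch only as an earlier one finishes.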
📊 Pattern 4: State Machine Workflow
Stateful Order Processing:
| Current State | Event | Next State |
|---|---|---|
| PENDING | pay | PAID |
| PENDING | cancel | CANCELLED |
| PAID | ship | SHIPPED |
| PAID | refund | REFUNDED |
| SHIPPED | deliver | DELIVERED |
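| SHIPPED | return_initiated | RETURN_PENDING |
| DELIVERED | return_initiated | RETURN_PENDING |
| RETURN_PENDING | return_received | RETURNED |
| RETURN_PENDING | return_cancelled | DELIVERED |
| RETURNED | refund | REFUNDED |
Lifecycle: PENDING → PAID → SHIPPED → DELIVERED → (optional) RETURN_PENDING → RETURNED → REFUNDED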
State Machine Implementation:
```javascript
const order = $input.item.json;
const event = $input.item.json.event;

// State machine definition
const stateMachine = {
  PENDING: {
    pay: 'PAID',
    cancel: 'CANCELLED'
  },
  PAID: {
    ship: 'SHIPPED',
    refund: 'REFUNDED'
  },
  SHIPPED: {
    deliver: 'DELIVERED',
    return_initiated: 'RETURN_PENDING'
  },
  DELIVERED: {
    return_initiated: 'RETURN_PENDING'
  },
  RETURN_PENDING: {
    return_received: 'RETURNED',
    return_cancelled: 'DELIVERED'
  },
  RETURNED: {
    refund: 'REFUNDED'
  },
  REFUNDED: {},
  CANCELLED: {}
};

// Transition actions
const transitionActions = {
  'PENDING_TO_PAID': async (order) => {
    await capturePayment(order.paymentId);
    await sendEmail(order.customer.email, 'payment_confirmed');
    await updateInventory(order.items, 'reserve');
  },
  'PAID_TO_SHIPPED': async (order) => {
    const tracking = await createShipment(order);
    await sendEmail(order.customer.email, 'shipped', { tracking });
  },
  'SHIPPED_TO_DELIVERED': async (order) => {
    await sendEmail(order.customer.email, 'delivered');
    await triggerReviewRequest(order, 7); // days delay
  },
  'PAID_TO_REFUNDED': async (order) => {
    await processRefund(order.paymentId, order.total);
    await updateInventory(order.items, 'release');
    await sendEmail(order.customer.email, 'refunded');
  }
};

// Execute transition
function transition(currentState, event) {
  const nextState = stateMachine[currentState]?.[event];

  if (!nextState) {
    throw new Error(`Invalid transition: ${currentState} + ${event}`);
  }

  return nextState;
}

// Process state change
const currentState = order.status;
const newState = transition(currentState, event);
const actionKey = `${currentState}_TO_${newState}`;

// Execute transition action
if (transitionActions[actionKey]) {
  await transitionActions[actionKey](order);
}

return {
  orderId: order.id,
  previousState: currentState,
  event,
  newState,
  transitionedAt: new Date().toISOString()
};
```
Checkpoint
How many states does the order-lifecycle state machine have?
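A transition table like the one in the implementation above can also be queried directly, which is handy for validating input or driving a UI. The sketch below copies that table and adds three hypothetical helpers (`allowedEvents`, `terminalStates`, `replay`) that are not part of the original code.

```javascript
// Same transition table as the state machine implementation above.
const stateMachine = {
  PENDING: { pay: 'PAID', cancel: 'CANCELLED' },
  PAID: { ship: 'SHIPPED', refund: 'REFUNDED' },
  SHIPPED: { deliver: 'DELIVERED', return_initiated: 'RETURN_PENDING' },
  DELIVERED: { return_initiated: 'RETURN_PENDING' },
  RETURN_PENDING: { return_received: 'RETURNED', return_cancelled: 'DELIVERED' },
  RETURNED: { refund: 'REFUNDED' },
  REFUNDED: {},
  CANCELLED: {}
};

// Events that are valid from a given state.
function allowedEvents(state) {
  return Object.keys(stateMachine[state] || {});
}

// Terminal states have no outgoing transitions.
function terminalStates() {
  return Object.keys(stateMachine).filter(s => allowedEvents(s).length === 0);
}

// Replays a list of events from a start state; throws on an invalid transition.
function replay(startState, events) {
  return events.reduce((state, event) => {
    const next = stateMachine[state]?.[event];
    if (!next) throw new Error(`Invalid transition: ${state} + ${event}`);
    return next;
  }, startState);
}

console.log(Object.keys(stateMachine).length);              // 8
console.log(allowedEvents('PAID'));                         // [ 'ship', 'refund' ]
console.log(terminalStates());                              // [ 'REFUNDED', 'CANCELLED' ]
console.log(replay('PENDING', ['pay', 'ship', 'deliver'])); // DELIVERED
```

Replaying a recorded event history this way is one simple check that an order's stored status is actually reachable.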
💡 Pattern 5: Queue-Based Processing
Reliable Queue Processing:
Queue Consumer
Queue Processing Code:
```javascript
const message = $input.item.json;
const maxRetries = 3;

// Track processing state
const processingState = {
  messageId: message.id,
  startedAt: new Date().toISOString(),
  attempts: message.retryCount || 0
};

try {
  // Process the message
  const result = await processMessage(message.body);

  return {
    success: true,
    action: 'delete',
    messageId: message.id,
    result,
    processingTime: Date.now() - new Date(processingState.startedAt).getTime()
  };

} catch (error) {
  processingState.attempts++;

  // Determine action based on retry count
  if (processingState.attempts < maxRetries) {
    // Keep in queue with exponential backoff
    const visibilityTimeout = Math.pow(2, processingState.attempts) * 60; // seconds

    return {
      success: false,
      action: 'retry',
      messageId: message.id,
      retryCount: processingState.attempts,
      visibilityTimeout,
      error: error.message
    };
  } else {
    // Move to Dead Letter Queue
    return {
      success: false,
      action: 'dlq',
      messageId: message.id,
      finalError: error.message,
      attempts: processingState.attempts,
      alertRequired: true,
      dlqPayload: {
        originalMessage: message,
        error: error.message,
        stack: error.stack,
        failedAt: new Date().toISOString()
      }
    };
  }
}

async function processMessage(body) {
  // Your processing logic here
  const data = JSON.parse(body);

  switch (data.type) {
    case 'send_email':
      return await sendEmail(data);
    case 'process_payment':
      return await processPayment(data);
    case 'sync_data':
      return await syncData(data);
    default:
      throw new Error(`Unknown message type: ${data.type}`);
  }
}
```
Checkpoint
After how many retries does queue-based processing move a message to the DLQ?
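The retry-or-DLQ decision in the consumer can be isolated into a small pure function, which makes the thresholds easy to see and test. The sketch below uses the same rule as the code above (retry while the attempt count is below `maxRetries`, with a visibility timeout of `2^attempts` minutes). `decideAfterFailure` is a hypothetical name used only for this illustration.

```javascript
// Hypothetical helper: decides what to do with a message after a failure,
// given how many attempts had already failed before this one.
function decideAfterFailure(previousAttempts, maxRetries = 3) {
  const attempts = previousAttempts + 1;
  if (attempts < maxRetries) {
    // Same formula as the consumer: 2^attempts minutes, expressed in seconds.
    return { action: 'retry', attempts, visibilityTimeout: Math.pow(2, attempts) * 60 };
  }
  // Retry budget exhausted: hand the message to the Dead Letter Queue.
  return { action: 'dlq', attempts };
}

console.log(decideAfterFailure(0)); // { action: 'retry', attempts: 1, visibilityTimeout: 120 }
console.log(decideAfterFailure(1)); // { action: 'retry', attempts: 2, visibilityTimeout: 240 }
console.log(decideAfterFailure(2)); // { action: 'dlq', attempts: 3 }
```

So with `maxRetries = 3`, a message is hidden for 2 minutes, then 4 minutes, and goes to the DLQ on its third failed attempt.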
🏗️ Pattern 6: Event-Driven Architecture
Event Publishing & Subscribing:
| Publishers | Events | Event Bus | Subscribers |
|---|---|---|---|
| Order Service | order.created, order.paid | → Event Hub → | Notification Service |
| User Service | user.registered, user.updated | → Event Hub → | Analytics Service |
| Payment Service | payment.success, payment.failed | → Event Hub → | Inventory Service |
| | | | Audit Log Service |
| | | | Alert Service |
Flow: Publishers emit events → Event Hub routes → Subscribers process
Event Handler:
```javascript
const event = $input.item.json;

// Event routing table
const eventHandlers = {
  'order.created': [
    'inventory.reserve',
    'notification.order_confirmation',
    'analytics.track_order'
  ],
  'order.paid': [
    'fulfillment.process',
    'notification.payment_confirmed',
    'accounting.record_revenue'
  ],
  'order.shipped': [
    'notification.shipping_update',
    'analytics.track_shipment'
  ],
  'order.delivered': [
    'notification.delivery_confirmation',
    'feedback.request_review',
    'loyalty.award_points'
  ],
  'payment.failed': [
    'notification.payment_failed',
    'order.cancel',
    'analytics.track_failure'
  ]
};

// Get handlers for this event
const handlers = eventHandlers[event.type] || [];

if (handlers.length === 0) {
  console.log(`No handlers for event: ${event.type}`);
  return { handled: false, event: event.type };
}

// Fan out to handlers
const dispatchResults = handlers.map(handler => ({
  handler,
  event: {
    id: event.id,
    type: event.type,
    data: event.data,
    metadata: {
      timestamp: event.timestamp,
      source: event.source,
      correlationId: event.correlationId || event.id
    }
  },
  webhookUrl: getWebhookUrl(handler)
}));

function getWebhookUrl(handler) {
  const [service, action] = handler.split('.');
  return `${$env.SERVICE_BASE_URL}/${service}/webhook/${action}`;
}

return dispatchResults;
```
Checkpoint
What format does the event routing table use in the event-driven architecture?
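To show how a `service.action` handler name turns into a webhook target, here is a minimal, standalone sketch of the fan-out step. `buildDispatches` is a hypothetical helper, and `https://services.example.com` is a placeholder standing in for `$env.SERVICE_BASE_URL`.

```javascript
// Builds one dispatch target per handler registered for the event's type.
// baseUrl stands in for $env.SERVICE_BASE_URL; the value used below is a placeholder.
function buildDispatches(event, routingTable, baseUrl) {
  const handlers = routingTable[event.type] || [];
  return handlers.map(handler => {
    // Handler names follow the "service.action" convention of the routing table.
    const [service, action] = handler.split('.');
    return {
      handler,
      webhookUrl: `${baseUrl}/${service}/webhook/${action}`,
      correlationId: event.correlationId || event.id
    };
  });
}

const routingTable = {
  'order.shipped': ['notification.shipping_update', 'analytics.track_shipment']
};

const dispatches = buildDispatches(
  { id: 'evt-1', type: 'order.shipped' },
  routingTable,
  'https://services.example.com'
);

console.log(dispatches.map(d => d.webhookUrl));
// [ 'https://services.example.com/notification/webhook/shipping_update',
//   'https://services.example.com/analytics/webhook/track_shipment' ]
```

Falling back to the event's own `id` as the `correlationId` means every downstream call can still be traced to the originating event.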
📈 Pattern 7: Complete Example - E-commerce Order Pipeline
Full Order Processing:
E-commerce Order Pipeline
Main Orchestrator Code:
```javascript
const order = $input.item.json;
const pipeline = {
  orderId: order.id,
  startedAt: new Date().toISOString(),
  stages: [],
  currentStage: null,
  status: 'processing'
};

// Stage 1: Validation
pipeline.currentStage = 'validation';
const validation = await executeStage('validation', {
  validateOrderData: () => validateOrder(order),
  checkInventory: () => checkInventoryAvailability(order.items),
  verifyCustomer: () => verifyCustomerAccount(order.customerId),
  calculateTotals: () => calculateOrderTotals(order)
});

if (!validation.success) {
  return finalizePipeline(pipeline, 'failed', validation.errors);
}

pipeline.stages.push({ stage: 'validation', result: validation });

// Stage 2: Payment
pipeline.currentStage = 'payment';
const payment = await executeWithRetry(
  // executeStage stores each task's result under the task name,
  // so the totals are available as validation.calculateTotals.
  () => processPayment(order.paymentMethod, validation.calculateTotals),
  { maxRetries: 3, backoff: 'exponential' }
);

if (!payment.success) {
  await rollback(pipeline, 'payment');
  return finalizePipeline(pipeline, 'payment_failed', payment.error);
}

pipeline.stages.push({ stage: 'payment', result: payment });

// Stage 3: Fulfillment
pipeline.currentStage = 'fulfillment';
const fulfillment = order.isDigital
  ? await deliverDigitalProduct(order)
  : await createPhysicalShipment(order);

pipeline.stages.push({ stage: 'fulfillment', result: fulfillment });

// Stage 4: Post-order (parallel)
pipeline.currentStage = 'post_order';
const postOrderTasks = await Promise.allSettled([
  sendConfirmationEmail(order, fulfillment),
  updateInventoryRecords(order.items),
  recordOrderAnalytics(order, pipeline),
  updateCRMContact(order.customerId, order),
  awardLoyaltyPoints(order.customerId, order.total)
]);

pipeline.stages.push({
  stage: 'post_order',
  results: postOrderTasks.map((t, i) => ({
    task: ['email', 'inventory', 'analytics', 'crm', 'loyalty'][i],
    status: t.status,
    result: t.status === 'fulfilled' ? t.value : t.reason
  }))
});

return finalizePipeline(pipeline, 'completed');

// Helper functions
async function executeStage(name, tasks) {
  const results = {};
  for (const [taskName, taskFn] of Object.entries(tasks)) {
    try {
      results[taskName] = await taskFn();
    } catch (error) {
      return { success: false, errors: [{ task: taskName, error: error.message }] };
    }
  }
  return { success: true, ...results };
}

function finalizePipeline(pipeline, status, error = null) {
  return {
    ...pipeline,
    status,
    error,
    completedAt: new Date().toISOString(),
    duration: Date.now() - new Date(pipeline.startedAt).getTime()
  };
}
```
Checkpoint
How many main stages does the e-commerce order pipeline have?
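The orchestrator calls `executeWithRetry` without defining it. One possible shape for that helper is sketched below, assuming it resolves to `{ success, result }` or `{ success, error }`, as the orchestrator's checks on `payment.success` and `payment.error` suggest. The `baseDelayMs` option, the `attempts` field, and the `flaky` demo operation are additions made for this illustration.

```javascript
// Hypothetical implementation of the executeWithRetry helper used by the orchestrator.
// Resolves to { success: true, result } or { success: false, error } instead of throwing.
async function executeWithRetry(operation, options = {}) {
  const { maxRetries = 3, backoff = 'exponential', baseDelayMs = 1000 } = options;
  let lastError = null;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const result = await operation();
      return { success: true, result, attempts: attempt };
    } catch (error) {
      lastError = error;
      if (attempt < maxRetries) {
        // Exponential: base, 2x base, 4x base, ...; any other value means a fixed delay.
        const delay = backoff === 'exponential'
          ? baseDelayMs * Math.pow(2, attempt - 1)
          : baseDelayMs;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }

  return {
    success: false,
    error: lastError ? lastError.message : 'no attempts were made',
    attempts: maxRetries
  };
}

// Demo: an operation that fails twice, then succeeds on the third call.
let calls = 0;
const flaky = async () => {
  calls++;
  if (calls < 3) throw new Error('temporary failure');
  return 'charged';
};

const demoRun = executeWithRetry(flaky, { maxRetries: 3, baseDelayMs: 10 });
demoRun.then(outcome => console.log(outcome));
// { success: true, result: 'charged', attempts: 3 }
```

Returning a result object rather than throwing keeps the orchestrator's control flow linear: each stage checks `success` and decides whether to roll back.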
📊 Monitoring & Observability
Key Metrics:
- Execution time per workflow
- Success/failure rates
- Queue depths
- Error frequency by type
- Throughput (executions/hour)
Alerting Rules:
- Error rate > 5% → Warning
- Error rate > 10% → Critical
- Queue depth > 1000 → Warning
- Avg execution > 30s → Investigate
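The alerting rules above translate directly into a small evaluation function. The sketch below is one way to encode them, assuming the error rate is expressed as a fraction (0.05 for 5%) and that a critical error-rate alert replaces the warning rather than adding to it. The metric field names (`errorRate`, `queueDepth`, `avgExecutionSeconds`) are assumptions made for this example.

```javascript
// Evaluates the alerting rules listed above against a metrics snapshot.
// Field names and the fraction-based error rate are assumptions for this sketch.
function evaluateAlerts(metrics) {
  const alerts = [];

  if (metrics.errorRate > 0.10) {
    alerts.push({ rule: 'error_rate', level: 'critical' });
  } else if (metrics.errorRate > 0.05) {
    alerts.push({ rule: 'error_rate', level: 'warning' });
  }

  if (metrics.queueDepth > 1000) {
    alerts.push({ rule: 'queue_depth', level: 'warning' });
  }

  if (metrics.avgExecutionSeconds > 30) {
    alerts.push({ rule: 'avg_execution', level: 'investigate' });
  }

  return alerts;
}

console.log(evaluateAlerts({ errorRate: 0.07, queueDepth: 1500, avgExecutionSeconds: 12 }));
// [ { rule: 'error_rate', level: 'warning' }, { rule: 'queue_depth', level: 'warning' } ]
```

A function like this could run in a scheduled workflow that reads recent execution statistics and forwards any non-empty result to the team's alert channel.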
Checkpoint
What are the key monitoring metrics?
📋 Best Practices Summary
Architecture:
- ✅ Break into sub-workflows
- ✅ Use queues for reliability
- ✅ Implement idempotency
- ✅ Log extensively
- ✅ Monitor everything
Error Handling:
- ✅ Always have error branches
- ✅ Implement retries with backoff
- ✅ Use Dead Letter Queues
- ✅ Alert on persistent failures
Performance:
- ✅ Process in batches
- ✅ Use parallel where possible
- ✅ Set appropriate timeouts
- ✅ Cache frequently used data
Checkpoint
Why do production workflows need idempotency?
📝 Hands-On Exercise
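Retries and queue redelivery mean the same message can reach a step more than once, so side effects such as charging a card need protection against duplicates. A minimal sketch of an idempotency-key check follows. The in-memory `Map` is only a stand-in for a durable store such as a database table, and `processOnce`, `chargeOrder`, and the key format are hypothetical.

```javascript
// In-memory stand-in for a durable store (database table, key-value store, ...).
const processedKeys = new Map();

// Runs handler at most once per idempotency key;
// repeated deliveries get the stored result back instead of re-running the side effect.
function processOnce(key, handler) {
  if (processedKeys.has(key)) {
    return { duplicate: true, result: processedKeys.get(key) };
  }
  const result = handler();
  processedKeys.set(key, result);
  return { duplicate: false, result };
}

// Demo side effect: counts how many times the "charge" actually ran.
let charges = 0;
const chargeOrder = () => {
  charges++;
  return { charged: true };
};

console.log(processOnce('order-42:payment', chargeOrder)); // { duplicate: false, result: { charged: true } }
console.log(processOnce('order-42:payment', chargeOrder)); // { duplicate: true, result: { charged: true } }
console.log(charges); // 1
```

The second delivery is recognized by its key and returns the earlier result, so the charge runs only once even though the message was processed twice.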
Build a complete pipeline:
- Design multi-stage workflow with state machine
- Implement error handling with retries & DLQ
- Add parallel processing for performance
- Create sub-workflows for modularity
- Set up monitoring & alerting
You're now an n8n expert! 🎓
Checkpoint
Have you completed the final challenge?
🌟 Course Summary
Module 1: Personal Productivity
- Email, Calendar, Notes automation
- Personal dashboards
Module 2: Team Workflows
- Notifications, Documents, Meetings
- Onboarding processes
Module 3: Content & Social
- Content curation & repurposing
- Social media automation
- Lead generation
Module 4: Advanced Patterns
- Error handling & recovery
- Sub-workflows & modularization
- Parallel processing
- State machines & queues
- Event-driven architecture
You're ready to automate anything! 🚀
Checkpoint
How many main modules does the n8n Productivity course include?
🚀 Next Steps
- 📚 Explore n8n community templates
- 🔗 Join n8n Discord/Forum
- 🛠️ Build real projects for your team
- 📖 Check out n8n Advanced Agents course
- 🎯 Practice, iterate, improve!
Checkpoint
Which project do you plan to apply n8n to next?
Congratulations on completing the n8n Productivity course! 🎉
You have mastered:
- ⚡ Email, Calendar, Notes automation
- 📊 Personal dashboards and team workflows
- 📱 Content curation and social media automation
- 🏗️ Advanced patterns: error handling, sub-workflows, parallel processing
Apply what you've learned and keep exploring the n8n community!
