MinAI - Về trang chủ
Dự án
12/1350 phút
Đang tải...

Complex Multi-Step Workflows

Master advanced n8n patterns - Error handling, sub-workflows, parallel processing, và real-world complex automation

🚀 Complex Multi-Step Workflows

Complex Workflows

0

🎯 Mục tiêu bài học

TB5 min

Sau bài học này, bạn sẽ:

✅ Master error handling và recovery patterns

✅ Xây dựng sub-workflows và modularization

✅ Implement parallel processing và batch operations

✅ Thiết kế state machines, queues, và event-driven architecture

Combine everything you've learned into powerful, production-ready automations. Bài này covers advanced patterns cho complex, reliable workflows.

1

🏗️ Advanced Workflow Architecture

TB5 min

Workflow Complexity Levels:

📊Workflow Complexity Progression
1️⃣Level 1: LINEAR
➡️Trigger → Action → Action → End
2️⃣Level 2: BRANCHING
Yes → Actions
No → Other Actions
3️⃣Level 3: PARALLEL
🅰️Branch A
🅱️Branch B → Merge → Continue
🅲Branch C
4️⃣Level 4: SUB-WORKFLOWS
🔗Main → Sub-workflow 1 → Sub-workflow 2 → End
5️⃣Level 5: ORCHESTRATED
👷Worker Pool
🔄Error Recovery
📋State Management
📡Monitoring

Checkpoint

Có mấy levels of workflow complexity?

2

⚡ Pattern 1: Error Handling & Recovery

TB5 min

Robust Error Management:

Workflow: Error-Resilient Processing
StageComponentDetails
1. TriggerWebhook / ScheduleKhởi động workflow
2. TRY BlockMain LogicStep 1 → Step 2 → Step 3
3a. SuccessContinueTiếp tục xử lý bình thường
3b. ErrorError HandlerLog Error → Retry Logic → Fallback Action → Alert Team

Flow: Trigger → TRY (Main Logic) → Success: Continue / Error: Error Handler (Log → Retry → Fallback → Alert)

Error Handling Code:

JavaScript
1// Wrap operations in try-catch
2async function safeExecute(operation, context) {
3 const maxRetries = 3;
4 const retryDelay = 1000; // ms
5
6 for (let attempt = 1; attempt <= maxRetries; attempt++) {
7 try {
8 const result = await operation();
9 return { success: true, result, attempt };
10 } catch (error) {
11 console.log(`Attempt ${attempt} failed: ${error.message}`);
12
13 // Check if retryable
14 const retryable = isRetryableError(error);
15
16 if (retryable && attempt < maxRetries) {
17 // Exponential backoff
18 const delay = retryDelay * Math.pow(2, attempt - 1);
19 await sleep(delay);
20 continue;
21 }
22
23 // Final failure
24 return {
25 success: false,
26 error: {
27 message: error.message,
28 code: error.code,
29 attempt,
30 context,
31 timestamp: new Date().toISOString()
32 }
33 };
34 }
35 }
36}
37
38function isRetryableError(error) {
39 const retryableCodes = [429, 500, 502, 503, 504]; // Rate limit, server errors
40 const retryableMessages = ['ECONNRESET', 'ETIMEDOUT', 'socket hang up'];
41
42 if (retryableCodes.includes(error.statusCode)) return true;
43 if (retryableMessages.some(m => error.message?.includes(m))) return true;
44
45 return false;
46}
47
48function sleep(ms) {
49 return new Promise(resolve => setTimeout(resolve, ms));
50}
51
52// Usage
53const result = await safeExecute(
54 () => callExternalAPI(data),
55 { operation: 'api_call', itemId: item.id }
56);
57
58return result;

Error Notification:

JavaScript
1const errors = $input.all().filter(item => !item.json.success);
2
3if (errors.length === 0) {
4 return []; // No errors to report
5}
6
7// Group errors by type
8const groupedErrors = errors.reduce((acc, err) => {
9 const type = err.json.error?.code || 'unknown';
10 if (!acc[type]) acc[type] = [];
11 acc[type].push(err.json);
12 return acc;
13}, {});
14
15// Format error report
16const report = {
17 workflow: 'Data Processing Pipeline',
18 timestamp: new Date().toISOString(),
19 totalErrors: errors.length,
20 errorTypes: Object.entries(groupedErrors).map(([type, errs]) => ({
21 type,
22 count: errs.length,
23 samples: errs.slice(0, 3).map(e => ({
24 message: e.error?.message,
25 context: e.error?.context
26 }))
27 })),
28
29 // Slack format
30 slackMessage: {
31 text: `⚠️ Workflow Errors: ${errors.length} failures`,
32 blocks: [
33 {
34 type: 'header',
35 text: { type: 'plain_text', text: '⚠️ Workflow Error Report' }
36 },
37 {
38 type: 'section',
39 text: {
40 type: 'mrkdwn',
41 text: `*Workflow:* Data Processing Pipeline\n*Time:* ${new Date().toLocaleString()}\n*Total Errors:* ${errors.length}`
42 }
43 },
44 ...Object.entries(groupedErrors).map(([type, errs]) => ({
45 type: 'section',
46 text: {
47 type: 'mrkdwn',
48 text: `*${type}:* ${errs.length} error(s)\n\`\`\`${errs[0]?.error?.message}\`\`\``
49 }
50 }))
51 ]
52 }
53};
54
55return { report };

Checkpoint

Error handling sử dụng exponential backoff như thế nào?

3

🔧 Pattern 2: Sub-Workflows & Modularization

TB5 min

Breaking Down Complex Workflows:

Main Workflow: Order Processing

🔔New Order Webhook
Validate Order → Sub: Order Validation
💳Process Payment → Sub: Payment Processing
📦Fulfill Order → Sub: Fulfillment
💻Digital → Sub: Digital Delivery
🚚Physical → Sub: Shipping
📧Send Notifications → Sub: Notification Service
📊Update Analytics → Sub: Analytics Tracker

Sub-Workflow Caller:

JavaScript
1// Main workflow calls sub-workflow via webhook
2const order = $input.item.json;
3
4// Prepare sub-workflow calls
5const subWorkflows = {
6 validation: {
7 url: $env.VALIDATION_WORKFLOW_URL,
8 data: { orderId: order.id, items: order.items }
9 },
10 payment: {
11 url: $env.PAYMENT_WORKFLOW_URL,
12 data: {
13 orderId: order.id,
14 amount: order.total,
15 paymentMethod: order.paymentMethod
16 }
17 },
18 fulfillment: {
19 url: $env.FULFILLMENT_WORKFLOW_URL,
20 data: {
21 orderId: order.id,
22 items: order.items,
23 shippingAddress: order.shippingAddress
24 }
25 },
26 notification: {
27 url: $env.NOTIFICATION_WORKFLOW_URL,
28 data: {
29 orderId: order.id,
30 customer: order.customer,
31 type: 'order_confirmation'
32 }
33 }
34};
35
36// Execute validation first (blocking)
37const validationResult = await callSubWorkflow(subWorkflows.validation);
38
39if (!validationResult.success) {
40 return {
41 success: false,
42 stage: 'validation',
43 error: validationResult.error
44 };
45}
46
47// Continue with payment
48const paymentResult = await callSubWorkflow(subWorkflows.payment);
49
50if (!paymentResult.success) {
51 // Trigger compensation (rollback)
52 await callSubWorkflow({
53 url: $env.ROLLBACK_WORKFLOW_URL,
54 data: { orderId: order.id, stage: 'payment' }
55 });
56
57 return {
58 success: false,
59 stage: 'payment',
60 error: paymentResult.error
61 };
62}
63
64// Fulfillment and notification can run in parallel
65return {
66 orderId: order.id,
67 validationResult,
68 paymentResult,
69 pendingWorkflows: ['fulfillment', 'notification']
70};
71
72async function callSubWorkflow({ url, data }) {
73 try {
74 const response = await fetch(url, {
75 method: 'POST',
76 headers: { 'Content-Type': 'application/json' },
77 body: JSON.stringify(data)
78 });
79 return await response.json();
80 } catch (error) {
81 return { success: false, error: error.message };
82 }
83}

Checkpoint

Sub-workflows giúp modularize complex workflows bằng cách nào?

4

🏗️ Pattern 3: Parallel Processing

TB5 min

Process Items in Parallel:

Batch Processor

📦Trigger: Large Dataset
✂️Split into Batches
⚙️Batch 1
⚙️Batch 2
⚙️Batch 3
⚙️Batch 4
🔀Merge Results
📊Generate Report

Batch Processing Logic:

JavaScript
1const allItems = $input.item.json.items;
2const batchSize = 50;
3const maxParallel = 5;
4
5// Split into batches
6function createBatches(items, size) {
7 const batches = [];
8 for (let i = 0; i < items.length; i += size) {
9 batches.push({
10 batchId: Math.floor(i / size) + 1,
11 items: items.slice(i, i + size),
12 startIndex: i,
13 endIndex: Math.min(i + size, items.length)
14 });
15 }
16 return batches;
17}
18
19// Process with concurrency limit
20async function processWithLimit(batches, processor, limit) {
21 const results = [];
22 const executing = [];
23
24 for (const batch of batches) {
25 const promise = processor(batch).then(result => {
26 executing.splice(executing.indexOf(promise), 1);
27 return result;
28 });
29
30 results.push(promise);
31 executing.push(promise);
32
33 if (executing.length >= limit) {
34 await Promise.race(executing);
35 }
36 }
37
38 return Promise.all(results);
39}
40
41// Example processor
42async function processBatch(batch) {
43 const results = {
44 batchId: batch.batchId,
45 processed: 0,
46 errors: []
47 };
48
49 for (const item of batch.items) {
50 try {
51 await processItem(item);
52 results.processed++;
53 } catch (error) {
54 results.errors.push({ itemId: item.id, error: error.message });
55 }
56 }
57
58 return results;
59}
60
61const batches = createBatches(allItems, batchSize);
62const batchResults = await processWithLimit(batches, processBatch, maxParallel);
63
64// Aggregate results
65const summary = {
66 totalItems: allItems.length,
67 totalBatches: batches.length,
68 processedItems: batchResults.reduce((sum, b) => sum + b.processed, 0),
69 totalErrors: batchResults.reduce((sum, b) => sum + b.errors.length, 0),
70 errorDetails: batchResults.flatMap(b => b.errors)
71};
72
73return { summary };

Checkpoint

Parallel processing chia items thành batches với size bao nhiêu?

5

📊 Pattern 4: State Machine Workflow

TB5 min

Stateful Order Processing:

State Machine: Order Lifecycle
Current StateEventNext State
PENDINGpayPAID
PENDINGcancelCANCELLED
PAIDshipSHIPPED
PAIDrefundREFUNDED
SHIPPEDdeliverDELIVERED
DELIVEREDreturnRETURNED
RETURNED(re-enter)PENDING

Lifecycle: PENDING → PAID → SHIPPED → DELIVERED → (optional) RETURNED → PENDING

State Machine Implementation:

JavaScript
1const order = $input.item.json;
2const event = $input.item.json.event;
3
4// State machine definition
5const stateMachine = {
6 PENDING: {
7 pay: 'PAID',
8 cancel: 'CANCELLED'
9 },
10 PAID: {
11 ship: 'SHIPPED',
12 refund: 'REFUNDED'
13 },
14 SHIPPED: {
15 deliver: 'DELIVERED',
16 return_initiated: 'RETURN_PENDING'
17 },
18 DELIVERED: {
19 return_initiated: 'RETURN_PENDING'
20 },
21 RETURN_PENDING: {
22 return_received: 'RETURNED',
23 return_cancelled: 'DELIVERED'
24 },
25 RETURNED: {
26 refund: 'REFUNDED'
27 },
28 REFUNDED: {},
29 CANCELLED: {}
30};
31
32// Transition actions
33const transitionActions = {
34 'PENDING_TO_PAID': async (order) => {
35 await capturePayment(order.paymentId);
36 await sendEmail(order.customer.email, 'payment_confirmed');
37 await updateInventory(order.items, 'reserve');
38 },
39 'PAID_TO_SHIPPED': async (order) => {
40 const tracking = await createShipment(order);
41 await sendEmail(order.customer.email, 'shipped', { tracking });
42 },
43 'SHIPPED_TO_DELIVERED': async (order) => {
44 await sendEmail(order.customer.email, 'delivered');
45 await triggerReviewRequest(order, 7); // days delay
46 },
47 'PAID_TO_REFUNDED': async (order) => {
48 await processRefund(order.paymentId, order.total);
49 await updateInventory(order.items, 'release');
50 await sendEmail(order.customer.email, 'refunded');
51 }
52};
53
54// Execute transition
55function transition(currentState, event) {
56 const nextState = stateMachine[currentState]?.[event];
57
58 if (!nextState) {
59 throw new Error(`Invalid transition: ${currentState} + ${event}`);
60 }
61
62 return nextState;
63}
64
65// Process state change
66const currentState = order.status;
67const newState = transition(currentState, event);
68const actionKey = `${currentState}_TO_${newState}`;
69
70// Execute transition action
71if (transitionActions[actionKey]) {
72 await transitionActions[actionKey](order);
73}
74
75return {
76 orderId: order.id,
77 previousState: currentState,
78 event,
79 newState,
80 transitionedAt: new Date().toISOString()
81};

Checkpoint

State machine pattern cho order lifecycle có mấy states?

6

💡 Pattern 5: Queue-Based Processing

TB5 min

Reliable Queue Processing:

Queue Consumer

🔄Poll Queue Every 30s
📨Get Messages (batch of 10)
⚙️Process Each Message
Delete from Queue & Update
🔁Increment Retry Count
Wait & Repeat
Keep in Queue (backoff)
🚨Move to DLQ & Alert Team

Queue Processing Code:

JavaScript
1const message = $input.item.json;
2const maxRetries = 3;
3
4// Track processing state
5const processingState = {
6 messageId: message.id,
7 startedAt: new Date().toISOString(),
8 attempts: message.retryCount || 0
9};
10
11try {
12 // Process the message
13 const result = await processMessage(message.body);
14
15 return {
16 success: true,
17 action: 'delete',
18 messageId: message.id,
19 result,
20 processingTime: Date.now() - new Date(processingState.startedAt).getTime()
21 };
22
23} catch (error) {
24 processingState.attempts++;
25
26 // Determine action based on retry count
27 if (processingState.attempts < maxRetries) {
28 // Keep in queue with exponential backoff
29 const visibilityTimeout = Math.pow(2, processingState.attempts) * 60; // seconds
30
31 return {
32 success: false,
33 action: 'retry',
34 messageId: message.id,
35 retryCount: processingState.attempts,
36 visibilityTimeout,
37 error: error.message
38 };
39 } else {
40 // Move to Dead Letter Queue
41 return {
42 success: false,
43 action: 'dlq',
44 messageId: message.id,
45 finalError: error.message,
46 attempts: processingState.attempts,
47 alertRequired: true,
48 dlqPayload: {
49 originalMessage: message,
50 error: error.message,
51 stack: error.stack,
52 failedAt: new Date().toISOString()
53 }
54 };
55 }
56}
57
58async function processMessage(body) {
59 // Your processing logic here
60 const data = JSON.parse(body);
61
62 switch (data.type) {
63 case 'send_email':
64 return await sendEmail(data);
65 case 'process_payment':
66 return await processPayment(data);
67 case 'sync_data':
68 return await syncData(data);
69 default:
70 throw new Error(`Unknown message type: ${data.type}`);
71 }
72}

Checkpoint

Queue-based processing move messages tới DLQ sau bao nhiêu retries?

7

🏗️ Pattern 6: Event-Driven Architecture

TB5 min

Event Publishing & Subscribing:

Event-Driven System Architecture
PublishersEventsEvent BusSubscribers
Order Serviceorder.created, order.paid→ Event Hub →Notification Service
User Serviceuser.registered, user.updated→ Event Hub →Analytics Service
Payment Servicepayment.success, payment.failed→ Event Hub →Inventory Service
Audit Log Service
Alert Service

Flow: Publishers emit events → Event Hub routes → Subscribers process

Event Handler:

JavaScript
1const event = $input.item.json;
2
3// Event routing table
4const eventHandlers = {
5 'order.created': [
6 'inventory.reserve',
7 'notification.order_confirmation',
8 'analytics.track_order'
9 ],
10 'order.paid': [
11 'fulfillment.process',
12 'notification.payment_confirmed',
13 'accounting.record_revenue'
14 ],
15 'order.shipped': [
16 'notification.shipping_update',
17 'analytics.track_shipment'
18 ],
19 'order.delivered': [
20 'notification.delivery_confirmation',
21 'feedback.request_review',
22 'loyalty.award_points'
23 ],
24 'payment.failed': [
25 'notification.payment_failed',
26 'order.cancel',
27 'analytics.track_failure'
28 ]
29};
30
31// Get handlers for this event
32const handlers = eventHandlers[event.type] || [];
33
34if (handlers.length === 0) {
35 console.log(`No handlers for event: ${event.type}`);
36 return { handled: false, event: event.type };
37}
38
39// Fan out to handlers
40const dispatchResults = handlers.map(handler => ({
41 handler,
42 event: {
43 id: event.id,
44 type: event.type,
45 data: event.data,
46 metadata: {
47 timestamp: event.timestamp,
48 source: event.source,
49 correlationId: event.correlationId || event.id
50 }
51 },
52 webhookUrl: getWebhookUrl(handler)
53}));
54
55function getWebhookUrl(handler) {
56 const [service, action] = handler.split('.');
57 return `${$env.SERVICE_BASE_URL}/${service}/webhook/${action}`;
58}
59
60return dispatchResults;

Checkpoint

Event-driven architecture sử dụng event routing table với format nào?

8

📈 Pattern 7: Complete Example - E-commerce Order Pipeline

TB5 min

Full Order Processing:

E-commerce Order Pipeline

🛒Webhook: New Order
✔️Validation Phase
Reject Order & Notify
💳Payment Phase
🔁Retry x3 & Alert
📧Fulfillment: Digital → Instant
📦Fulfillment: Physical → Ship
Post-Order (Parallel)
Complete: Update Order Status

Main Orchestrator Code:

JavaScript
1const order = $input.item.json;
2const pipeline = {
3 orderId: order.id,
4 startedAt: new Date().toISOString(),
5 stages: [],
6 currentStage: null,
7 status: 'processing'
8};
9
10// Stage 1: Validation
11pipeline.currentStage = 'validation';
12const validation = await executeStage('validation', {
13 validateOrderData: () => validateOrder(order),
14 checkInventory: () => checkInventoryAvailability(order.items),
15 verifyCustomer: () => verifyCustomerAccount(order.customerId),
16 calculateTotals: () => calculateOrderTotals(order)
17});
18
19if (!validation.success) {
20 return finalizePipeline(pipeline, 'failed', validation.errors);
21}
22
23pipeline.stages.push({ stage: 'validation', result: validation });
24
25// Stage 2: Payment
26pipeline.currentStage = 'payment';
27const payment = await executeWithRetry(
28 () => processPayment(order.paymentMethod, validation.totals),
29 { maxRetries: 3, backoff: 'exponential' }
30);
31
32if (!payment.success) {
33 await rollback(pipeline, 'payment');
34 return finalizePipeline(pipeline, 'payment_failed', payment.error);
35}
36
37pipeline.stages.push({ stage: 'payment', result: payment });
38
39// Stage 3: Fulfillment
40pipeline.currentStage = 'fulfillment';
41const fulfillment = order.isDigital
42 ? await deliverDigitalProduct(order)
43 : await createPhysicalShipment(order);
44
45pipeline.stages.push({ stage: 'fulfillment', result: fulfillment });
46
47// Stage 4: Post-order (parallel)
48pipeline.currentStage = 'post_order';
49const postOrderTasks = await Promise.allSettled([
50 sendConfirmationEmail(order, fulfillment),
51 updateInventoryRecords(order.items),
52 recordOrderAnalytics(order, pipeline),
53 updateCRMContact(order.customerId, order),
54 awardLoyaltyPoints(order.customerId, order.total)
55]);
56
57pipeline.stages.push({
58 stage: 'post_order',
59 results: postOrderTasks.map((t, i) => ({
60 task: ['email', 'inventory', 'analytics', 'crm', 'loyalty'][i],
61 status: t.status,
62 result: t.status === 'fulfilled' ? t.value : t.reason
63 }))
64});
65
66return finalizePipeline(pipeline, 'completed');
67
68// Helper functions
69async function executeStage(name, tasks) {
70 const results = {};
71 for (const [taskName, taskFn] of Object.entries(tasks)) {
72 try {
73 results[taskName] = await taskFn();
74 } catch (error) {
75 return { success: false, errors: [{ task: taskName, error: error.message }] };
76 }
77 }
78 return { success: true, ...results };
79}
80
81function finalizePipeline(pipeline, status, error = null) {
82 return {
83 ...pipeline,
84 status,
85 error,
86 completedAt: new Date().toISOString(),
87 duration: Date.now() - new Date(pipeline.startedAt).getTime()
88 };
89}

Checkpoint

E-commerce order pipeline bao gồm mấy stages chính?

9

📊 Monitoring & Observability

TB5 min
Production Monitoring

Key Metrics:

  • Execution time per workflow
  • Success/failure rates
  • Queue depths
  • Error frequency by type
  • Throughput (executions/hour)

Alerting Rules:

  • Error rate > 5% → Warning
  • Error rate > 10% → Critical
  • Queue depth > 1000 → Warning
  • Avg execution > 30s → Investigate

Checkpoint

Key monitoring metrics bao gồm những gì?

10

📋 Best Practices Summary

TB5 min
Complex Workflow Guidelines

Architecture:

  • ✅ Break into sub-workflows
  • ✅ Use queues for reliability
  • ✅ Implement idempotency
  • ✅ Log extensively
  • ✅ Monitor everything

Error Handling:

  • ✅ Always have error branches
  • ✅ Implement retries with backoff
  • ✅ Use Dead Letter Queues
  • ✅ Alert on persistent failures

Performance:

  • ✅ Process in batches
  • ✅ Use parallel where possible
  • ✅ Set appropriate timeouts
  • ✅ Cache frequently used data

Checkpoint

Tại sao cần implement idempotency cho production workflows?

11

📝 Bài Tập Thực Hành

TB5 min
Final Challenge

Build a complete pipeline:

  1. Design multi-stage workflow with state machine
  2. Implement error handling with retries & DLQ
  3. Add parallel processing for performance
  4. Create sub-workflows for modularity
  5. Set up monitoring & alerting

You're now an n8n expert! 🎓

Checkpoint

Bạn đã hoàn thành final challenge chưa?

12

🌟 Course Summary

TB5 min
What You've Learned

Module 1: Personal Productivity

  • Email, Calendar, Notes automation
  • Personal dashboards

Module 2: Team Workflows

  • Notifications, Documents, Meetings
  • Onboarding processes

Module 3: Content & Social

  • Content curation & repurposing
  • Social media automation
  • Lead generation

Module 4: Advanced Patterns

  • Error handling & recovery
  • Sub-workflows & modularization
  • Parallel processing
  • State machines & queues
  • Event-driven architecture

You're ready to automate anything! 🚀

Checkpoint

Khóa học n8n Productivity bao gồm mấy module chính?

13

🚀 Next Steps

TB5 min
Continue Your Journey
  • 📚 Explore n8n community templates
  • 🔗 Join n8n Discord/Forum
  • 🛠️ Build real projects for your team
  • 📖 Check out n8n Advanced Agents course
  • 🎯 Practice, iterate, improve!

Checkpoint

Bạn dự định áp dụng n8n vào project nào tiếp theo?

🎉 Chúc mừng!

Chúc mừng bạn đã hoàn thành khóa học n8n Productivity! 🎉

Bạn đã master:

  • ⚡ Email, Calendar, Notes automation
  • 📊 Personal dashboards và team workflows
  • 📱 Content curation và social media automation
  • 🏗️ Advanced patterns: error handling, sub-workflows, parallel processing

Hãy áp dụng những gì đã học và tiếp tục explore n8n community!