Theory
Lesson 5/15

ETL Pipelines with Python

Extract, Transform, Load pipelines - Architecture, tools, and best practices

ETL Data Pipeline

1. What is ETL?

ETL = Extract, Transform, Load

Text
┌──────────┐      ┌─────────────┐      ┌──────────┐
│ Extract  │ -->  │  Transform  │ -->  │   Load   │
│   (E)    │      │     (T)     │      │   (L)    │
└──────────┘      └─────────────┘      └──────────┘
      │                  │                   │
      v                  v                   v
- Databases         - Clean             - Data Warehouse
- APIs              - Aggregate         - Data Lake
- Files             - Join              - Database
- Streams           - Enrich            - Files
ETL vs ELT
  • ETL: Transform before loading (the traditional approach)
  • ELT: Load raw data first, then transform inside the warehouse (common with modern data warehouses); a minimal sketch follows below
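
To make the contrast concrete, here is a minimal ELT sketch, assuming a PostgreSQL warehouse; the table and column names are placeholders. The raw extract is loaded untouched into a staging table, and the transformation then runs as SQL inside the warehouse.

Python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@host:5432/dwh')  # placeholder DSN

# Load: dump the raw extract as-is into a staging table
raw = pd.read_csv('data/sales_raw.csv')
raw.to_sql('stg_sales_raw', engine, if_exists='replace', index=False)

# Transform: build the curated table with SQL inside the warehouse
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS fact_sales"))
    conn.execute(text("""
        CREATE TABLE fact_sales AS
        SELECT order_id,
               customer_id,
               order_date::date AS order_date,
               quantity * unit_price AS revenue
        FROM stg_sales_raw
        WHERE quantity > 0
    """))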

2. Extract (Extracting Data)

2.1 From Databases

Python
import pandas as pd
from sqlalchemy import create_engine, text

# PostgreSQL
pg_engine = create_engine('postgresql://user:password@host:5432/dbname')
df = pd.read_sql(
    text('SELECT * FROM sales WHERE date >= :start_date'),
    pg_engine,
    params={'start_date': '2024-01-01'}
)

# MySQL
mysql_engine = create_engine('mysql+pymysql://user:password@host:3306/dbname')

# SQL Server
mssql_engine = create_engine('mssql+pyodbc://user:password@server/dbname?driver=ODBC+Driver+17+for+SQL+Server')

# With chunking for large tables
chunks = []
for chunk in pd.read_sql('SELECT * FROM large_table', pg_engine, chunksize=100000):
    chunks.append(chunk)
df = pd.concat(chunks, ignore_index=True)

2.2 From APIs

Python
import requests
import pandas as pd

def extract_from_api(url, params=None, headers=None):
    """Extract data from a REST API with pagination"""
    all_data = []
    page = 1
    params = params or {}

    while True:
        params['page'] = page

        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()

        if not data['results']:  # No more data
            break

        all_data.extend(data['results'])
        page += 1

    return pd.DataFrame(all_data)

# Example usage
df = extract_from_api(
    'https://api.example.com/orders',
    headers={'Authorization': 'Bearer token123'}
)

2.3 From Files

Python
import pandas as pd
import glob
from pathlib import Path

# Multiple CSV files
csv_files = glob.glob('data/sales_*.csv')
df = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)

# Excel with multiple sheets
excel_file = pd.ExcelFile('data/report.xlsx')
dfs = {sheet: pd.read_excel(excel_file, sheet_name=sheet)
       for sheet in excel_file.sheet_names}

# JSON
df = pd.read_json('data/orders.json', lines=True)  # JSON Lines format

# Parquet (columnar, efficient)
df = pd.read_parquet('data/sales.parquet')

# Large files with Dask
import dask.dataframe as dd
ddf = dd.read_csv('data/large_*.csv')  # Lazy loading

2.4 From Cloud Storage

Python
# AWS S3
import boto3
import pandas as pd
from io import StringIO

s3 = boto3.client('s3')
obj = s3.get_object(Bucket='my-bucket', Key='data/sales.csv')
df = pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))

# Azure Blob Storage
from azure.storage.blob import BlobServiceClient
blob_service = BlobServiceClient.from_connection_string(conn_str)  # conn_str: your storage connection string
blob_client = blob_service.get_blob_client('container', 'data/sales.csv')
df = pd.read_csv(StringIO(blob_client.download_blob().readall().decode()))

# Google Cloud Storage
from google.cloud import storage
client = storage.Client()
bucket = client.bucket('my-bucket')
blob = bucket.blob('data/sales.csv')
df = pd.read_csv(StringIO(blob.download_as_string().decode()))

3. Transform (Transforming Data)

3.1 Common Transformations

Python
import pandas as pd
import numpy as np

def transform_sales_data(df):
    """Transform raw sales data"""

    # 1. Standardize column names
    df.columns = df.columns.str.lower().str.replace(' ', '_')

    # 2. Parse dates
    df['order_date'] = pd.to_datetime(df['order_date'])

    # 3. Clean strings
    df['product_name'] = df['product_name'].str.strip().str.title()

    # 4. Handle missing values
    df['discount'] = df['discount'].fillna(0)
    df['customer_segment'] = df['customer_segment'].fillna('Unknown')

    # 5. Calculate derived columns
    df['revenue'] = df['quantity'] * df['unit_price'] * (1 - df['discount'])
    df['order_year'] = df['order_date'].dt.year
    df['order_month'] = df['order_date'].dt.month
    df['order_quarter'] = df['order_date'].dt.quarter

    # 6. Categorize
    df['revenue_tier'] = pd.cut(
        df['revenue'],
        bins=[0, 100, 500, 1000, np.inf],
        labels=['Small', 'Medium', 'Large', 'Enterprise']
    )

    # 7. Filter invalid records
    df = df[df['quantity'] > 0]
    df = df[df['unit_price'] > 0]

    return df

3.2 Data Enrichment

Python
def enrich_with_customer_data(sales_df, customers_df):
    """Enrich sales with customer information"""

    # Merge with customer data
    enriched = sales_df.merge(
        customers_df[['customer_id', 'segment', 'region', 'join_date']],
        on='customer_id',
        how='left'
    )

    # Calculate customer tenure (join_date must be datetime for the subtraction)
    enriched['join_date'] = pd.to_datetime(enriched['join_date'])
    enriched['customer_tenure_days'] = (
        enriched['order_date'] - enriched['join_date']
    ).dt.days

    # Flag new customers (< 30 days)
    enriched['is_new_customer'] = enriched['customer_tenure_days'] < 30

    return enriched

3.3 Aggregations

Python
def create_daily_summary(df):
    """Aggregate to a daily summary"""

    daily = df.groupby(['order_date', 'product_category']).agg(
        total_orders=('order_id', 'nunique'),
        total_revenue=('revenue', 'sum'),
        total_quantity=('quantity', 'sum'),
        unique_customers=('customer_id', 'nunique'),
        avg_order_value=('revenue', 'mean')
    ).reset_index()

    # Add running totals
    daily['cumulative_revenue'] = daily.groupby('product_category')['total_revenue'].cumsum()

    return daily

3.4 Data Validation in Transform

Python
def validate_transform(df):
    """Validate transformed data"""
    assertions = []

    # Check for nulls in required columns
    required_cols = ['order_id', 'customer_id', 'order_date', 'revenue']
    for col in required_cols:
        null_count = df[col].isnull().sum()
        assertions.append({
            'check': f'no_nulls_{col}',
            'passed': null_count == 0,
            'details': f'{null_count} null values'
        })

    # Check value ranges
    assertions.append({
        'check': 'positive_revenue',
        'passed': (df['revenue'] >= 0).all(),
        'details': f"{(df['revenue'] < 0).sum()} negative values"
    })

    # Check date validity
    assertions.append({
        'check': 'valid_dates',
        'passed': df['order_date'].notna().all(),
        'details': f"{df['order_date'].isna().sum()} invalid dates"
    })

    # Log results
    failed = [a for a in assertions if not a['passed']]
    if failed:
        raise ValueError(f"Validation failed: {failed}")

    return df

4. Load (Loading Data)

4.1 Load to Database

Python
from sqlalchemy import create_engine

def load_to_database(df, table_name, engine, if_exists='append'):
    """Load a DataFrame to a database table"""

    # Basic load
    df.to_sql(
        table_name,
        engine,
        if_exists=if_exists,  # 'fail', 'replace', 'append'
        index=False,
        method='multi',       # Faster bulk insert
        chunksize=10000
    )

    print(f"Loaded {len(df)} rows to {table_name}")

# Usage
engine = create_engine('postgresql://user:pass@host/db')
load_to_database(transformed_df, 'fact_sales', engine)

4.2 Upsert (Update or Insert)

Python
from sqlalchemy import text

def upsert_to_postgres(df, table_name, engine, unique_cols):
    """Upsert data into PostgreSQL"""

    # Create temp table
    temp_table = f"{table_name}_temp"
    df.to_sql(temp_table, engine, if_exists='replace', index=False)

    # Build upsert query
    columns = df.columns.tolist()
    update_cols = [c for c in columns if c not in unique_cols]

    conflict_cols = ', '.join(unique_cols)
    update_set = ', '.join([f"{c} = EXCLUDED.{c}" for c in update_cols])

    query = f"""
        INSERT INTO {table_name} ({', '.join(columns)})
        SELECT {', '.join(columns)} FROM {temp_table}
        ON CONFLICT ({conflict_cols})
        DO UPDATE SET {update_set}
    """

    with engine.connect() as conn:
        conn.execute(text(query))
        conn.execute(text(f"DROP TABLE {temp_table}"))
        conn.commit()

4.3 Load to Data Lake

Python
import pyarrow as pa
import pyarrow.parquet as pq

def load_to_data_lake(df, path, partition_cols=None):
    """Load to a data lake with partitioning"""

    # Convert to PyArrow Table
    table = pa.Table.from_pandas(df)

    # Write partitioned Parquet
    pq.write_to_dataset(
        table,
        root_path=path,
        partition_cols=partition_cols,
        compression='snappy'
    )

# Usage - partitioned by year/month
load_to_data_lake(
    df,
    's3://data-lake/sales/',
    partition_cols=['year', 'month']
)

4.4 Incremental Loading

Python
from sqlalchemy import text

def incremental_load(df, table_name, engine, date_column, watermark_table):
    """Load only new data based on a watermark"""

    # Get the last watermark
    with engine.connect() as conn:
        result = conn.execute(text(f"""
            SELECT MAX(watermark_value) FROM {watermark_table}
            WHERE table_name = '{table_name}'
        """))
        last_watermark = result.scalar() or '1900-01-01'

    # Filter new records
    new_records = df[df[date_column] > last_watermark]

    if len(new_records) > 0:
        # Load new records
        new_records.to_sql(table_name, engine, if_exists='append', index=False)

        # Update the watermark
        new_watermark = new_records[date_column].max()
        with engine.connect() as conn:
            conn.execute(text(f"""
                INSERT INTO {watermark_table} (table_name, watermark_value, updated_at)
                VALUES ('{table_name}', '{new_watermark}', NOW())
                ON CONFLICT (table_name) DO UPDATE SET
                    watermark_value = EXCLUDED.watermark_value,
                    updated_at = NOW()
            """))
            conn.commit()

        print(f"Loaded {len(new_records)} new records")
    else:
        print("No new records to load")

5. Complete ETL Pipeline

Python
import logging
import pandas as pd
from datetime import datetime
from dataclasses import dataclass
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ETLConfig:
    source_connection: str
    target_connection: str
    source_query: str
    target_table: str
    batch_size: int = 10000

class ETLPipeline:
    """Production-ready ETL Pipeline"""

    def __init__(self, config: ETLConfig):
        self.config = config
        self.metrics = {
            'start_time': None,
            'end_time': None,
            'rows_extracted': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'errors': []
        }

    def extract(self) -> pd.DataFrame:
        """Extract data from the source"""
        logger.info("Starting extraction...")
        try:
            engine = create_engine(self.config.source_connection)
            df = pd.read_sql(self.config.source_query, engine)
            self.metrics['rows_extracted'] = len(df)
            logger.info(f"Extracted {len(df)} rows")
            return df
        except Exception as e:
            self.metrics['errors'].append(f"Extract error: {str(e)}")
            raise

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform data"""
        logger.info("Starting transformation...")
        try:
            # Apply transformations
            df = self._clean_data(df)
            df = self._enrich_data(df)
            df = self._validate_data(df)

            self.metrics['rows_transformed'] = len(df)
            logger.info(f"Transformed {len(df)} rows")
            return df
        except Exception as e:
            self.metrics['errors'].append(f"Transform error: {str(e)}")
            raise

    def load(self, df: pd.DataFrame):
        """Load data to the target"""
        logger.info("Starting load...")
        try:
            engine = create_engine(self.config.target_connection)
            df.to_sql(
                self.config.target_table,
                engine,
                if_exists='append',
                index=False,
                chunksize=self.config.batch_size
            )
            self.metrics['rows_loaded'] = len(df)
            logger.info(f"Loaded {len(df)} rows")
        except Exception as e:
            self.metrics['errors'].append(f"Load error: {str(e)}")
            raise

    def run(self) -> dict:
        """Run the complete ETL pipeline"""
        self.metrics['start_time'] = datetime.now()

        try:
            # Extract
            raw_data = self.extract()

            # Transform
            transformed_data = self.transform(raw_data)

            # Load
            self.load(transformed_data)

            self.metrics['status'] = 'SUCCESS'
        except Exception as e:
            self.metrics['status'] = 'FAILED'
            logger.error(f"Pipeline failed: {str(e)}")
        finally:
            self.metrics['end_time'] = datetime.now()
            self.metrics['duration'] = (
                self.metrics['end_time'] - self.metrics['start_time']
            ).total_seconds()

        return self.metrics

    def _clean_data(self, df):
        """Data cleaning logic"""
        df = df.drop_duplicates()
        df.columns = df.columns.str.lower().str.replace(' ', '_')
        return df

    def _enrich_data(self, df):
        """Data enrichment logic"""
        df['processed_at'] = datetime.now()
        return df

    def _validate_data(self, df):
        """Data validation logic"""
        # Add validation rules
        return df


# Usage
config = ETLConfig(
    source_connection='postgresql://source_host/source_db',
    target_connection='postgresql://target_host/target_db',
    source_query='SELECT * FROM raw_sales WHERE date >= CURRENT_DATE - 1',
    target_table='fact_sales'
)

pipeline = ETLPipeline(config)
result = pipeline.run()
print(result)

6. ETL Scheduling

6.1 With the schedule Library

Python
import schedule
import time

def run_daily_etl():
    """Daily ETL job"""
    config = ETLConfig(...)  # fill in source/target settings
    pipeline = ETLPipeline(config)
    result = pipeline.run()

    # Send notification (send_alert defined elsewhere)
    if result['status'] == 'FAILED':
        send_alert(result)

# Schedule jobs (run_hourly_etl defined elsewhere)
schedule.every().day.at("02:00").do(run_daily_etl)
schedule.every().hour.do(run_hourly_etl)

# Run the scheduler
while True:
    schedule.run_pending()
    time.sleep(60)

6.2 With Apache Airflow

Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'sales_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
)

def extract_task(**context):
    # Extract logic
    pass

def transform_task(**context):
    # Transform logic
    pass

def load_task(**context):
    # Load logic
    pass

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_task,
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_task,
    dag=dag
)

load = PythonOperator(
    task_id='load',
    python_callable=load_task,
    dag=dag
)

extract >> transform >> load

7. Practice

Mini Project

Build an ETL pipeline for sales data

Exercise: Complete ETL Pipeline

Python
# Build an ETL pipeline with the following requirements:
# 1. Extract: Read from 3 CSV files (orders, customers, products)
# 2. Transform:
#    - Join the 3 tables
#    - Calculate revenue and profit
#    - Handle missing values
#    - Add date dimensions
# 3. Load: Save to Parquet, partitioned by month

# YOUR CODE HERE
💡 View answer
Python
import pandas as pd
from pathlib import Path

class SalesETLPipeline:
    def __init__(self, data_dir, output_dir):
        self.data_dir = Path(data_dir)
        self.output_dir = Path(output_dir)

    def extract(self):
        """Extract from CSV files"""
        orders = pd.read_csv(self.data_dir / 'orders.csv')
        customers = pd.read_csv(self.data_dir / 'customers.csv')
        products = pd.read_csv(self.data_dir / 'products.csv')

        return {'orders': orders, 'customers': customers, 'products': products}

    def transform(self, data):
        """Transform and join data"""
        # Join tables
        df = (
            data['orders']
            .merge(data['customers'], on='customer_id', how='left')
            .merge(data['products'], on='product_id', how='left')
        )

        # Parse dates
        df['order_date'] = pd.to_datetime(df['order_date'])

        # Calculate metrics
        df['revenue'] = df['quantity'] * df['unit_price']
        df['profit'] = df['revenue'] * df['margin']

        # Handle missing values
        df['region'] = df['region'].fillna('Unknown')

        # Add date dimensions
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['year_month'] = df['order_date'].dt.to_period('M').astype(str)

        return df

    def load(self, df):
        """Load to partitioned Parquet"""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Write one file per year_month partition
        for period, group in df.groupby('year_month'):
            output_path = self.output_dir / f'sales_{period}.parquet'
            group.to_parquet(output_path, index=False)
            print(f"Saved {len(group)} rows to {output_path}")

    def run(self):
        data = self.extract()
        transformed = self.transform(data)
        self.load(transformed)
        return len(transformed)

# Run the pipeline
pipeline = SalesETLPipeline('data/raw/', 'data/processed/')
rows = pipeline.run()
print(f"Pipeline completed: {rows} rows processed")

8. Summary

Phase       | Key Concepts                       | Tools
Extract     | Databases, APIs, Files, Cloud      | SQLAlchemy, requests, pandas
Transform   | Clean, Enrich, Aggregate, Validate | pandas, numpy
Load        | Append, Upsert, Partition          | SQLAlchemy, PyArrow
Orchestrate | Schedule, Monitor, Alert           | Airflow, Prefect, schedule
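
Airflow and schedule are shown above; Prefect, also listed in the table, uses a similar task/flow model. A minimal sketch, assuming Prefect 2.x; the task bodies are placeholders, not a real pipeline:

Python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=300)
def extract():
    # Placeholder: pull raw data from the source
    return [{"order_id": 1, "revenue": 100.0}]

@task
def transform(rows):
    # Placeholder: apply cleaning/enrichment
    return [r for r in rows if r["revenue"] > 0]

@task
def load(rows):
    # Placeholder: write to the target table
    print(f"Loaded {len(rows)} rows")

@flow(name="sales_etl")
def sales_etl():
    raw = extract()
    clean = transform(raw)
    load(clean)

if __name__ == "__main__":
    sales_etl()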

Best Practices:

  • Idempotent pipelines (safe to rerun without creating duplicates; see the sketch after this list)
  • Incremental loads where possible
  • Data validation at each stage
  • Comprehensive logging and monitoring
  • Error handling and alerting
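
To illustrate the first point, here is a minimal sketch of an idempotent daily load, assuming a PostgreSQL target with a load_date column (table and column names are placeholders): re-running the same day first deletes that day's rows, so the load never creates duplicates.

Python
import pandas as pd
from sqlalchemy import create_engine, text

def idempotent_daily_load(df: pd.DataFrame, table_name: str, engine, run_date: str):
    """Delete-then-insert for one load_date so reruns do not duplicate rows."""
    with engine.begin() as conn:
        # Remove any rows written by a previous run for the same day
        conn.execute(
            text(f"DELETE FROM {table_name} WHERE load_date = :run_date"),
            {"run_date": run_date},
        )
    # Tag rows with the run date and append
    df.assign(load_date=run_date).to_sql(table_name, engine, if_exists='append', index=False)

# Usage (placeholder connection string)
# engine = create_engine('postgresql://user:password@host:5432/dwh')
# idempotent_daily_load(daily_df, 'fact_sales', engine, '2024-01-15')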

Next lesson: Apache Spark Introduction