ETL Pipelines with Python
1. What is ETL?
ETL = Extract, Transform, Load
```text
┌──────────┐      ┌─────────────┐      ┌──────────┐
│ Extract  │  ->  │  Transform  │  ->  │   Load   │
│   (E)    │      │     (T)     │      │   (L)    │
└──────────┘      └─────────────┘      └──────────┘
      │                  │                   │
      v                  v                   v
 - Databases        - Clean             - Data Warehouse
 - APIs             - Aggregate         - Data Lake
 - Files            - Join              - Database
 - Streams          - Enrich            - Files
```
ETL vs ELT
- ETL: transform the data before loading it (the traditional approach)
- ELT: load the raw data first, then transform it inside the warehouse (common with modern data warehouses); see the sketch below
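As a rough illustration of the ELT pattern, the sketch below loads a raw file unchanged into a staging table and then runs the transformation as SQL inside the warehouse. The connection string, file path, and table names (`stg_sales`, `fact_sales`) are hypothetical.

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical warehouse connection
engine = create_engine('postgresql://user:password@host:5432/dw')

# ELT step 1: load the raw data as-is into a staging table
raw_df = pd.read_csv('data/sales_raw.csv')
raw_df.to_sql('stg_sales', engine, if_exists='replace', index=False)

# ELT step 2: transform with SQL inside the warehouse
with engine.begin() as conn:
    conn.execute(text("""
        INSERT INTO fact_sales (order_id, order_date, revenue)
        SELECT order_id,
               CAST(order_date AS DATE),
               quantity * unit_price
        FROM stg_sales
        WHERE quantity > 0
    """))
```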
2. Extract
2.1 From Databases
```python
import pandas as pd
from sqlalchemy import create_engine

# PostgreSQL
pg_engine = create_engine('postgresql://user:password@host:5432/dbname')
df = pd.read_sql('SELECT * FROM sales WHERE date >= %s', pg_engine, params=['2024-01-01'])

# MySQL
mysql_engine = create_engine('mysql+pymysql://user:password@host:3306/dbname')

# SQL Server
mssql_engine = create_engine('mssql+pyodbc://user:password@server/dbname?driver=ODBC+Driver+17+for+SQL+Server')

# With chunking for large tables
chunks = []
for chunk in pd.read_sql('SELECT * FROM large_table', pg_engine, chunksize=100000):
    chunks.append(chunk)
df = pd.concat(chunks)
```
2.2 From APIs
```python
import requests
import pandas as pd

def extract_from_api(url, params=None, headers=None):
    """Extract data from a REST API with pagination"""
    all_data = []
    page = 1

    while True:
        params = params or {}
        params['page'] = page

        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()

        if not data['results']:  # No more data
            break

        all_data.extend(data['results'])
        page += 1

    return pd.DataFrame(all_data)

# Example usage
df = extract_from_api(
    'https://api.example.com/orders',
    headers={'Authorization': 'Bearer token123'}
)
```
2.3 From Files
```python
import pandas as pd
import glob
from pathlib import Path

# Multiple CSV files
csv_files = glob.glob('data/sales_*.csv')
df = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)

# Excel with multiple sheets
excel_file = pd.ExcelFile('data/report.xlsx')
dfs = {sheet: pd.read_excel(excel_file, sheet_name=sheet)
       for sheet in excel_file.sheet_names}

# JSON
df = pd.read_json('data/orders.json', lines=True)  # JSON Lines format

# Parquet (columnar, efficient)
df = pd.read_parquet('data/sales.parquet')

# Large files with Dask
import dask.dataframe as dd
ddf = dd.read_csv('data/large_*.csv')  # Lazy loading
```
2.4 From Cloud Storage
```python
# AWS S3
import boto3
import pandas as pd
from io import StringIO

s3 = boto3.client('s3')
obj = s3.get_object(Bucket='my-bucket', Key='data/sales.csv')
df = pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))

# Azure Blob Storage
from azure.storage.blob import BlobServiceClient
blob_service = BlobServiceClient.from_connection_string(conn_str)  # conn_str: your storage connection string
blob_client = blob_service.get_blob_client('container', 'data/sales.csv')
df = pd.read_csv(StringIO(blob_client.download_blob().readall().decode()))

# Google Cloud Storage
from google.cloud import storage
client = storage.Client()
bucket = client.bucket('my-bucket')
blob = bucket.blob('data/sales.csv')
df = pd.read_csv(StringIO(blob.download_as_string().decode()))
```
3. Transform
3.1 Common Transformations
```python
import pandas as pd
import numpy as np

def transform_sales_data(df):
    """Transform raw sales data"""

    # 1. Standardize column names
    df.columns = df.columns.str.lower().str.replace(' ', '_')

    # 2. Parse dates
    df['order_date'] = pd.to_datetime(df['order_date'])

    # 3. Clean strings
    df['product_name'] = df['product_name'].str.strip().str.title()

    # 4. Handle missing values
    df['discount'] = df['discount'].fillna(0)
    df['customer_segment'] = df['customer_segment'].fillna('Unknown')

    # 5. Calculate derived columns
    df['revenue'] = df['quantity'] * df['unit_price'] * (1 - df['discount'])
    df['order_year'] = df['order_date'].dt.year
    df['order_month'] = df['order_date'].dt.month
    df['order_quarter'] = df['order_date'].dt.quarter

    # 6. Categorize
    df['revenue_tier'] = pd.cut(
        df['revenue'],
        bins=[0, 100, 500, 1000, np.inf],
        labels=['Small', 'Medium', 'Large', 'Enterprise']
    )

    # 7. Filter invalid records
    df = df[df['quantity'] > 0]
    df = df[df['unit_price'] > 0]

    return df
```
3.2 Data Enrichment
```python
def enrich_with_customer_data(sales_df, customers_df):
    """Enrich sales with customer information"""

    # Merge with customer data
    enriched = sales_df.merge(
        customers_df[['customer_id', 'segment', 'region', 'join_date']],
        on='customer_id',
        how='left'
    )

    # Calculate customer tenure
    enriched['customer_tenure_days'] = (
        enriched['order_date'] - enriched['join_date']
    ).dt.days

    # Flag new customers (< 30 days)
    enriched['is_new_customer'] = enriched['customer_tenure_days'] < 30

    return enriched
```
3.3 Aggregations
```python
def create_daily_summary(df):
    """Aggregate to daily summary"""

    daily = df.groupby(['order_date', 'product_category']).agg(
        total_orders=('order_id', 'nunique'),
        total_revenue=('revenue', 'sum'),
        total_quantity=('quantity', 'sum'),
        unique_customers=('customer_id', 'nunique'),
        avg_order_value=('revenue', 'mean')
    ).reset_index()

    # Add running totals
    daily['cumulative_revenue'] = daily.groupby('product_category')['total_revenue'].cumsum()

    return daily
```
3.4 Data Validation in Transform
```python
def validate_transform(df):
    """Validate transformed data"""
    assertions = []

    # Check for nulls in required columns
    required_cols = ['order_id', 'customer_id', 'order_date', 'revenue']
    for col in required_cols:
        null_count = df[col].isnull().sum()
        assertions.append({
            'check': f'no_nulls_{col}',
            'passed': null_count == 0,
            'details': f'{null_count} null values'
        })

    # Check value ranges
    assertions.append({
        'check': 'positive_revenue',
        'passed': (df['revenue'] >= 0).all(),
        'details': f"{(df['revenue'] < 0).sum()} negative values"
    })

    # Check that all dates parsed correctly
    assertions.append({
        'check': 'valid_dates',
        'passed': df['order_date'].notna().all(),
        'details': 'All dates are valid'
    })

    # Log results
    failed = [a for a in assertions if not a['passed']]
    if failed:
        raise ValueError(f"Validation failed: {failed}")

    return df
```
4. Load
4.1 Load to Database
```python
from sqlalchemy import create_engine

def load_to_database(df, table_name, engine, if_exists='append'):
    """Load DataFrame to database"""

    # Basic load
    df.to_sql(
        table_name,
        engine,
        if_exists=if_exists,  # 'fail', 'replace', 'append'
        index=False,
        method='multi',  # Faster bulk insert
        chunksize=10000
    )

    print(f"Loaded {len(df)} rows to {table_name}")

# Usage
engine = create_engine('postgresql://user:pass@host/db')
load_to_database(transformed_df, 'fact_sales', engine)
```
4.2 Upsert (Update or Insert)
```python
from sqlalchemy import text

def upsert_to_postgres(df, table_name, engine, unique_cols):
    """Upsert data to PostgreSQL"""

    # Create temp table
    temp_table = f"{table_name}_temp"
    df.to_sql(temp_table, engine, if_exists='replace', index=False)

    # Build upsert query
    columns = df.columns.tolist()
    update_cols = [c for c in columns if c not in unique_cols]

    conflict_cols = ', '.join(unique_cols)
    update_set = ', '.join([f"{c} = EXCLUDED.{c}" for c in update_cols])

    query = f"""
        INSERT INTO {table_name} ({', '.join(columns)})
        SELECT {', '.join(columns)} FROM {temp_table}
        ON CONFLICT ({conflict_cols})
        DO UPDATE SET {update_set}
    """

    with engine.connect() as conn:
        conn.execute(text(query))
        conn.execute(text(f"DROP TABLE {temp_table}"))
        conn.commit()
```
4.3 Load to Data Lake
```python
import pyarrow as pa
import pyarrow.parquet as pq

def load_to_data_lake(df, path, partition_cols=None):
    """Load to data lake with partitioning"""

    # Convert to PyArrow Table
    table = pa.Table.from_pandas(df)

    # Write partitioned parquet
    pq.write_to_dataset(
        table,
        root_path=path,
        partition_cols=partition_cols,
        compression='snappy'
    )

# Usage - partitioned by year/month
load_to_data_lake(
    df,
    's3://data-lake/sales/',
    partition_cols=['year', 'month']
)
```
4.4 Incremental Loading
```python
from sqlalchemy import text

def incremental_load(df, table_name, engine, date_column, watermark_table):
    """Load only new data based on watermark"""

    # Get last watermark
    with engine.connect() as conn:
        result = conn.execute(text(f"""
            SELECT MAX(watermark_value) FROM {watermark_table}
            WHERE table_name = '{table_name}'
        """))
        last_watermark = result.scalar() or '1900-01-01'

    # Filter new records
    new_records = df[df[date_column] > last_watermark]

    if len(new_records) > 0:
        # Load new records
        new_records.to_sql(table_name, engine, if_exists='append', index=False)

        # Update watermark
        new_watermark = new_records[date_column].max()
        with engine.connect() as conn:
            conn.execute(text(f"""
                INSERT INTO {watermark_table} (table_name, watermark_value, updated_at)
                VALUES ('{table_name}', '{new_watermark}', NOW())
                ON CONFLICT (table_name) DO UPDATE SET
                    watermark_value = EXCLUDED.watermark_value,
                    updated_at = NOW()
            """))
            conn.commit()

        print(f"Loaded {len(new_records)} new records")
    else:
        print("No new records to load")
```
5. Complete ETL Pipeline
```python
import logging
from datetime import datetime
from dataclasses import dataclass
from typing import Optional

import pandas as pd
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ETLConfig:
    source_connection: str
    target_connection: str
    source_query: str
    target_table: str
    batch_size: int = 10000

class ETLPipeline:
    """Production-ready ETL Pipeline"""

    def __init__(self, config: ETLConfig):
        self.config = config
        self.metrics = {
            'start_time': None,
            'end_time': None,
            'rows_extracted': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'errors': []
        }

    def extract(self) -> pd.DataFrame:
        """Extract data from source"""
        logger.info("Starting extraction...")
        try:
            engine = create_engine(self.config.source_connection)
            df = pd.read_sql(self.config.source_query, engine)
            self.metrics['rows_extracted'] = len(df)
            logger.info(f"Extracted {len(df)} rows")
            return df
        except Exception as e:
            self.metrics['errors'].append(f"Extract error: {str(e)}")
            raise

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform data"""
        logger.info("Starting transformation...")
        try:
            # Apply transformations
            df = self._clean_data(df)
            df = self._enrich_data(df)
            df = self._validate_data(df)

            self.metrics['rows_transformed'] = len(df)
            logger.info(f"Transformed {len(df)} rows")
            return df
        except Exception as e:
            self.metrics['errors'].append(f"Transform error: {str(e)}")
            raise

    def load(self, df: pd.DataFrame):
        """Load data to target"""
        logger.info("Starting load...")
        try:
            engine = create_engine(self.config.target_connection)
            df.to_sql(
                self.config.target_table,
                engine,
                if_exists='append',
                index=False,
                chunksize=self.config.batch_size
            )
            self.metrics['rows_loaded'] = len(df)
            logger.info(f"Loaded {len(df)} rows")
        except Exception as e:
            self.metrics['errors'].append(f"Load error: {str(e)}")
            raise

    def run(self) -> dict:
        """Run complete ETL pipeline"""
        self.metrics['start_time'] = datetime.now()

        try:
            # Extract
            raw_data = self.extract()

            # Transform
            transformed_data = self.transform(raw_data)

            # Load
            self.load(transformed_data)

            self.metrics['status'] = 'SUCCESS'
        except Exception as e:
            self.metrics['status'] = 'FAILED'
            logger.error(f"Pipeline failed: {str(e)}")
        finally:
            self.metrics['end_time'] = datetime.now()
            self.metrics['duration'] = (
                self.metrics['end_time'] - self.metrics['start_time']
            ).total_seconds()

        return self.metrics

    def _clean_data(self, df):
        """Data cleaning logic"""
        df = df.drop_duplicates()
        df.columns = df.columns.str.lower().str.replace(' ', '_')
        return df

    def _enrich_data(self, df):
        """Data enrichment logic"""
        df['processed_at'] = datetime.now()
        return df

    def _validate_data(self, df):
        """Data validation logic"""
        # Add validation rules
        return df


# Usage
config = ETLConfig(
    source_connection='postgresql://source_host/source_db',
    target_connection='postgresql://target_host/target_db',
    source_query='SELECT * FROM raw_sales WHERE date >= CURRENT_DATE - 1',
    target_table='fact_sales'
)

pipeline = ETLPipeline(config)
result = pipeline.run()
print(result)
```
6. ETL Scheduling
6.1 With the schedule Library
```python
import schedule
import time

def run_daily_etl():
    """Daily ETL job"""
    config = ETLConfig(...)
    pipeline = ETLPipeline(config)
    result = pipeline.run()

    # Send notification
    if result['status'] == 'FAILED':
        send_alert(result)

# Schedule jobs
schedule.every().day.at("02:00").do(run_daily_etl)
schedule.every().hour.do(run_hourly_etl)

# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(60)
```
6.2 With Apache Airflow
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'sales_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
)

def extract_task(**context):
    # Extract logic
    pass

def transform_task(**context):
    # Transform logic
    pass

def load_task(**context):
    # Load logic
    pass

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_task,
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_task,
    dag=dag
)

load = PythonOperator(
    task_id='load',
    python_callable=load_task,
    dag=dag
)

extract >> transform >> load
```
7. Practice
Mini Project
Build an ETL pipeline for sales data
Exercise: Complete ETL Pipeline
```python
# Build an ETL pipeline with the following requirements:
# 1. Extract: read from 3 CSV files (orders, customers, products)
# 2. Transform:
#    - Join the 3 tables
#    - Calculate revenue and profit
#    - Handle missing values
#    - Add date dimensions
# 3. Load: save to Parquet, partitioned by month

# YOUR CODE HERE
```
💡 View solution
```python
import pandas as pd
from pathlib import Path

class SalesETLPipeline:
    def __init__(self, data_dir, output_dir):
        self.data_dir = Path(data_dir)
        self.output_dir = Path(output_dir)

    def extract(self):
        """Extract from CSV files"""
        orders = pd.read_csv(self.data_dir / 'orders.csv')
        customers = pd.read_csv(self.data_dir / 'customers.csv')
        products = pd.read_csv(self.data_dir / 'products.csv')

        return {'orders': orders, 'customers': customers, 'products': products}

    def transform(self, data):
        """Transform and join data"""
        # Join tables
        df = (
            data['orders']
            .merge(data['customers'], on='customer_id', how='left')
            .merge(data['products'], on='product_id', how='left')
        )

        # Parse dates
        df['order_date'] = pd.to_datetime(df['order_date'])

        # Calculate metrics
        df['revenue'] = df['quantity'] * df['unit_price']
        df['profit'] = df['revenue'] * df['margin']

        # Handle missing
        df['region'] = df['region'].fillna('Unknown')

        # Add date dimensions
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['year_month'] = df['order_date'].dt.to_period('M').astype(str)

        return df

    def load(self, df):
        """Load to partitioned parquet"""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Write partitioned by year_month
        for period, group in df.groupby('year_month'):
            output_path = self.output_dir / f'sales_{period}.parquet'
            group.to_parquet(output_path, index=False)
            print(f"Saved {len(group)} rows to {output_path}")

    def run(self):
        data = self.extract()
        transformed = self.transform(data)
        self.load(transformed)
        return len(transformed)

# Run pipeline
pipeline = SalesETLPipeline('data/raw/', 'data/processed/')
rows = pipeline.run()
print(f"Pipeline completed: {rows} rows processed")
```
8. Summary
| Phase | Key Concepts | Tools |
|---|---|---|
| Extract | Databases, APIs, Files, Cloud | SQLAlchemy, requests, pandas |
| Transform | Clean, Enrich, Aggregate, Validate | pandas, numpy |
| Load | Append, Upsert, Partition | SQLAlchemy, PyArrow |
| Orchestrate | Schedule, Monitor, Alert | Airflow, Prefect, schedule |
Best Practices:
- Idempotent pipelines (safe to rerun; see the sketch after this list)
- Incremental loads whenever possible
- Data validation at every stage
- Comprehensive logging and monitoring
- Error handling and alerting
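A minimal sketch of an idempotent daily load, assuming a `fact_sales` table logically partitioned by `order_date`: deleting the day's slice before re-inserting it makes the job safe to rerun. The table name, connection string, and `load_day` helper are hypothetical.

```python
import pandas as pd
from sqlalchemy import create_engine, text

def load_day(df: pd.DataFrame, run_date: str, engine) -> None:
    """Idempotent load: replace one day's slice so a rerun does not duplicate rows."""
    day_slice = df[df['order_date'] == run_date]
    with engine.begin() as conn:
        # Remove rows left behind by a previous (possibly failed) run of the same day
        conn.execute(
            text("DELETE FROM fact_sales WHERE order_date = :d"),
            {"d": run_date},
        )
        day_slice.to_sql('fact_sales', conn, if_exists='append', index=False)

# Hypothetical usage
engine = create_engine('postgresql://user:password@host:5432/dw')
# load_day(transformed_df, '2024-01-15', engine)
```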
Next lesson: Apache Spark Introduction
