🎯 Lesson Objectives
- Connect Python to databases with SQLAlchemy
- Read data from SQL into a Pandas DataFrame (pd.read_sql)
- Write data from a DataFrame to a database (to_sql, bulk insert)
- Build complete ETL pipelines and analysis modules
| Info | Details |
|---|---|
| ⏱️ Duration | 2.5 hours |
| 📖 Main topics | SQLAlchemy, pd.read_sql, to_sql, ETL |
| 💡 Prerequisites | Basic SQL, Pandas fundamentals |
| 🎯 Output | Build a database-backed analysis module |
📖 Key Terms
| Term | Description |
|---|---|
| SQLAlchemy | Python SQL toolkit and ORM — the standard way to connect to databases |
| Engine | Connection object that manages database connections |
| Connection String | URI containing connection details (user, password, host, database) |
| pd.read_sql | Pandas function that reads query results into a DataFrame |
| to_sql | Method that writes a DataFrame to a database table |
| Chunked Reading | Reading large data in batches (chunksize) |
| ORM | Object-Relational Mapping — maps Python classes to DB tables |
| ETL | Extract-Transform-Load — the standard data pipeline pattern |
| Connection Pool | Reusing database connections for performance |
| Context Manager | with/try-finally pattern that guarantees connections are closed properly |
Checkpoint
SQLAlchemy is the standard way to connect Python to databases. pd.read_sql reads SQL → DataFrame; to_sql writes back the other way. ETL is the most common pipeline pattern. SQL excels at querying, Python excels at analysis — combining both gives you the most powerful analytics workflow!
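The whole lesson boils down to this round trip. As a minimal, runnable sketch — using an in-memory SQLite database so no server is needed; the table and column names are purely illustrative:

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite: no server or file needed
engine = create_engine('sqlite://')

# DataFrame → database
orders = pd.DataFrame({'id': [1, 2, 3], 'amount': [10.0, 25.5, 7.25]})
orders.to_sql('orders', engine, index=False)

# Database → DataFrame
result = pd.read_sql('SELECT id, amount FROM orders WHERE amount > 8', engine)
print(result)
```

The same two calls work unchanged against PostgreSQL or MySQL — only the connection string differs.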
🔌 Connection Methods
SQL excels at querying and aggregating data. Python excels at analysis, visualization, and ML. Combining both gives you the most powerful workflow.
SQL + Python Integration
SQLAlchemy Engine
```python
from sqlalchemy import create_engine
import pandas as pd

# PostgreSQL
engine = create_engine('postgresql://user:password@host:5432/database')

# MySQL
engine = create_engine('mysql+pymysql://user:password@host:3306/database')

# SQLite
engine = create_engine('sqlite:///my_database.db')

# SQL Server
engine = create_engine(
    'mssql+pyodbc://user:password@host/database'
    '?driver=ODBC+Driver+17+for+SQL+Server'
)

# Connection with options
engine = create_engine(
    'postgresql://user:password@host:5432/database',
    echo=False,          # Don't log queries
    pool_size=5,         # Connection pool size
    pool_recycle=3600,   # Recycle connections after 1 hour
    connect_args={'connect_timeout': 10}
)
```
Environment Variables (Security)
```python
import os
from sqlalchemy import create_engine
from dotenv import load_dotenv

load_dotenv()

DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

connection_string = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:5432/{DB_NAME}'
engine = create_engine(connection_string)

# .env file example:
# DB_USER=myuser
# DB_PASS=mypassword
# DB_HOST=localhost
# DB_NAME=mydatabase
```
Context Manager Pattern
```python
from sqlalchemy import create_engine, text
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """Context manager for database connections"""
    engine = create_engine('postgresql://user:pass@host/db')
    connection = engine.connect()
    try:
        yield connection
    finally:
        connection.close()

# Usage (SQLAlchemy 2.0 requires raw SQL strings wrapped in text())
with get_db_connection() as conn:
    result = conn.execute(text("SELECT * FROM orders LIMIT 10"))
    for row in result:
        print(row)
```
Checkpoint
SQLAlchemy's create_engine() supports PostgreSQL, MySQL, SQLite, and SQL Server. Always use environment variables for credentials; a context manager guarantees the connection is closed. Which database are you using? The connection string differs per database system!
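A quick way to verify a connection string before running real queries is a trivial "ping" query. Sketched here with in-memory SQLite so it runs anywhere — swap in your own connection string:

```python
from sqlalchemy import create_engine, text

# SQLite in-memory engine stands in for any backend
engine = create_engine('sqlite://')

# A cheap "ping" confirms the engine can actually reach the database
with engine.connect() as conn:
    ok = conn.execute(text('SELECT 1')).scalar()
print('connected:', ok == 1)
```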
📥 Reading Data — pd.read_sql()
Basic Query
```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@host/database')

# Basic query
df = pd.read_sql("SELECT * FROM orders", engine)

# With parameters (safe from SQL injection)
customer_id = 100
df = pd.read_sql(
    "SELECT * FROM orders WHERE customer_id = %(cid)s",
    engine,
    params={'cid': customer_id}
)

# Complex query with JOINs
query = """
    SELECT
        o.order_id, o.order_date, o.amount,
        c.name AS customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.order_date >= '2024-01-01'
    ORDER BY o.order_date DESC
"""
df = pd.read_sql(query, engine)
print(f"Shape: {df.shape}")
print(df.head())
```
Chunked Reading (Large Datasets)
```python
# Read in chunks to manage memory
chunk_size = 100000
chunks = []

for chunk in pd.read_sql(
    "SELECT * FROM large_table",
    engine,
    chunksize=chunk_size
):
    processed = chunk.query('amount > 0')
    chunks.append(processed)
    print(f"Processed chunk with {len(chunk)} rows")

df = pd.concat(chunks, ignore_index=True)

# Process without storing all chunks
total_amount = 0
for chunk in pd.read_sql("SELECT amount FROM orders", engine, chunksize=10000):
    total_amount += chunk['amount'].sum()
print(f"Total: {total_amount}")
```
Parse Dates and Index
```python
# Parse date columns automatically
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    parse_dates=['order_date', 'ship_date']
)

# Set index column
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    index_col='order_id'
)

# Combined options
df = pd.read_sql(
    "SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    engine,
    parse_dates={'order_date': '%Y-%m-%d'},
    index_col='order_id',
    coerce_float=True
)
```
Checkpoint
pd.read_sql() reads a SQL query into a DataFrame. Use params for SQL injection safety, chunked reading for large datasets, and parse_dates to convert dates automatically. Tip: chunksize matters a lot when working with tables of millions of rows — have you tried it yet?
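Chunked reading can be tried end-to-end without a big table. This sketch streams 100 rows in chunks of 30 from an in-memory SQLite database (table name illustrative):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite://')
pd.DataFrame({'amount': range(1, 101)}).to_sql('orders', engine, index=False)

# Aggregate incrementally — only one chunk is in memory at a time
total = 0
n_chunks = 0
for chunk in pd.read_sql('SELECT amount FROM orders', engine, chunksize=30):
    total += chunk['amount'].sum()
    n_chunks += 1

print(total, n_chunks)   # 5050 4  (chunks of 30, 30, 30, 10)
```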
📤 Writing Data — to_sql()
Basic Write
```python
# Create sample DataFrame
df = pd.DataFrame({
    'name': ['Product A', 'Product B', 'Product C'],
    'price': [100.0, 200.0, 150.0],
    'category': ['Electronics', 'Clothing', 'Electronics']
})

# Write to database
df.to_sql(
    'products',            # Table name
    engine,                # Connection
    if_exists='replace',   # 'fail', 'replace', 'append'
    index=False            # Don't write index as column
)

# Append new data
new_products = pd.DataFrame({
    'name': ['Product D'],
    'price': [300.0],
    'category': ['Home']
})
new_products.to_sql('products', engine, if_exists='append', index=False)
```
Write with Data Types
```python
from sqlalchemy import Integer, String, Float, DateTime

df.to_sql(
    'orders',
    engine,
    if_exists='replace',
    index=False,
    dtype={
        'order_id': Integer,
        'customer_name': String(100),
        'amount': Float,
        'order_date': DateTime
    }
)
```
Efficient Bulk Insert
```python
# For large DataFrames, use method='multi' or chunks
df.to_sql(
    'large_table',
    engine,
    if_exists='append',
    index=False,
    method='multi',   # Insert multiple rows at once
    chunksize=1000    # Commit every 1000 rows
)

# Using COPY for PostgreSQL (fastest)
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """Use PostgreSQL COPY for fast bulk insert"""
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        for row in data_iter:
            s_buf.write('\t'.join(map(str, row)) + '\n')
        s_buf.seek(0)
        cur.copy_from(s_buf, table.name, columns=keys, null='')

df.to_sql('table', engine, method=psql_insert_copy, index=False)
```
Checkpoint
to_sql() writes a DataFrame to the database. if_exists controls the behavior (fail/replace/append). method='multi' + chunksize gives efficient bulk inserts. Replace vs. append — which would you choose for a daily data-warehouse update?
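The replace-vs-append question can be answered empirically. A small SQLite sketch (table name illustrative):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite://')
day1 = pd.DataFrame({'day': ['2024-01-01'], 'revenue': [100.0]})
day2 = pd.DataFrame({'day': ['2024-01-02'], 'revenue': [150.0]})

# 'replace' drops and recreates the table on every run
day1.to_sql('daily_sales', engine, if_exists='replace', index=False)
day2.to_sql('daily_sales', engine, if_exists='replace', index=False)
rows_after_replace = len(pd.read_sql('SELECT * FROM daily_sales', engine))

# 'append' accumulates rows — usually what a daily load wants
day1.to_sql('daily_sales', engine, if_exists='replace', index=False)
day2.to_sql('daily_sales', engine, if_exists='append', index=False)
rows_after_append = len(pd.read_sql('SELECT * FROM daily_sales', engine))

print(rows_after_replace, rows_after_append)   # 1 2
```

So for a daily warehouse load, 'append' keeps history while 'replace' keeps only the latest batch.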
🔄 Practical Workflows — ETL & Analysis
ETL Pipeline Class
```python
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

class ETLPipeline:
    def __init__(self, source_conn, target_conn):
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)

    def extract(self, query):
        """Extract data from source"""
        print("Extracting data...")
        df = pd.read_sql(query, self.source_engine)
        print(f"Extracted {len(df)} rows")
        return df

    def transform(self, df):
        """Transform data"""
        print("Transforming data...")
        df = df.dropna(subset=['amount'])
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['amount_category'] = pd.cut(
            df['amount'],
            bins=[0, 100, 500, float('inf')],
            labels=['small', 'medium', 'large']
        )
        summary = df.groupby(['year', 'month']).agg({
            'amount': ['sum', 'mean', 'count']
        }).reset_index()
        summary.columns = ['year', 'month', 'total_amount', 'avg_amount', 'order_count']
        print(f"Transformed to {len(summary)} rows")
        return summary

    def load(self, df, table_name):
        """Load data to target"""
        print(f"Loading to {table_name}...")
        df['loaded_at'] = datetime.now()
        df.to_sql(table_name, self.target_engine, if_exists='replace', index=False)
        print(f"Loaded {len(df)} rows")

    def run(self, query, target_table):
        """Run complete ETL"""
        df = self.extract(query)
        transformed = self.transform(df)
        self.load(transformed, target_table)
        return transformed

# Usage
pipeline = ETLPipeline(
    source_conn='postgresql://user:pass@source_host/db',
    target_conn='postgresql://user:pass@target_host/datawarehouse'
)
result = pipeline.run(
    query="SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    target_table='monthly_sales_summary'
)
```
Analysis Class
```python
class SalesAnalyzer:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)

    def get_top_customers(self, n=10, start_date=None):
        # NOTE: values are interpolated into the SQL here for brevity;
        # for untrusted input, prefer params= to avoid SQL injection
        query = """
            SELECT c.id, c.name,
                   COUNT(o.id) AS order_count,
                   SUM(o.amount) AS total_spent,
                   AVG(o.amount) AS avg_order
            FROM customers c
            JOIN orders o ON c.id = o.customer_id
            WHERE 1=1
        """
        if start_date:
            query += f" AND o.order_date >= '{start_date}'"
        query += f" GROUP BY c.id, c.name ORDER BY total_spent DESC LIMIT {n}"
        return pd.read_sql(query, self.engine)

    def get_monthly_trends(self, year):
        query = f"""
            SELECT EXTRACT(MONTH FROM order_date) AS month,
                   COUNT(*) AS orders,
                   SUM(amount) AS revenue,
                   AVG(amount) AS avg_order
            FROM orders
            WHERE EXTRACT(YEAR FROM order_date) = {year}
            GROUP BY EXTRACT(MONTH FROM order_date)
            ORDER BY month
        """
        return pd.read_sql(query, self.engine)

# Usage
analyzer = SalesAnalyzer('postgresql://user:pass@host/db')
top_customers = analyzer.get_top_customers(n=20, start_date='2024-01-01')
monthly = analyzer.get_monthly_trends(2024)
```
Parameterized Reports with Jinja2
```python
from jinja2 import Template

report_template = """
SELECT
    {{ group_by_column }},
    COUNT(*) AS count,
    SUM(amount) AS total,
    AVG(amount) AS average
FROM orders
WHERE order_date BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{% if status %}AND status = '{{ status }}'{% endif %}
GROUP BY {{ group_by_column }}
ORDER BY total DESC
{% if limit %}LIMIT {{ limit }}{% endif %}
"""

query = Template(report_template).render(
    group_by_column='customer_id',
    start_date='2024-01-01',
    end_date='2024-12-31',
    status='completed',
    limit=100
)
df = pd.read_sql(query, engine)
```
Checkpoint
The ETL pipeline pattern: Extract (read_sql) → Transform (Pandas) → Load (to_sql). Analysis classes encapsulate business logic. Jinja2 templates enable dynamic report queries. An ETL pipeline is the foundation of data engineering — you can extend the class with error handling and logging!
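One possible way to add the error handling and logging the checkpoint suggests is a stage wrapper. This is an illustrative sketch — run_stage and the stand-in lambdas are not part of the lesson's ETLPipeline class:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('etl')

def run_stage(name, func, *args, **kwargs):
    """Run one ETL stage, logging success and re-raising on failure."""
    try:
        result = func(*args, **kwargs)
        logger.info('%s succeeded', name)
        return result
    except Exception:
        logger.exception('%s failed', name)
        raise

# Plain functions stand in for the real extract/transform stages
extracted = run_stage('extract', lambda: [1, 2, 3])
transformed = run_stage('transform', lambda rows: [r * 2 for r in rows], extracted)
print(transformed)   # [2, 4, 6]
```

In a real pipeline, each of extract/transform/load would be passed through the wrapper, giving a per-stage audit trail in the logs.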
✅ Best Practices & ORM
Connection Management
```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=engine)

# For queries — use context managers
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM table"))

# For ORM operations
with Session() as session:
    session.query(Model).filter(...)
```
Error Handling
```python
from sqlalchemy.exc import SQLAlchemyError

def safe_query(query, engine):
    """Execute query with error handling"""
    try:
        df = pd.read_sql(query, engine)
        return df
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None
```
SQLAlchemy ORM Basics
```python
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(100), unique=True)
    orders = relationship('Order', back_populates='customer')

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    amount = Column(Float)
    order_date = Column(DateTime)
    customer = relationship('Customer', back_populates='orders')

# Create tables
Base.metadata.create_all(engine)

# Query with ORM → DataFrame
Session = sessionmaker(bind=engine)
session = Session()
df = pd.read_sql(session.query(Customer).statement, engine)
session.close()
```
Query Logging
```python
import logging

# Enable SQLAlchemy logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Or create the engine with echo
engine = create_engine(connection_string, echo=True)
```
Checkpoint
Always use context managers for connections. Handle errors with try/except SQLAlchemyError. The ORM maps Python classes to DB tables. Logging helps debug SQL queries. pool_pre_ping=True is essential in production — it checks that a connection is still alive before using it!
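The safe_query pattern above can be checked end-to-end against an in-memory SQLite database (table name illustrative):

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError

# In-memory SQLite stands in for a real server
engine = create_engine('sqlite://')
pd.DataFrame({'x': [1]}).to_sql('t', engine, index=False)

def safe_query(query, engine):
    """Execute a query, returning None instead of raising on failure."""
    try:
        return pd.read_sql(query, engine)
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

good = safe_query('SELECT x FROM t', engine)
bad = safe_query('SELECT x FROM missing_table', engine)
print(good is not None, bad is None)   # True True
```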
📋 Summary
What You've Learned
| Topic | Key Points |
|---|---|
| Connection | SQLAlchemy engine, environment variables, context manager |
| Reading | pd.read_sql(), chunked reading, parameters, parse_dates |
| Writing | to_sql(), if_exists, bulk insert, method='multi' |
| Workflow | ETL pipelines, analysis classes, Jinja2 templates |
| ORM | declarative_base, models, relationships, session |
| Best Practices | Connection pooling, error handling, logging |
Key Patterns
- ✅ SQLAlchemy is the recommended way to connect to databases
- ✅ Environment variables for credentials — NEVER hardcode
- ✅ Chunked reading for large datasets (>100K rows)
- ✅ method='multi' + chunksize for efficient bulk inserts
- ✅ Context managers ensure connections are closed properly
- ✅ Parameters for SQL injection safety
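The SQL-injection bullet is easy to demonstrate. A hedged sketch with SQLite, whose driver uses '?' placeholders (psycopg2 for PostgreSQL uses %(name)s instead); the table is illustrative:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite://')
pd.DataFrame({'customer_id': [100, 200], 'amount': [10.0, 20.0]}).to_sql(
    'orders', engine, index=False
)

# The placeholder syntax is driver-dependent ('?' for SQLite,
# %(name)s for psycopg2) — the value is never spliced into the SQL string
df = pd.read_sql(
    'SELECT * FROM orders WHERE customer_id = ?',
    engine,
    params=(100,),
)
print(len(df))
```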
Self-Check Questions
- SQLAlchemy vs. a raw driver connection: why use SQLAlchemy?
- When should you use chunked reading?
- How does method='multi' improve performance?
- How do environment variables protect credentials?
Next lesson: Time Series Analysis →
🎉 Excellent! You now know how to connect SQL and Python!
Remember: SQLAlchemy + Pandas is the perfect pair. SQL for querying, Python for analysis. Combine both for an optimal workflow!
