
SQL + Python Integration

Connect to, query, and analyze data from databases in Python


🎯 Lesson Objectives

After this lesson, you will be able to:
  • Connect Python to databases with SQLAlchemy
  • Read data from SQL into a Pandas DataFrame (pd.read_sql)
  • Write data from a DataFrame to a database (to_sql, bulk insert)
  • Build complete ETL pipelines and analysis modules
📋 Lesson Info
Info | Details
⏱️ Duration | 2.5 hours
📖 Main topics | SQLAlchemy, pd.read_sql, to_sql, ETL
💡 Prerequisites | Basic SQL, Pandas fundamentals
🎯 Output | Build an analysis module connected to a database
1

📖 Key Terms

Term | Meaning | Description
SQLAlchemy | DB connection library | Python SQL toolkit and ORM; the standard way to connect to databases
Engine | Connection object | Connection object that manages database connections
Connection String | Connection URI | URI containing the connection info (user, password, host, database)
pd.read_sql | SQL → DataFrame | Pandas function that reads query results into a DataFrame
to_sql | DataFrame → DB | Method that writes a DataFrame into a database table
Chunked Reading | Batched reads | Reads large data in batches (chunksize)
ORM | Object-relational mapping | Maps Python classes to DB tables
ETL | Extract-Transform-Load | The standard data pipeline pattern
Connection Pool | Reused connections | Reuses database connections for performance
Context Manager | Scoped cleanup | with/try-finally pattern that guarantees connections are closed properly

Checkpoint

SQLAlchemy is the standard way to connect Python to databases. pd.read_sql reads SQL → DataFrame; to_sql writes it back. ETL is the most common pipeline pattern. SQL excels at querying, Python excels at analysis: combining the two gives you the strongest analytics workflow!
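
To make these terms concrete, here is a minimal round-trip sketch using an in-memory SQLite database, so it runs without any database server; the table and column names are illustrative only.

Python
import pandas as pd
from sqlalchemy import create_engine

# SQLite in-memory: runs anywhere, no server or credentials needed
engine = create_engine('sqlite:///:memory:')

# to_sql: DataFrame → database table
orders = pd.DataFrame({'customer_id': [1, 1, 2], 'amount': [120.0, 80.0, 250.0]})
orders.to_sql('orders', engine, if_exists='replace', index=False)

# read_sql: let SQL do the aggregation, get a DataFrame back for analysis
summary = pd.read_sql(
    "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id",
    engine
)
print(summary)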


🔌 Connection Methods

The Power of Combining SQL + Python

SQL excels at querying and aggregating data. Python excels at analysis, visualization, and ML. Combining the two gives you the most powerful workflow.

SQL + Python Integration

🗄️ Database (PostgreSQL/MySQL/SQLite) → 🔗 SQLAlchemy (Engine) → 📊 Pandas DataFrame → 📈 Analysis / Viz / ML

SQLAlchemy Engine

Python
from sqlalchemy import create_engine
import pandas as pd

# PostgreSQL
engine = create_engine('postgresql://user:password@host:5432/database')

# MySQL
engine = create_engine('mysql+pymysql://user:password@host:3306/database')

# SQLite
engine = create_engine('sqlite:///my_database.db')

# SQL Server
engine = create_engine(
    'mssql+pyodbc://user:password@host/database'
    '?driver=ODBC+Driver+17+for+SQL+Server'
)

# Connection with options
engine = create_engine(
    'postgresql://user:password@host:5432/database',
    echo=False,                            # Don't log queries
    pool_size=5,                           # Connection pool size
    pool_recycle=3600,                     # Recycle connections after 1 hour
    connect_args={'connect_timeout': 10}
)

Environment Variables (Security)

Python
import os
from sqlalchemy import create_engine
from dotenv import load_dotenv

load_dotenv()

DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

connection_string = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:5432/{DB_NAME}'
engine = create_engine(connection_string)

# .env file example:
# DB_USER=myuser
# DB_PASS=mypassword
# DB_HOST=localhost
# DB_NAME=mydatabase

Context Manager Pattern

Python
from sqlalchemy import create_engine, text
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """Context manager for database connections"""
    engine = create_engine('postgresql://user:pass@host/db')
    connection = engine.connect()
    try:
        yield connection
    finally:
        connection.close()

# Usage (text() is required for raw SQL strings in SQLAlchemy 1.4+/2.0)
with get_db_connection() as conn:
    result = conn.execute(text("SELECT * FROM orders LIMIT 10"))
    for row in result:
        print(row)

Checkpoint

SQLAlchemy's create_engine() supports PostgreSQL, MySQL, SQLite, and SQL Server. Always use environment variables for credentials, and a context manager to guarantee connections are closed. Which database are you using? The connection string differs per database system!
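
A quick way to verify that your connection string actually works is a small smoke test like the sketch below. It assumes an `engine` created as above; text() wraps raw SQL, which SQLAlchemy 1.4+/2.0 requires for plain strings.

Python
from sqlalchemy import text

try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))  # cheapest possible round trip
    print("Connection OK")
except Exception as e:
    print(f"Connection failed: {e}")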


📥 Reading Data — pd.read_sql()


Basic Query

Python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@host/database')

# Basic query
df = pd.read_sql("SELECT * FROM orders", engine)

# With parameters (safe from SQL injection)
customer_id = 100
df = pd.read_sql(
    "SELECT * FROM orders WHERE customer_id = %(cid)s",
    engine,
    params={'cid': customer_id}
)

# Complex query with JOINs
query = """
    SELECT
        o.order_id, o.order_date, o.amount,
        c.name as customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.order_date >= '2024-01-01'
    ORDER BY o.order_date DESC
"""
df = pd.read_sql(query, engine)
print(f"Shape: {df.shape}")
print(df.head())

Chunked Reading (Large Datasets)

Python
# Read in chunks to manage memory
chunk_size = 100000
chunks = []

for chunk in pd.read_sql(
    "SELECT * FROM large_table",
    engine,
    chunksize=chunk_size
):
    processed = chunk.query('amount > 0')
    chunks.append(processed)
    print(f"Processed chunk with {len(chunk)} rows")

df = pd.concat(chunks, ignore_index=True)

# Process without storing all chunks
total_amount = 0
for chunk in pd.read_sql("SELECT amount FROM orders", engine, chunksize=10000):
    total_amount += chunk['amount'].sum()
print(f"Total: {total_amount}")

Parse Dates and Index

Python
# Parse date columns automatically
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    parse_dates=['order_date', 'ship_date']
)

# Set index column
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    index_col='order_id'
)

# Combined options
df = pd.read_sql(
    "SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    engine,
    parse_dates={'order_date': '%Y-%m-%d'},
    index_col='order_id',
    coerce_float=True
)

Checkpoint

pd.read_sql() reads a SQL query into a DataFrame. Use params for SQL injection safety, chunked reading for large datasets, and parse_dates to convert dates automatically. Tip: chunksize matters a lot when working with tables of millions of rows. Have you tried it?
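
One caveat: with some drivers (psycopg2 for PostgreSQL, for example) the entire result set is fetched client-side before pandas splits it into chunks, so chunksize alone may not bound memory. A sketch of the fix using SQLAlchemy's stream_results option; `large_table` and the per-chunk print are illustrative:

Python
# stream_results=True requests a server-side cursor, so each chunk is
# fetched from the server as needed instead of all at once
with engine.connect() as conn:
    conn = conn.execution_options(stream_results=True)
    for chunk in pd.read_sql("SELECT * FROM large_table", conn, chunksize=50000):
        print(f"Got {len(chunk)} rows")  # replace with your per-chunk logic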


📤 Writing Data — to_sql()


Basic Write

Python
# Create sample DataFrame
df = pd.DataFrame({
    'name': ['Product A', 'Product B', 'Product C'],
    'price': [100.0, 200.0, 150.0],
    'category': ['Electronics', 'Clothing', 'Electronics']
})

# Write to database
df.to_sql(
    'products',              # Table name
    engine,                  # Connection
    if_exists='replace',     # 'fail', 'replace', 'append'
    index=False              # Don't write index as column
)

# Append new data
new_products = pd.DataFrame({
    'name': ['Product D'],
    'price': [300.0],
    'category': ['Home']
})
new_products.to_sql('products', engine, if_exists='append', index=False)

Write with Data Types

Python
from sqlalchemy import Integer, String, Float, DateTime

df.to_sql(
    'orders',
    engine,
    if_exists='replace',
    index=False,
    dtype={
        'order_id': Integer,
        'customer_name': String(100),
        'amount': Float,
        'order_date': DateTime
    }
)

Efficient Bulk Insert

Python
# For large DataFrames, use method='multi' or chunks
df.to_sql(
    'large_table',
    engine,
    if_exists='append',
    index=False,
    method='multi',      # Insert multiple rows at once
    chunksize=1000       # Commit every 1000 rows
)

# Using COPY for PostgreSQL (fastest)
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """Use PostgreSQL COPY for fast bulk insert"""
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        for row in data_iter:
            s_buf.write('\t'.join(map(str, row)) + '\n')
        s_buf.seek(0)
        cur.copy_from(s_buf, table.name, columns=keys, null='')

df.to_sql('table', engine, method=psql_insert_copy, index=False)

Checkpoint

to_sql() writes a DataFrame into the database. if_exists controls the behavior (fail/replace/append). method='multi' plus chunksize gives efficient bulk inserts. Replace vs. append: which would you choose for a daily data warehouse update?
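
For a daily warehouse update, plain replace drops history and plain append duplicates rows on reruns. One common middle ground is delete-then-append inside a transaction; a sketch, where `daily_sales`, `load_date`, and `df_today` are hypothetical names:

Python
from datetime import date
from sqlalchemy import text

today = date.today().isoformat()
with engine.begin() as conn:  # one transaction: all or nothing
    # remove any rows already loaded for today, so reruns are idempotent
    conn.execute(text("DELETE FROM daily_sales WHERE load_date = :d"), {'d': today})
    df_today.to_sql('daily_sales', conn, if_exists='append', index=False)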


🔄 Practical Workflows — ETL & Analysis


ETL Pipeline Class

Python
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

class ETLPipeline:
    def __init__(self, source_conn, target_conn):
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)

    def extract(self, query):
        """Extract data from source"""
        print("Extracting data...")
        df = pd.read_sql(query, self.source_engine)
        print(f"Extracted {len(df)} rows")
        return df

    def transform(self, df):
        """Transform data"""
        print("Transforming data...")
        df = df.dropna(subset=['amount'])
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['amount_category'] = pd.cut(
            df['amount'],
            bins=[0, 100, 500, float('inf')],
            labels=['small', 'medium', 'large']
        )
        summary = df.groupby(['year', 'month']).agg({
            'amount': ['sum', 'mean', 'count']
        }).reset_index()
        summary.columns = ['year', 'month', 'total_amount', 'avg_amount', 'order_count']
        print(f"Transformed to {len(summary)} rows")
        return summary

    def load(self, df, table_name):
        """Load data to target"""
        print(f"Loading to {table_name}...")
        df['loaded_at'] = datetime.now()
        df.to_sql(table_name, self.target_engine, if_exists='replace', index=False)
        print(f"Loaded {len(df)} rows")

    def run(self, query, target_table):
        """Run complete ETL"""
        df = self.extract(query)
        transformed = self.transform(df)
        self.load(transformed, target_table)
        return transformed

# Usage
pipeline = ETLPipeline(
    source_conn='postgresql://user:pass@source_host/db',
    target_conn='postgresql://user:pass@target_host/datawarehouse'
)
result = pipeline.run(
    query="SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    target_table='monthly_sales_summary'
)

Analysis Class

Python
class SalesAnalyzer:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)

    def get_top_customers(self, n=10, start_date=None):
        query = """
            SELECT c.id, c.name,
                   COUNT(o.id) as order_count,
                   SUM(o.amount) as total_spent,
                   AVG(o.amount) as avg_order
            FROM customers c
            JOIN orders o ON c.id = o.customer_id
            WHERE 1=1
        """
        # NOTE: interpolating values is acceptable only for trusted,
        # internal inputs; use bound parameters for anything user-supplied
        if start_date:
            query += f" AND o.order_date >= '{start_date}'"
        query += f" GROUP BY c.id, c.name ORDER BY total_spent DESC LIMIT {n}"
        return pd.read_sql(query, self.engine)

    def get_monthly_trends(self, year):
        query = f"""
            SELECT EXTRACT(MONTH FROM order_date) as month,
                   COUNT(*) as orders,
                   SUM(amount) as revenue,
                   AVG(amount) as avg_order
            FROM orders
            WHERE EXTRACT(YEAR FROM order_date) = {year}
            GROUP BY EXTRACT(MONTH FROM order_date)
            ORDER BY month
        """
        return pd.read_sql(query, self.engine)

# Usage
analyzer = SalesAnalyzer('postgresql://user:pass@host/db')
top_customers = analyzer.get_top_customers(n=20, start_date='2024-01-01')
monthly = analyzer.get_monthly_trends(2024)

Parameterized Reports with Jinja2

Python
from jinja2 import Template

# Jinja2 renders values directly into the SQL string, so use it only
# with trusted report parameters, never with raw user input
report_template = """
SELECT
    {{ group_by_column }},
    COUNT(*) as count,
    SUM(amount) as total,
    AVG(amount) as average
FROM orders
WHERE order_date BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{% if status %}AND status = '{{ status }}'{% endif %}
GROUP BY {{ group_by_column }}
ORDER BY total DESC
{% if limit %}LIMIT {{ limit }}{% endif %}
"""

query = Template(report_template).render(
    group_by_column='customer_id',
    start_date='2024-01-01',
    end_date='2024-12-31',
    status='completed',
    limit=100
)
df = pd.read_sql(query, engine)

Checkpoint

The ETL pipeline pattern: Extract (read_sql) → Transform (Pandas) → Load (to_sql). Analysis classes encapsulate business logic, and Jinja2 templates enable dynamic report queries. ETL pipelines are the foundation of data engineering; you can extend the class with error handling and logging, as sketched below!
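
As one example of that extension, here is a sketch of a subclass that adds logging and error handling around run(); the logger name and re-raise behavior are just one possible design.

Python
import logging
from sqlalchemy.exc import SQLAlchemyError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('etl')

class RobustETLPipeline(ETLPipeline):
    def run(self, query, target_table):
        try:
            df = self.extract(query)
            transformed = self.transform(df)
            self.load(transformed, target_table)
            logger.info("ETL finished: %d rows -> %s", len(transformed), target_table)
            return transformed
        except SQLAlchemyError:
            logger.exception("Database error during ETL run")
            raise  # let the scheduler see the failure
        except Exception:
            logger.exception("Unexpected error during ETL run")
            raise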


✅ Best Practices & ORM


Connection Management

Python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=engine)

# For queries — use context managers
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM table"))

# For ORM operations
with Session() as session:
    session.query(Model).filter(...)

Error Handling

Python
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError

def safe_query(query, engine):
    """Execute query with error handling"""
    try:
        df = pd.read_sql(query, engine)
        return df
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

SQLAlchemy ORM Basics

Python
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
# declarative_base moved to sqlalchemy.orm in SQLAlchemy 1.4+
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(100), unique=True)
    orders = relationship('Order', back_populates='customer')

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    amount = Column(Float)
    order_date = Column(DateTime)
    customer = relationship('Customer', back_populates='orders')

# Create tables
Base.metadata.create_all(engine)

# Query with ORM → DataFrame
Session = sessionmaker(bind=engine)
session = Session()
df = pd.read_sql(session.query(Customer).statement, engine)
session.close()

Query Logging

Python
import logging

# Enable SQLAlchemy logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Or create engine with echo
engine = create_engine(connection_string, echo=True)

Checkpoint

Always use context managers for connections, and handle errors with try/except SQLAlchemyError. The ORM maps Python classes to DB tables, and logging helps you debug SQL queries. pool_pre_ping=True is essential in production: it checks that a connection is still alive before using it!
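
In production you may also want to retry transient failures (dropped connections, brief outages) instead of failing immediately. A minimal retry sketch with linear backoff; tune attempts and delay for your environment:

Python
import time
import pandas as pd
from sqlalchemy.exc import OperationalError

def read_sql_with_retry(query, engine, attempts=3, delay=2.0):
    """Retry a read on transient connection errors."""
    for attempt in range(1, attempts + 1):
        try:
            return pd.read_sql(query, engine)
        except OperationalError:
            if attempt == attempts:
                raise  # out of retries: surface the error
            time.sleep(delay * attempt)  # back off a little more each time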


📋 Summary


What You Learned

Topic | Key points
Connection | SQLAlchemy engine, environment variables, context manager
Reading | pd.read_sql(), chunked reading, parameters, parse_dates
Writing | to_sql(), if_exists, bulk insert, method='multi'
Workflow | ETL pipelines, analysis classes, Jinja2 templates
ORM | declarative_base, models, relationships, session
Best Practices | Connection pooling, error handling, logging

Key Patterns

  1. SQLAlchemy is the recommended way to connect to databases
  2. Environment variables for credentials: NEVER hardcode them
  3. Chunked reading for large datasets (>100K rows)
  4. method='multi' + chunksize for efficient bulk inserts
  5. Context managers ensure connections are closed properly
  6. Parameters for SQL injection safety (see the sketch below)
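
To make pattern 6 concrete, here are the unsafe and safe versions side by side (a sketch; the %(name)s paramstyle shown matches PostgreSQL/psycopg2 and may differ for other drivers):

Python
user_input = "100; DROP TABLE orders"  # hostile input

# Unsafe: the value is pasted straight into the SQL string
# df = pd.read_sql(f"SELECT * FROM orders WHERE customer_id = {user_input}", engine)

# Safe: the driver binds and escapes the value
df = pd.read_sql(
    "SELECT * FROM orders WHERE customer_id = %(cid)s",
    engine,
    params={'cid': user_input},
)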

Self-Check Questions

  1. SQLAlchemy vs. a raw connection string: why use SQLAlchemy?
  2. When should you use chunked reading?
  3. How does method='multi' improve performance?
  4. How do environment variables protect credentials?

Next lesson: Time Series Analysis →

🎉 Excellent! You now know how to connect SQL and Python!

Remember: SQLAlchemy + Pandas = the perfect pair. SQL to query, Python to analyze. Combine the two for an optimal workflow!