Theory
Lesson 8/17

SQL + Python Integration

Connecting to, querying, and analyzing data from databases in Python

1. Introduction

The Combined Power of SQL + Python

SQL excels at querying and aggregating data. Python excels at analysis, visualization, and machine learning. Combining the two gives you the most powerful workflow.

1.1 Workflow Overview

Text
┌─────────────────────────────────────────────────────────┐
│                 SQL + Python Integration                │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌──────────────┐      ┌──────────────┐                │
│   │   Database   │─────▶│  SQLAlchemy  │                │
│   │  PostgreSQL  │      │  /psycopg2   │                │
│   │    MySQL     │      └──────┬───────┘                │
│   │    SQLite    │             │                        │
│   └──────────────┘             ▼                        │
│                         ┌──────────────┐                │
│                         │    Pandas    │                │
│                         │  DataFrame   │                │
│                         └──────┬───────┘                │
│                                │                        │
│          ┌─────────────────────┼──────────────────┐     │
│          ▼                     ▼                  ▼     │
│   ┌──────────────┐     ┌──────────────┐    ┌──────────┐ │
│   │   Analysis   │     │Visualization │    │    ML    │ │
│   │    Pandas    │     │  Matplotlib  │    │  Sklearn │ │
│   └──────────────┘     └──────────────┘    └──────────┘ │
│                                                         │
└─────────────────────────────────────────────────────────┘
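
For a concrete feel of this flow, here is a minimal end-to-end sketch (the demo.db file and its orders table are assumptions for illustration):

Python
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

# Hypothetical SQLite database containing an `orders` table
engine = create_engine('sqlite:///demo.db')

# SQL side: filter and aggregate inside the database
df = pd.read_sql(
    "SELECT order_date, SUM(amount) AS revenue FROM orders GROUP BY order_date",
    engine,
    parse_dates=['order_date']
)

# Python side: analysis and visualization on the DataFrame
df.set_index('order_date')['revenue'].plot(title='Daily revenue')
plt.show()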

2. Connection Methods

2.1 SQLAlchemy (Recommended)

Python
from sqlalchemy import create_engine
import pandas as pd

# PostgreSQL
engine = create_engine('postgresql://user:password@host:5432/database')

# MySQL
engine = create_engine('mysql+pymysql://user:password@host:3306/database')

# SQLite
engine = create_engine('sqlite:///my_database.db')

# SQL Server
engine = create_engine('mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+17+for+SQL+Server')

# Connection string with options
engine = create_engine(
    'postgresql://user:password@host:5432/database',
    echo=False,             # Don't log queries
    pool_size=5,            # Connection pool size
    pool_recycle=3600,      # Recycle connections after 1 hour
    connect_args={'connect_timeout': 10}
)

2.2 Using Environment Variables

Python
import os
from sqlalchemy import create_engine
from dotenv import load_dotenv

# Load .env file
load_dotenv()

# Get credentials from environment
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

# Create connection string
connection_string = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:5432/{DB_NAME}'
engine = create_engine(connection_string)

# .env file example:
# DB_USER=myuser
# DB_PASS=mypassword
# DB_HOST=localhost
# DB_NAME=mydatabase

2.3 Context Manager Pattern

Python
from sqlalchemy import create_engine, text
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """Context manager for database connections"""
    engine = create_engine('postgresql://user:pass@host/db')
    connection = engine.connect()
    try:
        yield connection
    finally:
        connection.close()

# Usage (SQLAlchemy 2.x requires raw SQL strings to be wrapped in text())
with get_db_connection() as conn:
    result = conn.execute(text("SELECT * FROM orders LIMIT 10"))
    for row in result:
        print(row)

3. Reading Data with Pandas

3.1 pd.read_sql()

Python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@host/database')

# Basic query
df = pd.read_sql("SELECT * FROM orders", engine)

# With parameters (safe from SQL injection)
customer_id = 100
df = pd.read_sql(
    "SELECT * FROM orders WHERE customer_id = %(cid)s",
    engine,
    params={'cid': customer_id}
)

# Read entire table
df = pd.read_sql_table('orders', engine)

# Complex query
query = """
    SELECT
        o.order_id,
        o.order_date,
        o.amount,
        c.name AS customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.order_date >= '2024-01-01'
    ORDER BY o.order_date DESC
"""
df = pd.read_sql(query, engine)

print(f"Shape: {df.shape}")
print(df.head())

3.2 Chunked Reading

Python
# For large datasets, read in chunks
chunk_size = 100000
chunks = []

for chunk in pd.read_sql(
    "SELECT * FROM large_table",
    engine,
    chunksize=chunk_size
):
    # Process each chunk
    processed = chunk.query('amount > 0')
    chunks.append(processed)
    print(f"Processed chunk with {len(chunk)} rows")

df = pd.concat(chunks, ignore_index=True)

# Or process without storing all chunks
total_amount = 0
for chunk in pd.read_sql("SELECT amount FROM orders", engine, chunksize=10000):
    total_amount += chunk['amount'].sum()
print(f"Total: {total_amount}")

3.3 Parse Dates and Index

Python
# Parse date columns
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    parse_dates=['order_date', 'ship_date']
)

# Set index column
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    index_col='order_id'
)

# Multiple options
df = pd.read_sql(
    "SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    engine,
    parse_dates={'order_date': '%Y-%m-%d'},
    index_col='order_id',
    coerce_float=True
)

4. Writing Data to Database

4.1 pd.to_sql()

Python
# Create sample DataFrame
df = pd.DataFrame({
    'name': ['Product A', 'Product B', 'Product C'],
    'price': [100.0, 200.0, 150.0],
    'category': ['Electronics', 'Clothing', 'Electronics']
})

# Write to database
df.to_sql(
    'products',              # Table name
    engine,                  # Connection
    if_exists='replace',     # 'fail', 'replace', 'append'
    index=False              # Don't write index as column
)

# Append new data
new_products = pd.DataFrame({
    'name': ['Product D'],
    'price': [300.0],
    'category': ['Home']
})
new_products.to_sql('products', engine, if_exists='append', index=False)

4.2 Write with Data Types

Python
from sqlalchemy import Integer, String, Float, DateTime

# Specify SQL data types
df.to_sql(
    'orders',
    engine,
    if_exists='replace',
    index=False,
    dtype={
        'order_id': Integer,
        'customer_name': String(100),
        'amount': Float,
        'order_date': DateTime
    }
)

4.3 Efficient Bulk Insert

Python
# For large DataFrames, use method='multi' or chunks
df.to_sql(
    'large_table',
    engine,
    if_exists='append',
    index=False,
    method='multi',   # Insert multiple rows at once
    chunksize=1000    # Commit every 1000 rows
)

# Using COPY for PostgreSQL (fastest)
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """Use PostgreSQL COPY for fast bulk insert"""
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        for row in data_iter:
            # Assumes values contain no tabs or newlines; use the csv module for robustness
            s_buf.write('\t'.join(map(str, row)) + '\n')
        s_buf.seek(0)
        cur.copy_from(s_buf, table.name, columns=keys, null='')

df.to_sql('table', engine, method=psql_insert_copy, index=False)
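
Which option wins depends on the driver, network, and row width, so it is worth timing them on your own data. A rough harness (target table names here are placeholders):

Python
import time

def time_insert(label, write_fn):
    """Rough timing harness for comparing insert strategies"""
    start = time.perf_counter()
    write_fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")

# Placeholder target tables; adjust to your schema
time_insert('default', lambda: df.to_sql('t_default', engine, if_exists='replace', index=False))
time_insert('multi', lambda: df.to_sql('t_multi', engine, if_exists='replace', index=False,
                                       method='multi', chunksize=1000))
time_insert('copy', lambda: df.to_sql('t_copy', engine, if_exists='replace', index=False,
                                      method=psql_insert_copy))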

5. SQLAlchemy ORM Basics

5.1 Define Models

Python
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship  # declarative_base lives in sqlalchemy.orm since 1.4

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(100), unique=True)

    orders = relationship('Order', back_populates='customer')

class Order(Base):
    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    amount = Column(Float)
    order_date = Column(DateTime)

    customer = relationship('Customer', back_populates='orders')

# Create tables
Base.metadata.create_all(engine)
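
The queries in the next section assume some rows exist. A minimal sketch for seeding data through an ORM session (the sample values are made up):

Python
from datetime import datetime
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

# Hypothetical sample row: one customer with a single order
with Session() as session:
    alice = Customer(name='Alice', email='alice@example.com')
    alice.orders.append(Order(amount=250.0, order_date=datetime(2024, 3, 1)))
    session.add(alice)
    session.commit()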

5.2 Query with ORM

Python
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

# Query all
customers = session.query(Customer).all()

# Filter
big_orders = session.query(Order).filter(Order.amount > 1000).all()

# Join
result = session.query(Customer, Order)\
    .join(Order)\
    .filter(Order.amount > 100)\
    .all()

# Convert to DataFrame
df = pd.read_sql(
    session.query(Customer).statement,
    engine
)

session.close()

6. Practical Workflows

6.1 ETL Pipeline

Python
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

class ETLPipeline:
    def __init__(self, source_conn, target_conn):
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)

    def extract(self, query):
        """Extract data from source"""
        print("Extracting data...")
        df = pd.read_sql(query, self.source_engine)
        print(f"Extracted {len(df)} rows")
        return df

    def transform(self, df):
        """Transform data"""
        print("Transforming data...")

        # Clean
        df = df.dropna(subset=['amount'])

        # Ensure datetime dtype for the .dt accessors below
        df['order_date'] = pd.to_datetime(df['order_date'])

        # Add calculated columns
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['amount_category'] = pd.cut(
            df['amount'],
            bins=[0, 100, 500, float('inf')],
            labels=['small', 'medium', 'large']
        )

        # Aggregate
        summary = df.groupby(['year', 'month']).agg({
            'amount': ['sum', 'mean', 'count']
        }).reset_index()
        summary.columns = ['year', 'month', 'total_amount', 'avg_amount', 'order_count']

        print(f"Transformed to {len(summary)} rows")
        return summary

    def load(self, df, table_name):
        """Load data to target"""
        print(f"Loading to {table_name}...")
        df['loaded_at'] = datetime.now()
        df.to_sql(
            table_name,
            self.target_engine,
            if_exists='replace',
            index=False
        )
        print(f"Loaded {len(df)} rows")

    def run(self, query, target_table):
        """Run complete ETL"""
        df = self.extract(query)
        transformed = self.transform(df)
        self.load(transformed, target_table)
        return transformed


# Usage
pipeline = ETLPipeline(
    source_conn='postgresql://user:pass@source_host/db',
    target_conn='postgresql://user:pass@target_host/datawarehouse'
)

result = pipeline.run(
    query="SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    target_table='monthly_sales_summary'
)

6.2 Analysis Class

Python
class SalesAnalyzer:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)

    def get_top_customers(self, n=10, start_date=None):
        """Get top N customers by revenue"""
        query = """
            SELECT
                c.id AS customer_id,
                c.name,
                COUNT(o.id) AS order_count,
                SUM(o.amount) AS total_spent,
                AVG(o.amount) AS avg_order
            FROM customers c
            JOIN orders o ON c.id = o.customer_id
            WHERE 1=1
        """
        if start_date:
            # Caution: string interpolation is injection-prone; see the parameterized variant below
            query += f" AND o.order_date >= '{start_date}'"

        query += f"""
            GROUP BY c.id, c.name
            ORDER BY total_spent DESC
            LIMIT {n}
        """
        return pd.read_sql(query, self.engine)

    def get_monthly_trends(self, year):
        """Get monthly sales trends"""
        query = f"""
            SELECT
                EXTRACT(MONTH FROM order_date) AS month,
                COUNT(*) AS orders,
                SUM(amount) AS revenue,
                AVG(amount) AS avg_order
            FROM orders
            WHERE EXTRACT(YEAR FROM order_date) = {year}
            GROUP BY EXTRACT(MONTH FROM order_date)
            ORDER BY month
        """
        return pd.read_sql(query, self.engine)

    def get_product_performance(self, category=None):
        """Analyze product performance"""
        query = """
            SELECT
                p.category,
                p.name,
                SUM(oi.quantity) AS units_sold,
                SUM(oi.quantity * oi.unit_price) AS revenue
            FROM order_items oi
            JOIN products p ON oi.product_id = p.id
        """
        if category:
            query += f" WHERE p.category = '{category}'"

        query += """
            GROUP BY p.category, p.name
            ORDER BY revenue DESC
        """
        return pd.read_sql(query, self.engine)


# Usage
analyzer = SalesAnalyzer('postgresql://user:pass@host/db')

top_customers = analyzer.get_top_customers(n=20, start_date='2024-01-01')
monthly = analyzer.get_monthly_trends(2024)
products = analyzer.get_product_performance(category='Electronics')
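
Because these methods interpolate inputs straight into the SQL string, they are open to injection if the arguments ever come from users. A sketch of get_top_customers rewritten with bound parameters (same schema assumptions as above, PostgreSQL paramstyle):

Python
def get_top_customers(self, n=10, start_date=None):
    """Top N customers by revenue, using bound parameters"""
    query = """
        SELECT c.id AS customer_id, c.name,
               COUNT(o.id) AS order_count,
               SUM(o.amount) AS total_spent
        FROM customers c
        JOIN orders o ON c.id = o.customer_id
        WHERE (%(start_date)s IS NULL OR o.order_date >= %(start_date)s)
        GROUP BY c.id, c.name
        ORDER BY total_spent DESC
        LIMIT %(n)s
    """
    return pd.read_sql(query, self.engine,
                       params={'n': n, 'start_date': start_date})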

6.3 Parameterized Reports

Python
from jinja2 import Template

def generate_report_query(template, **params):
    """Generate SQL from template"""
    sql_template = Template(template)
    return sql_template.render(**params)

# Template
report_template = """
SELECT
    {{ group_by_column }},
    COUNT(*) AS count,
    SUM(amount) AS total,
    AVG(amount) AS average
FROM orders
WHERE order_date BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{% if status %}
    AND status = '{{ status }}'
{% endif %}
GROUP BY {{ group_by_column }}
ORDER BY total DESC
{% if limit %}
LIMIT {{ limit }}
{% endif %}
"""

# Generate query
query = generate_report_query(
    report_template,
    group_by_column='customer_id',
    start_date='2024-01-01',
    end_date='2024-12-31',
    status='completed',
    limit=100
)

df = pd.read_sql(query, engine)
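
Templated SQL carries the same injection caveat: Jinja2 renders values verbatim into the query. One safeguard is to whitelist identifiers before rendering (the allowed-column set below is an assumption for illustration):

Python
ALLOWED_GROUP_BY = {'customer_id', 'status', 'product_id'}  # hypothetical whitelist

def generate_report_query_safe(template, group_by_column, **params):
    """Render the template only after validating the identifier"""
    if group_by_column not in ALLOWED_GROUP_BY:
        raise ValueError(f"Unsupported group-by column: {group_by_column!r}")
    return generate_report_query(template, group_by_column=group_by_column, **params)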

7. Best Practices

7.1 Connection Management

Python
# Use context managers
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=engine)

# For queries (text() is required for raw SQL in SQLAlchemy 2.x)
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM table"))

# For ORM operations
with Session() as session:
    session.query(Model).filter(...)

7.2 Error Handling

Python
from sqlalchemy.exc import SQLAlchemyError

def safe_query(query, engine):
    """Execute query with error handling"""
    try:
        df = pd.read_sql(query, engine)
        return df
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None
7.3 Query Logging

Python
import logging

# Enable SQLAlchemy logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Or create engine with echo
engine = create_engine(connection_string, echo=True)

8. Practice

Integration Exercise

Exercise: Build an Analysis Module

Python
# Create a complete analysis module that:
# 1. Connects to a database (use SQLite for practice)
# 2. Creates sample tables and data
# 3. Implements analysis functions
# 4. Generates visualizations

# YOUR CODE HERE
💡 Show solution
Python
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import matplotlib.pyplot as plt

class RetailAnalytics:
    def __init__(self, db_path='retail.db'):
        self.engine = create_engine(f'sqlite:///{db_path}')
        self._setup_sample_data()

    def _setup_sample_data(self):
        """Create sample data"""
        np.random.seed(42)
        n = 1000

        # Customers
        customers = pd.DataFrame({
            'customer_id': range(1, 101),
            'name': [f'Customer_{i}' for i in range(1, 101)],
            'region': np.random.choice(['North', 'South', 'East', 'West'], 100)
        })

        # Products
        products = pd.DataFrame({
            'product_id': range(1, 21),
            'name': [f'Product_{i}' for i in range(1, 21)],
            'category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Sports'], 20),
            'price': np.random.uniform(10, 500, 20).round(2)
        })

        # Orders (one order every 8 hours through 2024)
        orders = pd.DataFrame({
            'order_id': range(1, n + 1),
            'customer_id': np.random.randint(1, 101, n),
            'product_id': np.random.randint(1, 21, n),
            'quantity': np.random.randint(1, 10, n),
            'order_date': pd.date_range('2024-01-01', periods=n, freq='8h')
        })

        # Write to database
        customers.to_sql('customers', self.engine, if_exists='replace', index=False)
        products.to_sql('products', self.engine, if_exists='replace', index=False)
        orders.to_sql('orders', self.engine, if_exists='replace', index=False)

        print("Sample data created!")

    def top_customers(self, n=10):
        """Get top N customers by spending"""
        query = f"""
            SELECT
                c.customer_id,
                c.name,
                c.region,
                COUNT(o.order_id) AS order_count,
                SUM(o.quantity * p.price) AS total_spent
            FROM customers c
            JOIN orders o ON c.customer_id = o.customer_id
            JOIN products p ON o.product_id = p.product_id
            GROUP BY c.customer_id, c.name, c.region
            ORDER BY total_spent DESC
            LIMIT {n}
        """
        return pd.read_sql(query, self.engine)

    def monthly_sales(self):
        """Get monthly sales summary"""
        query = """
            SELECT
                strftime('%Y-%m', order_date) AS month,
                COUNT(order_id) AS orders,
                SUM(quantity) AS units,
                SUM(quantity * price) AS revenue
            FROM orders o
            JOIN products p ON o.product_id = p.product_id
            GROUP BY strftime('%Y-%m', order_date)
            ORDER BY month
        """
        return pd.read_sql(query, self.engine)

    def category_performance(self):
        """Analyze category performance"""
        query = """
            SELECT
                p.category,
                COUNT(o.order_id) AS orders,
                SUM(o.quantity) AS units_sold,
                ROUND(SUM(o.quantity * p.price), 2) AS revenue,
                ROUND(AVG(o.quantity * p.price), 2) AS avg_order_value
            FROM orders o
            JOIN products p ON o.product_id = p.product_id
            GROUP BY p.category
            ORDER BY revenue DESC
        """
        return pd.read_sql(query, self.engine)

    def regional_analysis(self):
        """Analyze sales by region"""
        query = """
            SELECT
                c.region,
                COUNT(DISTINCT c.customer_id) AS customers,
                COUNT(o.order_id) AS orders,
                ROUND(SUM(o.quantity * p.price), 2) AS revenue
            FROM customers c
            JOIN orders o ON c.customer_id = o.customer_id
            JOIN products p ON o.product_id = p.product_id
            GROUP BY c.region
        """
        return pd.read_sql(query, self.engine)

    def generate_dashboard(self):
        """Generate visual dashboard"""
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Monthly trend
        monthly = self.monthly_sales()
        axes[0, 0].plot(monthly['month'], monthly['revenue'], marker='o')
        axes[0, 0].set_title('Monthly Revenue')
        axes[0, 0].tick_params(axis='x', rotation=45)
        axes[0, 0].set_ylabel('Revenue ($)')

        # 2. Category breakdown
        categories = self.category_performance()
        axes[0, 1].barh(categories['category'], categories['revenue'])
        axes[0, 1].set_title('Revenue by Category')
        axes[0, 1].set_xlabel('Revenue ($)')

        # 3. Regional distribution
        regional = self.regional_analysis()
        axes[1, 0].pie(regional['revenue'], labels=regional['region'], autopct='%1.1f%%')
        axes[1, 0].set_title('Revenue by Region')

        # 4. Top customers
        top = self.top_customers(10)
        axes[1, 1].barh(top['name'], top['total_spent'])
        axes[1, 1].set_title('Top 10 Customers')
        axes[1, 1].set_xlabel('Total Spent ($)')

        plt.tight_layout()
        plt.savefig('retail_dashboard.png', dpi=150)
        plt.show()

        return fig


# Run analysis
analytics = RetailAnalytics()

print("\n📊 Top Customers:")
print(analytics.top_customers(5))

print("\n📈 Monthly Sales:")
print(analytics.monthly_sales())

print("\n🏷️ Category Performance:")
print(analytics.category_performance())

print("\n🌍 Regional Analysis:")
print(analytics.regional_analysis())

# Generate dashboard
analytics.generate_dashboard()

9. Summary

Topic          | Key Points
---------------|--------------------------------------------------
Connection     | SQLAlchemy engine, environment variables
Reading        | pd.read_sql(), chunked reading, parameters
Writing        | to_sql(), bulk insert, data types
Workflow       | ETL pipelines, analysis classes
Best Practices | Connection pooling, error handling, logging

Next lesson: Time Series Analysis