SQL + Python Integration
1. Introduction
The combined power of SQL + Python
SQL excels at querying and aggregating data; Python excels at analysis, visualization, and machine learning. Combining the two gives you the most powerful workflow.
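As a minimal sketch of that combination (assuming a local SQLite file `sales.db` with an `orders` table that has `order_date` and `amount` columns):

```python
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

# SQL does the heavy lifting: filter and aggregate inside the database
engine = create_engine('sqlite:///sales.db')  # hypothetical sample database
df = pd.read_sql(
    "SELECT order_date, SUM(amount) AS revenue FROM orders GROUP BY order_date",
    engine,
    parse_dates=['order_date']
)

# Python takes over: analysis and visualization on the small result set
df.set_index('order_date')['revenue'].plot(title='Daily Revenue')
plt.show()
```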
1.1 Workflow Overview
```text
┌─────────────────────────────────────────────────────────┐
│                SQL + Python Integration                 │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌──────────────┐      ┌──────────────┐                 │
│  │   Database   │─────▶│  SQLAlchemy  │                 │
│  │  PostgreSQL  │      │  /psycopg2   │                 │
│  │  MySQL       │      └──────┬───────┘                 │
│  │  SQLite      │             │                         │
│  └──────────────┘             v                         │
│                       ┌──────────────┐                  │
│                       │    Pandas    │                  │
│                       │  DataFrame   │                  │
│                       └──────┬───────┘                  │
│                              │                          │
│         ┌────────────────────┼──────────────────┐       │
│         v                    v                  v       │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────┐   │
│  │   Analysis   │    │Visualization │    │    ML    │   │
│  │   Pandas     │    │  Matplotlib  │    │ Sklearn  │   │
│  └──────────────┘    └──────────────┘    └──────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
2. Connection Methods
2.1 SQLAlchemy (Recommended)
```python
from sqlalchemy import create_engine
import pandas as pd

# PostgreSQL
engine = create_engine('postgresql://user:password@host:5432/database')

# MySQL
engine = create_engine('mysql+pymysql://user:password@host:3306/database')

# SQLite
engine = create_engine('sqlite:///my_database.db')

# SQL Server
engine = create_engine('mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+17+for+SQL+Server')

# Connection string with options
engine = create_engine(
    'postgresql://user:password@host:5432/database',
    echo=False,              # Don't log queries
    pool_size=5,             # Connection pool size
    pool_recycle=3600,       # Recycle connections after 1 hour
    connect_args={'connect_timeout': 10}
)
```
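After creating an engine, it helps to verify connectivity right away so bad credentials or host names fail fast. A minimal check (the SQLite URL is just a stand-in for your own connection string):

```python
from sqlalchemy import create_engine, text

engine = create_engine('sqlite:///my_database.db')  # placeholder URL

# Open a connection, run a trivial query, close it again
with engine.connect() as conn:
    result = conn.execute(text('SELECT 1'))
    print(result.scalar())  # prints 1 if the connection works
```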
2.2 Using Environment Variables
```python
import os
from sqlalchemy import create_engine
from dotenv import load_dotenv

# Load .env file
load_dotenv()

# Get credentials from environment
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

# Create connection string
connection_string = f'postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:5432/{DB_NAME}'
engine = create_engine(connection_string)

# .env file example:
# DB_USER=myuser
# DB_PASS=mypassword
# DB_HOST=localhost
# DB_NAME=mydatabase
```
2.3 Context Manager Pattern
```python
from sqlalchemy import create_engine, text
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """Context manager for database connections"""
    engine = create_engine('postgresql://user:pass@host/db')
    connection = engine.connect()
    try:
        yield connection
    finally:
        connection.close()

# Usage
with get_db_connection() as conn:
    # SQLAlchemy 1.4+ requires raw SQL strings to be wrapped in text()
    result = conn.execute(text("SELECT * FROM orders LIMIT 10"))
    for row in result:
        print(row)
```
3. Reading Data with Pandas
3.1 pd.read_sql()
```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@host/database')

# Basic query
df = pd.read_sql("SELECT * FROM orders", engine)

# With parameters (safe from SQL injection)
# %(name)s is psycopg2's named paramstyle; placeholder syntax is driver-specific
customer_id = 100
df = pd.read_sql(
    "SELECT * FROM orders WHERE customer_id = %(cid)s",
    engine,
    params={'cid': customer_id}
)

# Read entire table
df = pd.read_sql_table('orders', engine)

# Complex query
query = """
    SELECT
        o.order_id,
        o.order_date,
        o.amount,
        c.name AS customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.order_date >= '2024-01-01'
    ORDER BY o.order_date DESC
"""
df = pd.read_sql(query, engine)

print(f"Shape: {df.shape}")
print(df.head())
```
3.2 Chunked Reading
```python
# For large datasets, read in chunks
chunk_size = 100000
chunks = []

for chunk in pd.read_sql(
    "SELECT * FROM large_table",
    engine,
    chunksize=chunk_size
):
    # Process each chunk
    processed = chunk.query('amount > 0')
    chunks.append(processed)
    print(f"Processed chunk with {len(chunk)} rows")

df = pd.concat(chunks, ignore_index=True)

# Or process without storing all chunks
total_amount = 0
for chunk in pd.read_sql("SELECT amount FROM orders", engine, chunksize=10000):
    total_amount += chunk['amount'].sum()
print(f"Total: {total_amount}")
```
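One caveat: `chunksize` controls how many rows pandas hands you at a time, but some drivers still fetch the entire result set into client memory first. With PostgreSQL, requesting a server-side cursor avoids that; a sketch, assuming psycopg2:

```python
# stream_results=True asks SQLAlchemy for a server-side cursor,
# so rows are fetched from PostgreSQL lazily rather than all at once
with engine.connect().execution_options(stream_results=True) as conn:
    for chunk in pd.read_sql("SELECT * FROM large_table", conn, chunksize=100000):
        print(f"Fetched {len(chunk)} rows")
```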
3.3 Parse Dates and Index
```python
# Parse date columns
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    parse_dates=['order_date', 'ship_date']
)

# Set index column
df = pd.read_sql(
    "SELECT * FROM orders",
    engine,
    index_col='order_id'
)

# Multiple options
df = pd.read_sql(
    "SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    engine,
    parse_dates={'order_date': '%Y-%m-%d'},
    index_col='order_id',
    coerce_float=True
)
```
4. Writing Data to Database
4.1 pd.to_sql()
```python
# Create sample DataFrame
df = pd.DataFrame({
    'name': ['Product A', 'Product B', 'Product C'],
    'price': [100.0, 200.0, 150.0],
    'category': ['Electronics', 'Clothing', 'Electronics']
})

# Write to database
df.to_sql(
    'products',            # Table name
    engine,                # Connection
    if_exists='replace',   # 'fail', 'replace', or 'append'
    index=False            # Don't write index as a column
)

# Append new data
new_products = pd.DataFrame({
    'name': ['Product D'],
    'price': [300.0],
    'category': ['Home']
})
new_products.to_sql('products', engine, if_exists='append', index=False)
```
4.2 Write with Data Types
```python
from sqlalchemy import Integer, String, Float, DateTime

# Specify SQL data types
df.to_sql(
    'orders',
    engine,
    if_exists='replace',
    index=False,
    dtype={
        'order_id': Integer,
        'customer_name': String(100),
        'amount': Float,
        'order_date': DateTime
    }
)
```
4.3 Efficient Bulk Insert
```python
# For large DataFrames, use method='multi' or chunks
df.to_sql(
    'large_table',
    engine,
    if_exists='append',
    index=False,
    method='multi',   # Insert multiple rows per INSERT statement
    chunksize=1000    # Commit every 1000 rows
)

# Using COPY for PostgreSQL (fastest)
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """Use PostgreSQL COPY for fast bulk insert.

    Note: this simple version breaks on values containing tabs or
    newlines and writes None as the string 'None'; for production,
    build the buffer with the csv module instead.
    """
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        for row in data_iter:
            s_buf.write('\t'.join(map(str, row)) + '\n')
        s_buf.seek(0)
        cur.copy_from(s_buf, table.name, columns=keys, null='')

df.to_sql('my_table', engine, method=psql_insert_copy, index=False)
```
5. SQLAlchemy ORM Basics
5.1 Define Models
```python
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
# declarative_base moved from sqlalchemy.ext.declarative to sqlalchemy.orm in 1.4
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(100), unique=True)

    orders = relationship('Order', back_populates='customer')

class Order(Base):
    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    amount = Column(Float)
    order_date = Column(DateTime)

    customer = relationship('Customer', back_populates='orders')

# Create tables
Base.metadata.create_all(engine)
```
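Before querying, the models need some rows. A minimal sketch of inserting through a session, using the Customer and Order models above (the name, email, and amount are made up for illustration):

```python
from datetime import datetime
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

with Session() as session:
    # Insert a customer together with one related order
    alice = Customer(name='Alice', email='alice@example.com')
    alice.orders.append(Order(amount=250.0, order_date=datetime(2024, 3, 1)))
    session.add(alice)
    session.commit()  # flushes the INSERTs and makes them permanent
```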
5.2 Query with ORM
```python
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

# Query all
customers = session.query(Customer).all()

# Filter
big_orders = session.query(Order).filter(Order.amount > 1000).all()

# Join
result = session.query(Customer, Order)\
    .join(Order)\
    .filter(Order.amount > 100)\
    .all()

# Convert to DataFrame
df = pd.read_sql(
    session.query(Customer).statement,
    engine
)

session.close()
```
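`query()` is the classic 1.x API; SQLAlchemy 2.0 favors `select()`. A sketch of the same filter in 2.0 style:

```python
from sqlalchemy import select

# SQLAlchemy 2.0 style: build a select() statement and execute it in a session
with Session() as session:
    stmt = select(Order).where(Order.amount > 1000)
    big_orders = session.scalars(stmt).all()
```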
6. Practical Workflows
6.1 ETL Pipeline
```python
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

class ETLPipeline:
    def __init__(self, source_conn, target_conn):
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)

    def extract(self, query):
        """Extract data from source"""
        print("Extracting data...")
        df = pd.read_sql(query, self.source_engine)
        print(f"Extracted {len(df)} rows")
        return df

    def transform(self, df):
        """Transform data"""
        print("Transforming data...")

        # Clean
        df = df.dropna(subset=['amount'])

        # Add calculated columns
        df['year'] = df['order_date'].dt.year
        df['month'] = df['order_date'].dt.month
        df['amount_category'] = pd.cut(
            df['amount'],
            bins=[0, 100, 500, float('inf')],
            labels=['small', 'medium', 'large']
        )

        # Aggregate
        summary = df.groupby(['year', 'month']).agg({
            'amount': ['sum', 'mean', 'count']
        }).reset_index()
        summary.columns = ['year', 'month', 'total_amount', 'avg_amount', 'order_count']

        print(f"Transformed to {len(summary)} rows")
        return summary

    def load(self, df, table_name):
        """Load data to target"""
        print(f"Loading to {table_name}...")
        df['loaded_at'] = datetime.now()
        df.to_sql(
            table_name,
            self.target_engine,
            if_exists='replace',
            index=False
        )
        print(f"Loaded {len(df)} rows")

    def run(self, query, target_table):
        """Run complete ETL"""
        df = self.extract(query)
        transformed = self.transform(df)
        self.load(transformed, target_table)
        return transformed


# Usage
pipeline = ETLPipeline(
    source_conn='postgresql://user:pass@source_host/db',
    target_conn='postgresql://user:pass@target_host/datawarehouse'
)

result = pipeline.run(
    query="SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    target_table='monthly_sales_summary'
)
```
6.2 Analysis Class
```python
class SalesAnalyzer:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)

    def get_top_customers(self, n=10, start_date=None):
        """Get top N customers by revenue"""
        # Bind values as parameters rather than interpolating them into
        # the SQL string, which would be vulnerable to SQL injection
        params = {'n': n}
        query = """
            SELECT
                c.id AS customer_id,
                c.name,
                COUNT(o.id) AS order_count,
                SUM(o.amount) AS total_spent,
                AVG(o.amount) AS avg_order
            FROM customers c
            JOIN orders o ON c.id = o.customer_id
            WHERE 1=1
        """
        if start_date:
            query += " AND o.order_date >= %(start_date)s"
            params['start_date'] = start_date

        query += """
            GROUP BY c.id, c.name
            ORDER BY total_spent DESC
            LIMIT %(n)s
        """
        return pd.read_sql(query, self.engine, params=params)

    def get_monthly_trends(self, year):
        """Get monthly sales trends"""
        query = """
            SELECT
                EXTRACT(MONTH FROM order_date) AS month,
                COUNT(*) AS orders,
                SUM(amount) AS revenue,
                AVG(amount) AS avg_order
            FROM orders
            WHERE EXTRACT(YEAR FROM order_date) = %(year)s
            GROUP BY EXTRACT(MONTH FROM order_date)
            ORDER BY month
        """
        return pd.read_sql(query, self.engine, params={'year': year})

    def get_product_performance(self, category=None):
        """Analyze product performance"""
        params = {}
        query = """
            SELECT
                p.category,
                p.name,
                SUM(oi.quantity) AS units_sold,
                SUM(oi.quantity * oi.unit_price) AS revenue
            FROM order_items oi
            JOIN products p ON oi.product_id = p.id
        """
        if category:
            query += " WHERE p.category = %(category)s"
            params['category'] = category

        query += """
            GROUP BY p.category, p.name
            ORDER BY revenue DESC
        """
        return pd.read_sql(query, self.engine, params=params or None)


# Usage
analyzer = SalesAnalyzer('postgresql://user:pass@host/db')

top_customers = analyzer.get_top_customers(n=20, start_date='2024-01-01')
monthly = analyzer.get_monthly_trends(2024)
products = analyzer.get_product_performance(category='Electronics')
```
6.3 Parameterized Reports
```python
from jinja2 import Template

def generate_report_query(template, **params):
    """Generate SQL from a template"""
    sql_template = Template(template)
    return sql_template.render(**params)

# Template
report_template = """
SELECT
    {{ group_by_column }},
    COUNT(*) AS count,
    SUM(amount) AS total,
    AVG(amount) AS average
FROM orders
WHERE order_date BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{% if status %}
    AND status = '{{ status }}'
{% endif %}
GROUP BY {{ group_by_column }}
ORDER BY total DESC
{% if limit %}
LIMIT {{ limit }}
{% endif %}
"""

# Generate query
query = generate_report_query(
    report_template,
    group_by_column='customer_id',
    start_date='2024-01-01',
    end_date='2024-12-31',
    status='completed',
    limit=100
)

df = pd.read_sql(query, engine)
```
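Templating values straight into SQL like this is only safe when the inputs are trusted (for example, hard-coded report configs). A safer split, sketched below, is to use Jinja only for structural parts (column names, optional clauses) and bind the values as parameters:

```python
# Jinja renders structure only; values stay as bound parameters
safe_template = """
SELECT {{ group_by_column }}, COUNT(*) AS count, SUM(amount) AS total
FROM orders
WHERE order_date BETWEEN %(start_date)s AND %(end_date)s
{% if with_status %} AND status = %(status)s {% endif %}
GROUP BY {{ group_by_column }}
"""

# group_by_column should still be validated against an allow-list,
# since identifiers cannot be bound as parameters
query = Template(safe_template).render(group_by_column='customer_id', with_status=True)
df = pd.read_sql(query, engine, params={
    'start_date': '2024-01-01',
    'end_date': '2024-12-31',
    'status': 'completed',
})
```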
7. Best Practices
7.1 Connection Management
```python
# Use context managers
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=engine)

# For queries (text() is required for raw SQL strings in SQLAlchemy 1.4+)
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM my_table"))

# For ORM operations
with Session() as session:
    session.query(Model).filter(...)
```
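For statements that modify data, `engine.begin()` gives a transaction that commits when the block succeeds and rolls back if it raises; for example:

```python
from sqlalchemy import text

# engine.begin() commits on success, rolls back on any exception
with engine.begin() as conn:
    conn.execute(text(
        "UPDATE orders SET status = 'archived' WHERE order_date < '2023-01-01'"
    ))
```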
7.2 Error Handling
```python
from sqlalchemy.exc import SQLAlchemyError

def safe_query(query, engine):
    """Execute a query with error handling"""
    try:
        df = pd.read_sql(query, engine)
        return df
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None
```
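For transient failures such as dropped connections or failovers, a retry wrapper can complement this; a minimal sketch with a fixed retry budget (the retry count and delay are illustrative):

```python
import time
from sqlalchemy.exc import OperationalError

def query_with_retry(query, engine, retries=3, delay=2.0):
    """Retry a query on transient OperationalError, pausing between attempts."""
    for attempt in range(1, retries + 1):
        try:
            return pd.read_sql(query, engine)
        except OperationalError as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == retries:
                raise  # out of retries: re-raise the last error
            time.sleep(delay)
```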
7.3 Query Logging
```python
import logging

# Enable SQLAlchemy logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Or create the engine with echo
engine = create_engine(connection_string, echo=True)
```
8. Practice
Integration Exercise
Exercise: Build Analysis Module
```python
# Create a complete analysis module that:
# 1. Connects to a database (use SQLite for practice)
# 2. Creates sample tables and data
# 3. Implements analysis functions
# 4. Generates visualizations

# YOUR CODE HERE
```
💡 View solution
```python
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import matplotlib.pyplot as plt

class RetailAnalytics:
    def __init__(self, db_path='retail.db'):
        self.engine = create_engine(f'sqlite:///{db_path}')
        self._setup_sample_data()

    def _setup_sample_data(self):
        """Create sample data"""
        np.random.seed(42)
        n = 1000

        # Customers
        customers = pd.DataFrame({
            'customer_id': range(1, 101),
            'name': [f'Customer_{i}' for i in range(1, 101)],
            'region': np.random.choice(['North', 'South', 'East', 'West'], 100)
        })

        # Products
        products = pd.DataFrame({
            'product_id': range(1, 21),
            'name': [f'Product_{i}' for i in range(1, 21)],
            'category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Sports'], 20),
            'price': np.random.uniform(10, 500, 20).round(2)
        })

        # Orders ('8h' = every 8 hours; uppercase 'H' is deprecated in newer pandas)
        orders = pd.DataFrame({
            'order_id': range(1, n + 1),
            'customer_id': np.random.randint(1, 101, n),
            'product_id': np.random.randint(1, 21, n),
            'quantity': np.random.randint(1, 10, n),
            'order_date': pd.date_range('2024-01-01', periods=n, freq='8h')
        })

        # Write to database
        customers.to_sql('customers', self.engine, if_exists='replace', index=False)
        products.to_sql('products', self.engine, if_exists='replace', index=False)
        orders.to_sql('orders', self.engine, if_exists='replace', index=False)

        print("Sample data created!")

    def top_customers(self, n=10):
        """Get top N customers by spending"""
        # n is an int under our control, so f-string interpolation is acceptable here
        query = f"""
            SELECT
                c.customer_id,
                c.name,
                c.region,
                COUNT(o.order_id) AS order_count,
                SUM(o.quantity * p.price) AS total_spent
            FROM customers c
            JOIN orders o ON c.customer_id = o.customer_id
            JOIN products p ON o.product_id = p.product_id
            GROUP BY c.customer_id, c.name, c.region
            ORDER BY total_spent DESC
            LIMIT {n}
        """
        return pd.read_sql(query, self.engine)

    def monthly_sales(self):
        """Get monthly sales summary"""
        query = """
            SELECT
                strftime('%Y-%m', order_date) AS month,
                COUNT(order_id) AS orders,
                SUM(quantity) AS units,
                SUM(quantity * price) AS revenue
            FROM orders o
            JOIN products p ON o.product_id = p.product_id
            GROUP BY strftime('%Y-%m', order_date)
            ORDER BY month
        """
        return pd.read_sql(query, self.engine)

    def category_performance(self):
        """Analyze category performance"""
        query = """
            SELECT
                p.category,
                COUNT(o.order_id) AS orders,
                SUM(o.quantity) AS units_sold,
                ROUND(SUM(o.quantity * p.price), 2) AS revenue,
                ROUND(AVG(o.quantity * p.price), 2) AS avg_order_value
            FROM orders o
            JOIN products p ON o.product_id = p.product_id
            GROUP BY p.category
            ORDER BY revenue DESC
        """
        return pd.read_sql(query, self.engine)

    def regional_analysis(self):
        """Analyze sales by region"""
        query = """
            SELECT
                c.region,
                COUNT(DISTINCT c.customer_id) AS customers,
                COUNT(o.order_id) AS orders,
                ROUND(SUM(o.quantity * p.price), 2) AS revenue
            FROM customers c
            JOIN orders o ON c.customer_id = o.customer_id
            JOIN products p ON o.product_id = p.product_id
            GROUP BY c.region
        """
        return pd.read_sql(query, self.engine)

    def generate_dashboard(self):
        """Generate a visual dashboard"""
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Monthly trend
        monthly = self.monthly_sales()
        axes[0, 0].plot(monthly['month'], monthly['revenue'], marker='o')
        axes[0, 0].set_title('Monthly Revenue')
        axes[0, 0].tick_params(axis='x', rotation=45)
        axes[0, 0].set_ylabel('Revenue ($)')

        # 2. Category breakdown
        categories = self.category_performance()
        axes[0, 1].barh(categories['category'], categories['revenue'])
        axes[0, 1].set_title('Revenue by Category')
        axes[0, 1].set_xlabel('Revenue ($)')

        # 3. Regional distribution
        regional = self.regional_analysis()
        axes[1, 0].pie(regional['revenue'], labels=regional['region'], autopct='%1.1f%%')
        axes[1, 0].set_title('Revenue by Region')

        # 4. Top customers
        top = self.top_customers(10)
        axes[1, 1].barh(top['name'], top['total_spent'])
        axes[1, 1].set_title('Top 10 Customers')
        axes[1, 1].set_xlabel('Total Spent ($)')

        plt.tight_layout()
        plt.savefig('retail_dashboard.png', dpi=150)
        plt.show()

        return fig


# Run analysis
analytics = RetailAnalytics()

print("\n📊 Top Customers:")
print(analytics.top_customers(5))

print("\n📈 Monthly Sales:")
print(analytics.monthly_sales())

print("\n🏷️ Category Performance:")
print(analytics.category_performance())

print("\n🌍 Regional Analysis:")
print(analytics.regional_analysis())

# Generate dashboard
analytics.generate_dashboard()
```
9. Summary
| Topic | Key Points |
|---|---|
| Connection | SQLAlchemy engine, environment variables |
| Reading | pd.read_sql(), chunked reading, parameters |
| Writing | to_sql(), bulk insert, data types |
| Workflow | ETL pipelines, analysis classes |
| Best Practices | Connection pooling, error handling, logging |
Next lesson: Time Series Analysis
