Feature Store & Model Monitoring
Production ML = Training + Serving + Monitoring. Models degrade over time. A Feature Store solves training-serving skew; monitoring detects when a model quietly breaks.
🎯 Objectives
- Feature Engineering best practices
- Feature Store concept (Feast)
- Model monitoring: data drift & concept drift
- Alerting và auto-retraining
1. Feature Engineering at Scale
1.1 Common Feature Types
| Category | Examples | Techniques |
|---|---|---|
| Numeric | age, income, price | Binning, log transform, standardize |
| Categorical | city, product_type | One-hot, target encoding, frequency |
| Temporal | signup_date | Day of week, month, recency, seasonality |
| Text | review, comment | TF-IDF, word count, sentiment |
| Aggregation | total_purchases | Rolling mean, count, sum, ratio |
| Interaction | price * quantity | Cross features, polynomial |
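A few of the techniques from the table, sketched with pandas on toy data (the column names and bin edges are illustrative assumptions):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "income": [30_000, 55_000, 120_000],   # numeric, right-skewed
    "city": ["Hanoi", "Danang", "Hanoi"],  # low-cardinality categorical
})

# Log transform compresses the long tail of a skewed numeric
df["log_income"] = np.log1p(df["income"])

# Binning turns a numeric into ordered categories
df["income_band"] = pd.cut(df["income"], bins=[0, 50_000, 100_000, np.inf],
                           labels=["low", "mid", "high"])

# One-hot encoding for a low-cardinality categorical
df = pd.get_dummies(df, columns=["city"], prefix="city")
print(df.columns.tolist())
```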
1.2 Feature Engineering Pipeline
```python
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Production-ready feature engineering."""

    def fit(self, X, y=None):
        # Learn statistics from training data only
        self.numeric_medians_ = X.select_dtypes('number').median()
        self.category_modes_ = X.select_dtypes('object').mode().iloc[0]
        return self

    def transform(self, X):
        df = X.copy()

        # 1. Handle missing values (assign back; inplace fillna on a
        #    column slice is unreliable in modern pandas)
        for col in df.select_dtypes('number').columns:
            df[col] = df[col].fillna(self.numeric_medians_[col])
        for col in df.select_dtypes('object').columns:
            df[col] = df[col].fillna(self.category_modes_[col])

        # 2. Temporal features
        if 'signup_date' in df.columns:
            df['signup_date'] = pd.to_datetime(df['signup_date'])
            df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days
            df['signup_month'] = df['signup_date'].dt.month
            df['is_weekend_signup'] = df['signup_date'].dt.dayofweek >= 5
            df = df.drop('signup_date', axis=1)

        # 3. Aggregation features
        if 'customer_id' in df.columns:
            agg = df.groupby('customer_id').agg({
                'amount': ['mean', 'sum', 'count', 'std'],
                'category': 'nunique'
            })
            agg.columns = ['avg_amount', 'total_amount', 'n_transactions',
                           'amount_std', 'n_categories']
            agg = agg.reset_index()  # bring customer_id back as a column for the merge
            df = df.merge(agg, on='customer_id', how='left')

        # 4. Interaction features
        if 'price' in df.columns and 'quantity' in df.columns:
            df['total_value'] = df['price'] * df['quantity']
            df['log_price'] = np.log1p(df['price'])

        return df
```

1.3 Training-Serving Skew Problem
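Before looking at the pipelines side by side, a tiny illustration of how duplicated feature logic diverges (all numbers here are made up): training standardizes with learned statistics, while a serving re-implementation hardcodes "approximate" values.

```python
import numpy as np

train_ages = np.array([25, 35, 45, 55], dtype=float)

# Training pipeline: standardize with statistics learned from training data
train_mean, train_std = train_ages.mean(), train_ages.std()

def training_feature(age: float) -> float:
    return (age - train_mean) / train_std

# Serving pipeline: someone re-implemented the same feature with
# hardcoded "approximate" statistics -- a classic source of skew
def serving_feature(age: float) -> float:
    return (age - 40.0) / 10.0  # hypothetical hardcoded values

age = 30.0
print(training_feature(age), serving_feature(age))  # different values for the same input
```

The model was trained on one encoding and scored on another, so predictions silently shift.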
```
Training Pipeline:               Serving Pipeline:
┌────────────┐                   ┌────────────┐
│ Historical │                   │ Real-time  │
│ Data       │                   │ Request    │
└─────┬──────┘                   └─────┬──────┘
      ↓                                ↓
┌────────────┐                   ┌────────────┐
│ Feature    │  ← DIFFERENT →    │ Feature    │
│ Engineering│   CODE/LOGIC      │ Engineering│
└─────┬──────┘                   └─────┬──────┘
      ↓                                ↓
  Training                        Prediction

Problem:  Feature code is duplicated and may diverge
Solution: Feature Store — single source of truth
```

2. Feature Store with Feast
2.1 Concept
```
Feature Store = Centralized feature management

                 ┌─────────────────┐
Data Sources ──→ │  Feature Store  │ ──→ Training
(DB, logs,       │                 │ ──→ Serving
 streams)        │ • Feature Defs  │
                 │ • Versioning    │
                 │ • Offline Store │  (historical)
                 │ • Online Store  │  (real-time)
                 └─────────────────┘
```

2.2 Feast Setup
```python
# pip install feast
# feast init feature_repo
# cd feature_repo

# feature_repo/feature_definitions.py
# Note: the Feast API has changed across releases; this uses the older
# Feature/ValueType style. Newer versions use Field and feast.types instead.
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from datetime import timedelta

# Data source
customer_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp"
)

# Entity
customer = Entity(
    name="customer_id",
    value_type=ValueType.INT64,
    description="Customer identifier"
)

# Feature View
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    features=[
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_order", dtype=ValueType.INT64),
        Feature(name="favorite_category", dtype=ValueType.STRING),
        Feature(name="lifetime_value", dtype=ValueType.FLOAT),
    ],
    source=customer_source
)
```

2.3 Feast Usage
```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# TRAINING: get historical features (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=pd.DataFrame({
        "customer_id": [101, 102, 103],
        "event_timestamp": pd.to_datetime(["2025-01-01", "2025-01-01", "2025-01-01"])
    }),
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value",
        "customer_features:days_since_last_order",
        "customer_features:lifetime_value"
    ]
).to_df()

# SERVING: get online features (latest values)
online_features = store.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value",
        "customer_features:lifetime_value"
    ],
    entity_rows=[{"customer_id": 101}]
).to_dict()

print(online_features)
# {"customer_id": [101], "total_purchases": [47.0], ...}
```

3. Model Monitoring
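Before classifying the kinds of drift below, a concrete warm-up: a two-sample Kolmogorov-Smirnov test (one of the statistical tests listed in the monitoring checklist later) flags when a feature's distribution has shifted. A minimal sketch on synthetic data, assuming scipy is installed:

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)

# Reference window (e.g. training period) vs a shifted production window
reference = rng.normal(loc=0.0, scale=1.0, size=2_000)
production = rng.normal(loc=0.5, scale=1.0, size=2_000)

# Two-sample KS test: a small p-value means the distributions differ
statistic, p_value = stats.ks_2samp(reference, production)
drift_detected = p_value < 0.05
print(f"KS statistic={statistic:.3f}, p={p_value:.2e}, drift={drift_detected}")
```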
3.1 Types of Drift
```
┌──────────────────────────────────────────────┐
│              Model Degradation               │
├─────────────────┬────────────────────────────┤
│ Data Drift      │ Concept Drift              │
│ (Feature Drift) │ (Target Drift)             │
├─────────────────┼────────────────────────────┤
│ Input data      │ Relationship between       │
│ distribution    │ features and target        │
│ changes         │ changes                    │
│                 │                            │
│ Ex: Average age │ Ex: Customer behavior      │
│ in dataset      │ changes after COVID        │
│ shifts          │ (same features, different  │
│                 │ meaning)                   │
├─────────────────┼────────────────────────────┤
│ Detection:      │ Detection:                 │
│ Statistical     │ Monitor prediction         │
│ tests on inputs │ accuracy over time         │
└─────────────────┴────────────────────────────┘
```

3.2 Data Drift Detection with Evidently
```python
# pip install evidently
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Reference data (training period)
reference_data = train_df

# Current data (production)
current_data = production_df

# Data Drift Report
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)

# View results
drift_report.save_html("drift_report.html")

# Programmatic access
results = drift_report.as_dict()
dataset_drift = results['metrics'][0]['result']['dataset_drift']
n_drifted = results['metrics'][0]['result']['number_of_drifted_columns']
print(f"Dataset drift detected: {dataset_drift}")
print(f"Drifted columns: {n_drifted}")
```

3.3 Performance Monitoring
```python
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset

# Monitor classification performance over time
perf_report = Report(metrics=[ClassificationPreset()])
perf_report.run(
    reference_data=reference_data,
    current_data=current_data
)
perf_report.save_html("performance_report.html")
```

3.4 Custom Monitoring Dashboard
```python
import pandas as pd
from datetime import datetime, timedelta

class ModelMonitor:
    def __init__(self, model_name, alert_threshold=0.1):
        self.model_name = model_name
        self.alert_threshold = alert_threshold
        self.metrics_log = []

    def log_prediction(self, features, prediction, actual=None):
        """Log each prediction for monitoring."""
        self.metrics_log.append({
            'timestamp': datetime.now(),
            'features': features,
            'prediction': prediction,
            'actual': actual
        })

    def check_data_drift(self, reference_stats, current_data):
        """Simple drift detection via z-score of the mean shift."""
        alerts = []
        for feature in reference_stats:
            ref_mean = reference_stats[feature]['mean']
            ref_std = reference_stats[feature]['std']
            curr_mean = current_data[feature].mean()

            # Z-score of mean shift
            z_score = abs(curr_mean - ref_mean) / (ref_std + 1e-8)

            if z_score > 3:
                alerts.append({
                    'feature': feature,
                    'z_score': z_score,
                    'ref_mean': ref_mean,
                    'curr_mean': curr_mean,
                    'severity': 'HIGH'
                })
            elif z_score > 2:
                alerts.append({
                    'feature': feature,
                    'z_score': z_score,
                    'severity': 'MEDIUM'
                })

        return alerts

    def check_prediction_drift(self, window_days=7):
        """Check if the prediction distribution has shifted."""
        recent = [m for m in self.metrics_log
                  if m['timestamp'] > datetime.now() - timedelta(days=window_days)]

        if not recent:
            return None

        recent_preds = [m['prediction'] for m in recent]
        positive_rate = sum(1 for p in recent_preds if p == 1) / len(recent_preds)

        return {
            'prediction_count': len(recent_preds),
            'positive_rate': positive_rate,
            'alert': positive_rate > 0.5  # too many positive predictions
        }

    def generate_report(self):
        """Weekly monitoring report."""
        if not self.metrics_log:
            return "No predictions logged yet."

        df = pd.DataFrame(self.metrics_log)

        report = {
            'model': self.model_name,
            'period': f"{df['timestamp'].min()} to {df['timestamp'].max()}",
            'total_predictions': len(df),
            'prediction_distribution': df['prediction'].value_counts().to_dict()
        }

        # If actuals are available, compute accuracy
        actuals = df.dropna(subset=['actual'])
        if len(actuals) > 0:
            correct = (actuals['prediction'] == actuals['actual']).mean()
            report['accuracy'] = round(correct, 4)

        return report
```

4. Alerting & Auto-Retraining
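The z-score rule inside `check_data_drift` above is the signal the alerting in this section reacts to. Here it is exercised standalone with toy reference stats (the feature name and values are hypothetical):

```python
import pandas as pd

reference_stats = {"age": {"mean": 40.0, "std": 10.0}}

# Current production batch with a clearly shifted mean
current_data = pd.DataFrame({"age": [75.0, 80.0, 78.0, 82.0]})

alerts = []
for feature, ref in reference_stats.items():
    # Z-score of the mean shift, as in check_data_drift
    z = abs(current_data[feature].mean() - ref["mean"]) / (ref["std"] + 1e-8)
    if z > 3:
        alerts.append({"feature": feature, "z_score": z, "severity": "HIGH"})
    elif z > 2:
        alerts.append({"feature": feature, "z_score": z, "severity": "MEDIUM"})

print(alerts)  # the shift in 'age' should trigger a HIGH alert
```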
4.1 Alert System
```python
import smtplib
from email.mime.text import MIMEText
import requests

def send_alert(subject, message, recipients):
    """Send an alert email and Slack message when drift is detected."""
    msg = MIMEText(message)
    msg['Subject'] = f"[ML Alert] {subject}"
    msg['From'] = "ml-monitor@company.com"
    msg['To'] = ", ".join(recipients)
    with smtplib.SMTP("smtp.company.com") as server:  # your SMTP host
        server.send_message(msg)

    # Or use a Slack webhook
    slack_webhook = "https://hooks.slack.com/services/xxx"
    requests.post(slack_webhook, json={
        "text": f":warning: *ML Alert: {subject}*\n{message}"
    })

# Usage in the monitoring pipeline
alerts = monitor.check_data_drift(reference_stats, current_data)
if any(a['severity'] == 'HIGH' for a in alerts):
    drifted_features = [a['feature'] for a in alerts if a['severity'] == 'HIGH']
    send_alert(
        subject="Data Drift Detected",
        message=f"High drift in features: {drifted_features}",
        recipients=["ml-team@company.com"]
    )
```

4.2 Auto-Retraining Trigger
```python
# Scheduled check (run daily via cron/Airflow)
def daily_monitoring_check():
    """Check model health and trigger a retrain if needed."""

    # 1. Check data drift
    drift_alerts = monitor.check_data_drift(ref_stats, today_data)
    high_drift = [a for a in drift_alerts if a['severity'] == 'HIGH']

    # 2. Check performance (if labels available)
    report = monitor.generate_report()
    accuracy_drop = report.get('accuracy', 1.0) < 0.85

    # 3. Decision
    if len(high_drift) >= 3 or accuracy_drop:
        print("Triggering model retrain...")
        trigger_training_pipeline()  # call Prefect/Airflow
        send_alert("Auto-Retrain Triggered",
                   f"Reasons: drift={len(high_drift)} features, accuracy_drop={accuracy_drop}",
                   ["ml-team@company.com"])
    else:
        print("Model healthy. No action needed.")
```

5. Monitoring Checklist
| What to Monitor | How | Frequency |
|---|---|---|
| Input data drift | PSI, KS test, Evidently | Daily |
| Prediction distribution | Mean, std of predictions | Daily |
| Model accuracy | Compare with ground truth | When labels available |
| Latency | API response time | Real-time |
| Throughput | Requests per second | Real-time |
| Error rate | 5xx errors, timeouts | Real-time |
| Feature freshness | How old are feature values | Daily |
| Model staleness | Days since last retrain | Weekly |
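The PSI (Population Stability Index) referenced in the checklist can be computed in a few lines. A common rule of thumb, though thresholds vary by team, treats PSI > 0.2 as significant shift; this is a sketch, not a library implementation:

```python
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, n_bins: int = 10) -> float:
    """Population Stability Index between two 1-D samples."""
    # Bin edges from reference quantiles (handles skewed distributions)
    edges = np.quantile(reference, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf  # catch out-of-range production values
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Clip to avoid log(0) on empty bins
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

rng = np.random.default_rng(0)
stable = psi(rng.normal(0, 1, 5_000), rng.normal(0, 1, 5_000))
shifted = psi(rng.normal(0, 1, 5_000), rng.normal(1, 1, 5_000))
print(f"stable PSI={stable:.3f}, shifted PSI={shifted:.3f}")
```

The stable pair should score near zero, while the mean-shifted pair lands well above the 0.2 threshold.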
📝 Quiz
-
What is training-serving skew?
- The model fails during training
- Feature engineering differs between training and serving, leading to wrong results
- The training data is too large
- Serving is slower than training
-
What main problem does a Feature Store solve?
- A single source of truth for features, avoiding training-serving skew
- Cheaper data storage
- Higher model accuracy
- Replacing the database
-
How does concept drift differ from data drift?
- They do not differ
- Concept drift happens faster
- Data drift = the input distribution changes; concept drift = the input-output relationship changes
- Data drift is more severe
🎯 Key Takeaways
- Feature Store — avoids training-serving skew, centralizes feature definitions
- Data Drift — the input distribution changes → the model degrades
- Concept Drift — the real-world input-output relationship changes
- Evidently — open-source monitoring tool
- Auto-retrain — trigger retraining when drift is detected
🚀 Next Lesson
Capstone Project — build an end-to-end E-commerce Recommendation System!
