
Feature Store & Model Monitoring

Feature Engineering at Scale, Feature Store with Feast, and Model Monitoring


Production ML = Training + Serving + Monitoring. Models degrade over time. A Feature Store solves training-serving skew; monitoring detects when a model "breaks".

🎯 Objectives

  • Feature Engineering best practices
  • Feature Store concept (Feast)
  • Model monitoring: data drift & concept drift
  • Alerting và auto-retraining

1. Feature Engineering at Scale

1.1 Common Feature Types

| Category    | Examples            | Techniques                               |
|-------------|---------------------|------------------------------------------|
| Numeric     | age, income, price  | Binning, log transform, standardize      |
| Categorical | city, product_type  | One-hot, target encoding, frequency      |
| Temporal    | signup_date         | Day of week, month, recency, seasonality |
| Text        | review, comment     | TF-IDF, word count, sentiment            |
| Aggregation | total_purchases     | Rolling mean, count, sum, ratio          |
| Interaction | price * quantity    | Cross features, polynomial               |
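A minimal sketch of a few techniques from the table, on a small hypothetical DataFrame (all column names here are illustrative):

```python
import pandas as pd
import numpy as np

df = pd.DataFrame({
    "age": [22, 35, 58, 41],
    "income": [30_000, 85_000, 120_000, 64_000],
    "city": ["HN", "HCM", "HN", "DN"],
})

# Numeric: log transform compresses skewed values, binning buckets them
df["log_income"] = np.log1p(df["income"])
df["age_bin"] = pd.cut(df["age"], bins=[0, 30, 50, 100],
                       labels=["young", "mid", "senior"])

# Categorical: one-hot encoding and frequency encoding
df = pd.concat([df, pd.get_dummies(df["city"], prefix="city")], axis=1)
df["city_freq"] = df["city"].map(df["city"].value_counts(normalize=True))
```

Frequency encoding keeps a single column per feature, which scales better than one-hot for high-cardinality categories.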

1.2 Feature Engineering Pipeline

Python
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Production-ready feature engineering."""

    def fit(self, X, y=None):
        # Learn statistics from training data only
        self.numeric_medians_ = X.select_dtypes('number').median()
        self.category_modes_ = X.select_dtypes('object').mode().iloc[0]
        return self

    def transform(self, X):
        df = X.copy()

        # 1. Handle missing values (assignment instead of chained
        #    inplace fillna, which is deprecated in pandas 2.x)
        for col in df.select_dtypes('number').columns:
            df[col] = df[col].fillna(self.numeric_medians_[col])
        for col in df.select_dtypes('object').columns:
            df[col] = df[col].fillna(self.category_modes_[col])

        # 2. Temporal features
        if 'signup_date' in df.columns:
            df['signup_date'] = pd.to_datetime(df['signup_date'])
            df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days
            df['signup_month'] = df['signup_date'].dt.month
            df['is_weekend_signup'] = df['signup_date'].dt.dayofweek >= 5
            df = df.drop('signup_date', axis=1)

        # 3. Aggregation features
        if 'customer_id' in df.columns:
            agg = df.groupby('customer_id').agg({
                'amount': ['mean', 'sum', 'count', 'std'],
                'category': 'nunique'
            })
            agg.columns = ['avg_amount', 'total_amount', 'n_transactions',
                           'amount_std', 'n_categories']
            df = df.merge(agg.reset_index(), on='customer_id', how='left')

        # 4. Interaction features
        if 'price' in df.columns and 'quantity' in df.columns:
            df['total_value'] = df['price'] * df['quantity']
            df['log_price'] = np.log1p(df['price'])

        return df

1.3 Training-Serving Skew Problem

Example
Training Pipeline:             Serving Pipeline:
┌────────────┐                 ┌────────────┐
│ Historical │                 │ Real-time  │
│ Data       │                 │ Request    │
└─────┬──────┘                 └─────┬──────┘
      ↓                              ↓
┌────────────┐                 ┌────────────┐
│ Feature    │ ← DIFFERENT →   │ Feature    │
│ Engineering│   CODE/LOGIC    │ Engineering│
└─────┬──────┘                 └─────┬──────┘
      ↓                              ↓
  Training                       Prediction

Problem: Feature code duplicated, may diverge
Solution: Feature Store — single source of truth
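Short of a full Feature Store, a common mitigation is to serialize the fitted transformer from training and load the exact same object at serving time, so both paths run identical code. A minimal sketch with a hypothetical StandardScaler (in practice the blob would live in an artifact store, not a variable):

```python
import pickle
import numpy as np
from sklearn.preprocessing import StandardScaler

# --- Training pipeline: fit once, persist the exact transformer ---
X_train = np.array([[10.0], [20.0], [30.0]])
scaler = StandardScaler().fit(X_train)
blob = pickle.dumps(scaler)  # in practice: save to a model registry / artifact store

# --- Serving pipeline: load the SAME object, no re-implemented logic ---
scaler_serving = pickle.loads(blob)
x_request = np.array([[20.0]])
print(scaler_serving.transform(x_request))  # identical to the training-time transform
```

Because training and serving share one fitted object, there is no second implementation of the feature logic that could drift out of sync.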

2. Feature Store with Feast

2.1 Concept

Example
Feature Store = Centralized feature management

                  ┌─────────────────┐
Data Sources ──→  │  Feature Store  │ ──→ Training
(DB, logs,        │                 │ ──→ Serving
 streams)         │ • Feature Defs  │
                  │ • Versioning    │
                  │ • Offline Store │ (historical)
                  │ • Online Store  │ (real-time)
                  └─────────────────┘

2.2 Feast Setup

Python
# pip install feast
# feast init feature_repo
# cd feature_repo

# feature_repo/feature_definitions.py
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Data source
customer_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp",
)

# Entity
customer = Entity(
    name="customer_id",
    join_keys=["customer_id"],
    description="Customer identifier",
)

# Feature View
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Float32),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
        Field(name="favorite_category", dtype=String),
        Field(name="lifetime_value", dtype=Float32),
    ],
    source=customer_source,
)

2.3 Feast Usage

Python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# TRAINING: get historical features (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=pd.DataFrame({
        "customer_id": [101, 102, 103],
        "event_timestamp": pd.to_datetime(["2025-01-01", "2025-01-01", "2025-01-01"])
    }),
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value",
        "customer_features:days_since_last_order",
        "customer_features:lifetime_value",
    ],
).to_df()

# SERVING: get online features (latest values)
online_features = store.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value",
        "customer_features:lifetime_value",
    ],
    entity_rows=[{"customer_id": 101}],
).to_dict()

print(online_features)
# {"customer_id": [101], "total_purchases": [47.0], ...}

3. Model Monitoring

3.1 Types of Drift

Example
┌──────────────────────────────────────────────┐
│              Model Degradation               │
├─────────────────┬────────────────────────────┤
│ Data Drift      │ Concept Drift              │
│ (Feature Drift) │ (Target Drift)             │
├─────────────────┼────────────────────────────┤
│ Input data      │ Relationship between       │
│ distribution    │ features and target        │
│ changes         │ changes                    │
│                 │                            │
│ Ex: Average age │ Ex: Customer behavior      │
│ in dataset      │ changes after COVID        │
│ shifts          │ (same features, different  │
│                 │ meaning)                   │
├─────────────────┼────────────────────────────┤
│ Detection:      │ Detection:                 │
│ Statistical     │ Monitor prediction         │
│ tests on inputs │ accuracy over time         │
└─────────────────┴────────────────────────────┘
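Data drift on a single numeric feature can be checked with a two-sample Kolmogorov-Smirnov test. A minimal sketch on synthetic data where the production mean has shifted (the feature name and shift are made up for illustration):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
reference = rng.normal(loc=35, scale=5, size=5_000)  # e.g. "age" at training time
current = rng.normal(loc=40, scale=5, size=5_000)    # production data, mean shifted

# Two-sample KS test: are both samples drawn from the same distribution?
statistic, p_value = stats.ks_2samp(reference, current)
drift_detected = p_value < 0.05
print(f"KS statistic={statistic:.3f}, p={p_value:.2e}, drift={drift_detected}")
```

The KS test is distribution-free, so it needs no assumption about the feature's shape; for categorical features a chi-square test is the usual counterpart.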

3.2 Data Drift Detection with Evidently

Python
# pip install evidently
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Reference data (training period)
reference_data = train_df

# Current data (production)
current_data = production_df

# Data Drift Report
drift_report = Report(metrics=[
    DataDriftPreset()
])
drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)

# View results
drift_report.save_html("drift_report.html")

# Programmatic access
results = drift_report.as_dict()
dataset_drift = results['metrics'][0]['result']['dataset_drift']
n_drifted = results['metrics'][0]['result']['number_of_drifted_columns']
print(f"Dataset drift detected: {dataset_drift}")
print(f"Drifted columns: {n_drifted}")
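Besides Evidently's presets, PSI (Population Stability Index, listed in the monitoring checklist) is easy to compute by hand. A minimal sketch for one numeric feature; the binning strategy and the 0.1/0.25 thresholds are common rules of thumb, not from a specific library:

```python
import numpy as np

def psi(reference, current, bins=10):
    """Population Stability Index between two numeric samples.
    Rule of thumb: <0.1 stable, 0.1-0.25 moderate shift, >0.25 significant drift."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Floor the proportions to avoid log(0) / division by zero in empty bins
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

rng = np.random.default_rng(0)
same = psi(rng.normal(0, 1, 10_000), rng.normal(0, 1, 10_000))
shifted = psi(rng.normal(0, 1, 10_000), rng.normal(1, 1, 10_000))
print(f"no drift: {same:.3f}, shifted: {shifted:.3f}")
```

Note that bin edges come from the reference sample, so current values outside that range are dropped by `np.histogram`; production implementations usually add open-ended edge bins.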

3.3 Performance Monitoring

Python
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset

# Monitor classification performance over time
perf_report = Report(metrics=[
    ClassificationPreset()
])
perf_report.run(
    reference_data=reference_data,
    current_data=current_data
)
perf_report.save_html("performance_report.html")

3.4 Custom Monitoring Dashboard

Python
import pandas as pd
from datetime import datetime, timedelta

class ModelMonitor:
    def __init__(self, model_name, alert_threshold=0.1):
        self.model_name = model_name
        self.alert_threshold = alert_threshold
        self.metrics_log = []

    def log_prediction(self, features, prediction, actual=None):
        """Log each prediction for monitoring."""
        self.metrics_log.append({
            'timestamp': datetime.now(),
            'features': features,
            'prediction': prediction,
            'actual': actual
        })

    def check_data_drift(self, reference_stats, current_data):
        """Simple drift detection via z-score of the mean shift."""
        alerts = []
        for feature in reference_stats:
            ref_mean = reference_stats[feature]['mean']
            ref_std = reference_stats[feature]['std']
            curr_mean = current_data[feature].mean()

            # Z-score of the mean shift
            z_score = abs(curr_mean - ref_mean) / (ref_std + 1e-8)

            if z_score > 3:
                alerts.append({
                    'feature': feature,
                    'z_score': z_score,
                    'ref_mean': ref_mean,
                    'curr_mean': curr_mean,
                    'severity': 'HIGH'
                })
            elif z_score > 2:
                alerts.append({
                    'feature': feature,
                    'z_score': z_score,
                    'severity': 'MEDIUM'
                })

        return alerts

    def check_prediction_drift(self, window_days=7):
        """Check if the prediction distribution has shifted."""
        recent = [m for m in self.metrics_log
                  if m['timestamp'] > datetime.now() - timedelta(days=window_days)]

        if not recent:
            return None

        recent_preds = [m['prediction'] for m in recent]
        positive_rate = sum(1 for p in recent_preds if p == 1) / len(recent_preds)

        return {
            'prediction_count': len(recent_preds),
            'positive_rate': positive_rate,
            'alert': positive_rate > 0.5  # too many positive predictions
        }

    def generate_report(self):
        """Weekly monitoring report."""
        if not self.metrics_log:
            return "No predictions logged yet."

        df = pd.DataFrame(self.metrics_log)

        report = {
            'model': self.model_name,
            'period': f"{df['timestamp'].min()} to {df['timestamp'].max()}",
            'total_predictions': len(df),
            'prediction_distribution': df['prediction'].value_counts().to_dict()
        }

        # If actuals are available, compute accuracy
        actuals = df.dropna(subset=['actual'])
        if len(actuals) > 0:
            correct = (actuals['prediction'] == actuals['actual']).mean()
            report['accuracy'] = round(correct, 4)

        return report

4. Alerting & Auto-Retraining

4.1 Alert System

Python
import smtplib
from email.mime.text import MIMEText

import requests

def send_alert(subject, message, recipients):
    """Send an alert when drift is detected (email and/or Slack)."""
    # Email via SMTP (host is a placeholder)
    msg = MIMEText(message)
    msg['Subject'] = f"[ML Alert] {subject}"
    msg['From'] = "ml-monitor@company.com"
    msg['To'] = ", ".join(recipients)
    with smtplib.SMTP("smtp.company.com") as server:
        server.send_message(msg)

    # Or use a Slack webhook
    slack_webhook = "https://hooks.slack.com/services/xxx"
    requests.post(slack_webhook, json={
        "text": f":warning: *ML Alert: {subject}*\n{message}"
    })

# Usage in the monitoring pipeline
alerts = monitor.check_data_drift(reference_stats, current_data)
if any(a['severity'] == 'HIGH' for a in alerts):
    drifted_features = [a['feature'] for a in alerts if a['severity'] == 'HIGH']
    send_alert(
        subject="Data Drift Detected",
        message=f"High drift in features: {drifted_features}",
        recipients=["ml-team@company.com"]
    )

4.2 Auto-Retraining Trigger

Python
# Scheduled check (run daily via cron/Airflow)
def daily_monitoring_check():
    """Check model health and trigger a retrain if needed."""

    # 1. Check data drift
    drift_alerts = monitor.check_data_drift(ref_stats, today_data)
    high_drift = [a for a in drift_alerts if a['severity'] == 'HIGH']

    # 2. Check performance (if labels available; generate_report
    #    returns a string when nothing has been logged yet)
    report = monitor.generate_report()
    accuracy_drop = isinstance(report, dict) and report.get('accuracy', 1.0) < 0.85

    # 3. Decision
    if len(high_drift) >= 3 or accuracy_drop:
        print("Triggering model retrain...")
        trigger_training_pipeline()  # call Prefect/Airflow
        send_alert("Auto-Retrain Triggered",
                   f"Reasons: drift={len(high_drift)} features, accuracy_drop={accuracy_drop}",
                   ["ml-team@company.com"])
    else:
        print("Model healthy. No action needed.")
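One way to implement `trigger_training_pipeline` is to start an Airflow DAG run through the Airflow 2.x stable REST API. The host, credentials, and DAG id below are placeholders for illustration:

```python
import requests

def trigger_training_pipeline(reason: str = "unspecified") -> bool:
    """Trigger a retraining DAG via the Airflow 2.x stable REST API.
    Host, credentials, and DAG id are hypothetical placeholders."""
    resp = requests.post(
        "http://airflow.company.com/api/v1/dags/model_retrain/dagRuns",
        auth=("ml-monitor", "********"),  # basic auth; use a secrets manager in practice
        json={"conf": {"trigger_reason": reason}},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.status_code == 200
```

Passing the trigger reason in `conf` lets the training DAG log why it was started, which helps when auditing automatic retrains.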

5. Monitoring Checklist

| What to Monitor         | How                         | Frequency             |
|-------------------------|-----------------------------|-----------------------|
| Input data drift        | PSI, KS test, Evidently     | Daily                 |
| Prediction distribution | Mean, std of predictions    | Daily                 |
| Model accuracy          | Compare with ground truth   | When labels available |
| Latency                 | API response time           | Real-time             |
| Throughput              | Requests per second         | Real-time             |
| Error rate              | 5xx errors, timeouts        | Real-time             |
| Feature freshness       | How old are feature values  | Daily                 |
| Model staleness         | Days since last retrain     | Weekly                |
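For the real-time rows in the table (latency, throughput), a lightweight option is a decorator around the prediction handler. A minimal sketch; in production you would export these numbers to a metrics system such as Prometheus instead of a list:

```python
import time
from functools import wraps

latencies_ms: list[float] = []

def track_latency(fn):
    """Record the wall-clock latency of each call, e.g. around a /predict handler."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            latencies_ms.append((time.perf_counter() - start) * 1000)
    return wrapper

@track_latency
def predict(x):
    time.sleep(0.001)  # stand-in for model inference
    return x * 2

for i in range(50):
    predict(i)

p95 = sorted(latencies_ms)[int(0.95 * len(latencies_ms))]
print(f"calls={len(latencies_ms)}, p95={p95:.2f} ms")
```

Tracking a tail percentile such as p95 instead of the mean surfaces the slow requests that averages hide.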

📝 Quiz

  1. What is training-serving skew?

    • The model fails during training
    • Feature engineering differs between training and serving, leading to wrong results
    • The training data is too large
    • Serving is slower than training
  2. What is the main problem a Feature Store solves?

    • A single source of truth for features, avoiding training-serving skew
    • Cheaper data storage
    • Higher model accuracy
    • Replacing the database
  3. How does concept drift differ from data drift?

    • They are the same
    • Concept drift happens faster
    • Data drift = the input distribution changes; concept drift = the input-output relationship changes
    • Data drift is more severe

🎯 Key Takeaways

  1. Feature Store — avoids training-serving skew, centralizes features
  2. Data Drift — the input distribution changes → the model degrades
  3. Concept Drift — the real-world input-output relationship changes
  4. Evidently — open-source monitoring tool
  5. Auto-retrain — trigger retraining when drift is detected

🚀 Next Lesson

Capstone Project — Build an end-to-end E-commerce Recommendation System!