
MLOps Fundamentals

ML Lifecycle, Experiment Tracking with MLflow, CI/CD for ML, and ML Pipeline Design


MLOps = Machine Learning + DevOps: bridging the gap between model training and production deployment. An oft-cited Gartner figure claims that 87% of ML models never make it to production; MLOps exists to fix that.

🎯 Objectives

  • Understand the ML lifecycle and MLOps maturity levels
  • Experiment tracking with MLflow
  • ML pipeline design
  • CI/CD for ML projects

1. ML Lifecycle

1.1 The Full Picture

ML Lifecycle (diagram): 📊 Data Pipeline → 🏋️ Model Training → 🚀 Deploy / Serving → 📈 Monitor / Retrain

Stages:

  1. Data Collection & Validation
  2. Feature Engineering & Store
  3. Model Training & Evaluation
  4. Model Validation & Testing
  5. Deployment & Serving
  6. Monitoring & Retraining
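The six stages above can be thought of as functions that feed into one another. A minimal sketch (all names and the "majority-class model" are illustrative placeholders, not any real framework's API):

```python
# Illustrative skeleton: each lifecycle stage as a function feeding the next.
# All names here are hypothetical placeholders, not a real framework API.

def collect_and_validate(raw_rows):
    """Stage 1: drop records that fail a basic check."""
    return [r for r in raw_rows if r.get("label") is not None]

def engineer_features(rows):
    """Stage 2: turn raw records into (features, label) pairs."""
    return [((r["a"], r["b"]), r["label"]) for r in rows]

def train(examples):
    """Stage 3: fit a trivial 'model' (here: the majority class)."""
    labels = [y for _, y in examples]
    return max(set(labels), key=labels.count)

def evaluate(model, examples):
    """Stage 4: accuracy of the majority-class model."""
    correct = sum(1 for _, y in examples if y == model)
    return correct / len(examples)

def deploy(model):
    """Stage 5: in real life, package and serve; here, just return it."""
    return model

# Stages 1-5 chained; stage 6 (monitoring) would wrap this in a loop.
raw = [{"a": 1, "b": 2, "label": 1}, {"a": 3, "b": 4, "label": 0},
       {"a": 5, "b": 6, "label": 1}, {"a": 7, "b": 8, "label": None}]
examples = engineer_features(collect_and_validate(raw))
model = deploy(train(examples))
print(evaluate(model, examples))  # majority class is 1 → accuracy 2/3
```

The point is the shape, not the model: every later section in this lesson automates one of these arrows.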

1.2 MLOps Maturity Levels

| Level | Name | Description |
|-------|------|-------------|
| 0 | Manual | Jupyter notebooks, manual deploy |
| 1 | ML Pipeline | Automated training pipeline |
| 2 | CI/CD Pipeline | Auto-train + auto-deploy + monitoring |
| 3 | Full Automation | Auto-retrain on data drift, A/B testing |

Most teams are Level 0-1. Goal: reach Level 2.
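Level 3's "auto-retrain on data drift" hinges on a drift test. A minimal sketch using a two-sample Kolmogorov-Smirnov test (scipy is assumed to be available; the 0.05 threshold is an illustrative choice, not a standard):

```python
# Sketch of a Level-3-style retrain trigger: compare a live feature's
# distribution against the training distribution with a KS test.
# The alpha=0.05 threshold is an illustrative assumption.
import numpy as np
from scipy.stats import ks_2samp

def should_retrain(train_feature, live_feature, alpha=0.05):
    """Return True if the live distribution has drifted from training."""
    stat, p_value = ks_2samp(train_feature, live_feature)
    return p_value < alpha

rng = np.random.default_rng(42)
train_x = rng.normal(loc=0.0, scale=1.0, size=2000)
shifted_x = rng.normal(loc=1.0, scale=1.0, size=2000)  # mean shifted by 1

print(should_retrain(train_x, train_x))    # False: identical samples
print(should_retrain(train_x, shifted_x))  # True: clear drift
```

In a real Level 3 setup this check would run on a schedule over production traffic, and a `True` result would kick off the training pipeline instead of a `print`.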


2. Experiment Tracking with MLflow

2.1 Why Track Experiments?

Example

Without tracking:
- "Which hyperparameters gave the best result?"
- "What data version was used?"
- "Can we reproduce last month's model?"
- Notebooks scattered everywhere

With MLflow:
- Every run logged with params, metrics, and artifacts
- Compare runs side-by-side
- One-click model deployment
- Full reproducibility

2.2 Setup

Python
# pip install mlflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

# Start the tracking UI (run in a terminal)
# mlflow ui --port 5000

2.3 Basic Experiment Tracking

Python
# Set experiment
mlflow.set_experiment("Customer Churn Prediction")

# Prepare data (X, y: your feature matrix and labels)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Log experiment
with mlflow.start_run(run_name="rf_baseline"):
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Log metrics
    mlflow.log_metrics({
        "accuracy": accuracy,
        "f1_score": f1,
        "train_size": len(X_train),
        "test_size": len(X_test)
    })

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts (plots, data info)
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")

2.4 Compare Experiments

Python
# Programmatic comparison
import mlflow

experiment = mlflow.get_experiment_by_name("Customer Churn Prediction")
runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.f1_score DESC"]
)

# Top 5 runs
print(runs[['run_id', 'params.n_estimators', 'params.max_depth',
            'metrics.accuracy', 'metrics.f1_score']].head())

2.5 Model Registry

Python
# Register the best model
best_run_id = runs.iloc[0]['run_id']
model_uri = f"runs:/{best_run_id}/model"

mlflow.register_model(model_uri, "churn_prediction_model")

# Transition to production
# (Note: stages are deprecated in newer MLflow releases in favor of model aliases)
from mlflow.tracking import MlflowClient
client = MlflowClient()

client.transition_model_version_stage(
    name="churn_prediction_model",
    version=1,
    stage="Production"
)

# Load the production model
model = mlflow.pyfunc.load_model(
    "models:/churn_prediction_model/Production"
)
predictions = model.predict(new_data)

3. ML Pipeline Design

3.1 Simple Pipeline with sklearn

Python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier

# Define preprocessing
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(handle_unknown='ignore'))
])

# numeric_features / categorical_features: lists of column names
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features)
])

# Full pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier(n_estimators=100))
])

# Train & save
pipeline.fit(X_train, y_train)
mlflow.sklearn.log_model(pipeline, "pipeline")

3.2 Advanced Pipeline with Prefect

Python
# pip install prefect
from prefect import flow, task
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier

@task(name="load_data")
def load_data(path: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    print(f"Loaded {len(df)} rows")
    return df

@task(name="validate_data")
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    assert len(df) > 0, "Empty dataset!"
    assert df.isnull().mean().max() < 0.5, "Too many nulls!"

    print(f"Data validation passed. Shape: {df.shape}")
    return df

@task(name="feature_engineering")
def feature_engineering(df: pd.DataFrame) -> tuple:
    # Feature creation
    df['total_spend'] = df['quantity'] * df['price']
    df['days_since_last'] = (pd.Timestamp.now() - pd.to_datetime(df['last_purchase'])).dt.days

    X = df.drop('churn', axis=1)
    y = df['churn']
    return X, y

@task(name="train_model")
def train_model(X, y):
    with mlflow.start_run():
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X, y)
        mlflow.sklearn.log_model(model, "model")
    return model

@flow(name="ml_training_pipeline")
def training_pipeline(data_path: str):
    df = load_data(data_path)
    df = validate_data(df)
    X, y = feature_engineering(df)
    model = train_model(X, y)
    return model

# Run pipeline
training_pipeline("data/customers.csv")

4. CI/CD for ML

4.1 ML-specific CI/CD

ML CI/CD Pipeline (diagram):
📊 Data Change / 💻 Code Change → Validate Data → 🧪 Unit Tests → 🏋️ Train Model → 📈 Evaluate → 🎯 Passes Threshold?
  → yes: 🚀 Register Model → Deploy
  → no: 🚫 Alert Team, Block Deploy

4.2 GitHub Actions for ML

yaml
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  push:
    paths:
      - 'src/models/**'
      - 'src/features/**'
      - 'data/processed/**'
  schedule:
    - cron: '0 6 * * 1'  # Weekly retraining

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run data validation
        run: python src/data/validate.py

      - name: Train model
        run: python src/models/train.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}

      - name: Evaluate model
        run: python src/models/evaluate.py

      - name: Check model quality
        run: |
          python -c "
          import json
          metrics = json.load(open('metrics.json'))
          assert metrics['f1_score'] > 0.85, 'F1 below threshold!'
          assert metrics['accuracy'] > 0.90, 'Accuracy below threshold!'
          print('Model quality check passed!')
          "

      - name: Register model
        if: success()
        run: python src/models/register.py

4.3 Model Validation Checklist

Python
def validate_model(model, X_test, y_test, thresholds):
    """Production readiness check."""
    checks = {}

    # 1. Performance check
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    checks['accuracy'] = acc >= thresholds['min_accuracy']

    # 2. Inference speed
    import time
    start = time.time()
    for _ in range(100):
        model.predict(X_test[:1])
    latency = (time.time() - start) / 100
    checks['latency_ok'] = latency < thresholds['max_latency_ms'] / 1000

    # 3. Model size
    import joblib, os
    joblib.dump(model, '/tmp/model.pkl')
    size_mb = os.path.getsize('/tmp/model.pkl') / (1024 * 1024)
    checks['size_ok'] = size_mb < thresholds['max_size_mb']

    # 4. Fairness check (optional)
    # Compare performance across demographic groups

    all_passed = all(checks.values())
    return all_passed, checks

# Usage
passed, checks = validate_model(model, X_test, y_test, {
    'min_accuracy': 0.90,
    'max_latency_ms': 50,
    'max_size_mb': 500
})
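The fairness check left as a comment in the checklist can be sketched as a per-group accuracy comparison. This is one possible approach; the group labels and the 0.05 maximum-gap threshold below are illustrative assumptions, not a standard:

```python
# Sketch of the optional fairness check: compute accuracy per demographic
# group and flag the model if the accuracy gap exceeds a threshold.
# The group labels and max_gap=0.05 are illustrative assumptions.
import numpy as np

def fairness_check(y_true, y_pred, groups, max_gap=0.05):
    """Return (passed, per-group accuracies)."""
    y_true, y_pred, groups = map(np.asarray, (y_true, y_pred, groups))
    accs = {}
    for g in np.unique(groups):
        mask = groups == g
        accs[g] = float((y_true[mask] == y_pred[mask]).mean())
    passed = max(accs.values()) - min(accs.values()) <= max_gap
    return passed, accs

# Toy example: group 'b' is served noticeably worse than group 'a'.
y_true = [1, 0, 1, 0, 1, 0, 1, 0]
y_pred = [1, 0, 1, 0, 1, 1, 0, 1]
groups = ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b']
passed, accs = fairness_check(y_true, y_pred, groups)
print(passed, accs)  # group 'a' accuracy 1.0, group 'b' 0.25 → check fails
```

In `validate_model` this would become one more entry in `checks`, gated on a `groups` column being available in the test data.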

5. Tools Landscape

| Category | Tools | Use Case |
|----------|-------|----------|
| Experiment Tracking | MLflow, W&B, Neptune | Log params, metrics, models |
| Pipeline | Prefect, Airflow, Kubeflow | Orchestrate ML workflows |
| Feature Store | Feast, Tecton | Store & serve features |
| Model Serving | BentoML, Seldon, TorchServe | Deploy models as APIs |
| Monitoring | Evidently, NannyML | Detect drift, track performance |
| Data Versioning | DVC, LakeFS | Version datasets |

MLflow vs W&B (Weights & Biases)

| Aspect | MLflow | W&B |
|--------|--------|-----|
| Cost | Free, open-source | Free tier + paid |
| Hosting | Self-hosted | Cloud (easier) |
| UI | Good | Excellent |
| Collaboration | Basic | Strong (teams) |
| Model Registry | Built-in | Built-in |
| Best for | Enterprise, on-prem | Research, startups |

📝 Quiz

  1. What does MLOps Level 2 include?

    • Experiment tracking only
    • Automated training only
    • Automated training + automated deployment + monitoring
    • Manual everything
  2. What is the MLflow Model Registry used for?

    • Version control for models, staging/production transitions
    • Storing metrics only
    • Replacing Git
    • Deploying models only
  3. How does CI/CD for ML differ from traditional CI/CD?

    • It doesn't
    • It adds data validation, model evaluation, and threshold checks
    • It only adds unit tests
    • ML doesn't need CI/CD

🎯 Key Takeaways

  1. MLOps — the bridge between ML research and production
  2. MLflow — the standard for experiment tracking
  3. Pipeline — automate data → train → evaluate → deploy
  4. CI/CD for ML — test code, data, and model quality
  5. Model Registry — version control for trained models

🚀 Next lesson

Model Deployment — deploy ML models with FastAPI, Docker, and cloud services!