MLOps Fundamentals
MLOps = Machine Learning + DevOps: bridging the gap between model training and production deployment. A widely cited estimate (attributed to Gartner) holds that 87% of ML models never make it to production — MLOps exists to close that gap.
🎯 Objectives
- Understand the ML lifecycle and MLOps maturity levels
- Experiment tracking with MLflow
- ML pipeline design
- CI/CD for ML projects
1. ML Lifecycle
1.1 The Full Picture
ML Lifecycle: 📊 Data Pipeline → 🏋️ Model Training → 🚀 Deploy / Serving → 📈 Monitor / Retrain (and back to data)
Stages:
- Data Collection & Validation
- Feature Engineering & Store
- Model Training & Evaluation
- Model Validation & Testing
- Deployment & Serving
- Monitoring & Retraining
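The stages above can be sketched as plain functions chained into a minimal flow. Everything here — the function names, the toy rows, and the majority-class "model" — is illustrative; a real lifecycle would call into your data platform, training framework, and serving layer.

```python
# Illustrative sketch of the lifecycle stages as a chain of functions.

def collect_data():
    # Stage 1: data collection & validation (toy in-memory data)
    rows = [{"age": 34, "churned": 0}, {"age": 26, "churned": 0},
            {"age": 51, "churned": 1}]
    assert all("churned" in r for r in rows), "label column missing"
    return rows

def engineer_features(rows):
    # Stage 2: feature engineering — derive a new column per row
    return [{**r, "is_senior": int(r["age"] >= 50)} for r in rows]

def train(rows):
    # Stage 3: "training" — here just a majority-class baseline
    labels = [r["churned"] for r in rows]
    majority = max(set(labels), key=labels.count)
    return lambda r: majority

def evaluate(model, rows):
    # Stage 4: evaluation before any deployment decision
    correct = sum(model(r) == r["churned"] for r in rows)
    return correct / len(rows)

rows = engineer_features(collect_data())
model = train(rows)
print(f"baseline accuracy: {evaluate(model, rows):.2f}")  # → baseline accuracy: 0.67
```

Deployment and monitoring would follow the same pattern: each stage consumes the previous stage's artifact, which is what makes the whole chain automatable.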
1.2 MLOps Maturity Levels
| Level | Name | Description |
|---|---|---|
| 0 | Manual | Jupyter notebooks, manual deploy |
| 1 | ML Pipeline | Automated training pipeline |
| 2 | CI/CD Pipeline | Auto-train + auto-deploy + monitoring |
| 3 | Full Automation | Auto-retrain on data drift, A/B testing |
Most teams are Level 0-1. Goal: reach Level 2.
2. Experiment Tracking with MLflow
2.1 Why Track Experiments?
Example

Without tracking:
- "Which hyperparameters gave the best result?"
- "What data version was used?"
- "Can we reproduce last month's model?"
- Notebooks scattered everywhere

With MLflow:
- Every run logged with params, metrics, and artifacts
- Compare runs side-by-side
- One-click model deployment
- Full reproducibility

2.2 Setup
```python
# pip install mlflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

# Start the tracking UI (run in a terminal):
# mlflow ui --port 5000
```

2.3 Basic Experiment Tracking
```python
# Set the experiment
mlflow.set_experiment("Customer Churn Prediction")

# Prepare data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Log the run
with mlflow.start_run(run_name="rf_baseline"):
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42,
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Log metrics
    mlflow.log_metrics({
        "accuracy": accuracy,
        "f1_score": f1,
        "train_size": len(X_train),
        "test_size": len(X_test),
    })

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts (plots, data info) — the file must exist on disk
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
```

2.4 Compare Experiments
```python
# Programmatic comparison
import mlflow

experiment = mlflow.get_experiment_by_name("Customer Churn Prediction")
runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.f1_score DESC"]
)

# Top 5 runs
print(runs[['run_id', 'params.n_estimators', 'params.max_depth',
            'metrics.accuracy', 'metrics.f1_score']].head())
```

2.5 Model Registry
```python
# Register the best model
best_run_id = runs.iloc[0]['run_id']
model_uri = f"runs:/{best_run_id}/model"

mlflow.register_model(model_uri, "churn_prediction_model")

# Transition to production
# (note: model stages are deprecated in recent MLflow versions in favor of aliases)
from mlflow.tracking import MlflowClient
client = MlflowClient()

client.transition_model_version_stage(
    name="churn_prediction_model",
    version=1,
    stage="Production"
)

# Load the production model
model = mlflow.pyfunc.load_model(
    "models:/churn_prediction_model/Production"
)
predictions = model.predict(new_data)
```

3. ML Pipeline Design
3.1 Simple Pipeline with sklearn
```python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier

# Define preprocessing
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(handle_unknown='ignore'))
])

preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features)
])

# Full pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier(n_estimators=100))
])

# Train & save
pipeline.fit(X_train, y_train)
mlflow.sklearn.log_model(pipeline, "pipeline")
```

3.2 Advanced Pipeline with Prefect
```python
# pip install prefect
from prefect import flow, task
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier

@task(name="load_data")
def load_data(path: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    print(f"Loaded {len(df)} rows")
    return df

@task(name="validate_data")
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    assert len(df) > 0, "Empty dataset!"
    assert df.isnull().mean().max() < 0.5, "Too many nulls!"

    # Basic sanity stats (a real pipeline would also check for drift)
    stats = df.describe()
    print(f"Data validation passed. Shape: {df.shape}")
    return df

@task(name="feature_engineering")
def feature_engineering(df: pd.DataFrame) -> tuple:
    # Feature creation
    df['total_spend'] = df['quantity'] * df['price']
    df['days_since_last'] = (pd.Timestamp.now() - pd.to_datetime(df['last_purchase'])).dt.days

    X = df.drop('churn', axis=1)
    y = df['churn']
    return X, y

@task(name="train_model")
def train_model(X, y):
    with mlflow.start_run():
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X, y)
        mlflow.sklearn.log_model(model, "model")
    return model

@flow(name="ml_training_pipeline")
def training_pipeline(data_path: str):
    df = load_data(data_path)
    df = validate_data(df)
    X, y = feature_engineering(df)
    model = train_model(X, y)
    return model

# Run the pipeline
training_pipeline("data/customers.csv")
```

4. CI/CD for ML
4.1 ML-specific CI/CD
ML CI/CD Pipeline:
📊 Data Change / 💻 Code Change → ✅ Validate Data → 🧪 Unit Tests → 🏋️ Train Model → 📈 Evaluate → 🎯 Passes Threshold?
- Yes → 🚀 Register Model → Deploy
- No → 🚫 Alert Team, Block Deploy
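The "passes threshold?" gate at the end of the flow can be sketched as a small script. The metric names, thresholds, and metrics-dict layout are illustrative; in CI this function would read the metrics file produced by the evaluation step.

```python
# Sketch of a quality gate: deploy only if every metric clears its threshold.
THRESHOLDS = {"f1_score": 0.85, "accuracy": 0.90}  # example quality bar

def gate(metrics: dict, thresholds: dict) -> bool:
    """Return True only when every required metric clears its threshold."""
    failures = {k: metrics.get(k, 0.0)
                for k, t in thresholds.items()
                if metrics.get(k, 0.0) < t}
    if failures:
        print(f"Blocking deploy, below threshold: {failures}")  # alert path
        return False
    print("Quality gate passed, proceeding to register/deploy")
    return True

metrics = {"f1_score": 0.91, "accuracy": 0.93}  # stand-in for metrics.json
gate(metrics, THRESHOLDS)
```

A missing metric counts as 0.0 and therefore fails the gate, which is the safe default: a broken evaluation step should block deployment, not silently pass it.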
4.2 GitHub Actions for ML
```yaml
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  push:
    paths:
      - 'src/models/**'
      - 'src/features/**'
      - 'data/processed/**'
  schedule:
    - cron: '0 6 * * 1'  # Weekly retraining (Mondays, 06:00 UTC)

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run data validation
        run: python src/data/validate.py

      - name: Train model
        run: python src/models/train.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}

      - name: Evaluate model
        run: python src/models/evaluate.py

      - name: Check model quality
        run: |
          python -c "
          import json
          metrics = json.load(open('metrics.json'))
          assert metrics['f1_score'] > 0.85, 'F1 below threshold!'
          assert metrics['accuracy'] > 0.90, 'Accuracy below threshold!'
          print('Model quality check passed!')
          "

      - name: Register model
        if: success()
        run: python src/models/register.py
```

4.3 Model Validation Checklist
```python
import os
import time

import joblib
from sklearn.metrics import accuracy_score

def validate_model(model, X_test, y_test, thresholds):
    """Production readiness check."""
    checks = {}

    # 1. Performance check
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    checks['accuracy'] = acc >= thresholds['min_accuracy']

    # 2. Inference speed: average single-row latency over 100 calls
    start = time.time()
    for _ in range(100):
        model.predict(X_test[:1])
    latency = (time.time() - start) / 100
    checks['latency_ok'] = latency < thresholds['max_latency_ms'] / 1000

    # 3. Model size
    joblib.dump(model, '/tmp/model.pkl')
    size_mb = os.path.getsize('/tmp/model.pkl') / (1024 * 1024)
    checks['size_ok'] = size_mb < thresholds['max_size_mb']

    # 4. Fairness check (optional):
    # compare performance across demographic groups

    all_passed = all(checks.values())
    return all_passed, checks

# Usage
passed, checks = validate_model(model, X_test, y_test, {
    'min_accuracy': 0.90,
    'max_latency_ms': 50,
    'max_size_mb': 500
})
```

5. Tools Landscape
| Category | Tools | Use Case |
|---|---|---|
| Experiment Tracking | MLflow, W&B, Neptune | Log params, metrics, models |
| Pipeline | Prefect, Airflow, Kubeflow | Orchestrate ML workflows |
| Feature Store | Feast, Tecton | Store & serve features |
| Model Serving | BentoML, Seldon, TorchServe | Deploy models as APIs |
| Monitoring | Evidently, NannyML | Detect drift, track performance |
| Data Version | DVC, LakeFS | Version datasets |
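What the monitoring tools in the table automate can be illustrated with a toy drift check: compare a feature's production distribution against its training-time baseline. This is a crude mean-shift sketch with made-up numbers; real tools like Evidently or NannyML run proper per-feature statistical tests.

```python
# Toy drift check: flag drift if the production mean is far from the
# baseline mean, measured in baseline standard errors (a crude z-test).
from statistics import mean, stdev

def drifted(baseline, production, z_threshold=3.0):
    se = stdev(baseline) / (len(production) ** 0.5)
    z = abs(mean(production) - mean(baseline)) / se
    return z > z_threshold

baseline = [10.0, 11.0, 9.5, 10.5, 10.2, 9.8]   # training-time feature values
stable   = [10.1, 10.3, 9.9, 10.0, 10.4, 9.7]   # similar distribution
shifted  = [14.9, 15.2, 15.0, 14.8, 15.1, 15.3]  # clear upward shift

print(drifted(baseline, stable))   # → False
print(drifted(baseline, shifted))  # → True
```

In production the "drifted" signal would feed the Level 3 loop from the maturity table: alert the team or trigger automatic retraining.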
MLflow vs W&B (Weights & Biases)
| Aspect | MLflow | W&B |
|---|---|---|
| Cost | Free, open-source | Free tier + paid |
| Hosting | Self-hosted | Cloud (easier) |
| UI | Good | Excellent |
| Collaboration | Basic | Strong (teams) |
| Model Registry | Built-in | Built-in |
| Best for | Enterprise, on-prem | Research, startups |
📝 Quiz
-
What does MLOps Level 2 include?
- Only experiment tracking
- Only automated training
- Automated training + automated deployment + monitoring
- Manual everything
-
What is the MLflow Model Registry used for?
- Version control for models, with staging/production transitions
- Only storing metrics
- Replacing Git
- Only deploying models
-
How does CI/CD for ML differ from traditional CI/CD?
- No difference
- It adds data validation, model evaluation, and threshold checks
- It only adds unit tests
- ML doesn't need CI/CD
🎯 Key Takeaways
- MLOps — the bridge between ML research and production
- MLflow — the standard for experiment tracking
- Pipeline — automate data → train → evaluate → deploy
- CI/CD for ML — test code, data, and model quality
- Model Registry — version control for trained models
🚀 Next Lesson
Model Deployment — deploy ML models with FastAPI, Docker, and cloud services!
