
MLOps

Production machine learning systems with MLflow, model versioning, and deployment pipelines.

Quick Start

```python
import mlflow
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer-churn-prediction")

# Training with experiment tracking
with mlflow.start_run(run_name="rf-baseline"):
    # Log parameters
    params = {"n_estimators": 100, "max_depth": 10, "random_state": 42}
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate and log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted")
    }
    mlflow.log_metrics(metrics)

    # Log model to registry
    mlflow.sklearn.log_model(
        model, "model",
        registered_model_name="churn-classifier",
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )

    print(f"Run ID: {mlflow.active_run().info.run_id}")
```
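The Quick Start snippet assumes `X_train`, `X_test`, `y_train`, and `y_test` already exist. For a dry run against the tracking server, one way to produce them is with synthetic data (a sketch; the shapes and random seed are arbitrary choices, not part of the original example):

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Synthetic stand-in for real churn data: 1,000 rows, 20 numeric features
X, y = make_classification(n_samples=1_000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
print(X_train.shape, X_test.shape)  # (800, 20) (200, 20)
```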

Core Concepts

1. Model Registry & Versioning

```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote model to production
client.transition_model_version_stage(
    name="churn-classifier",
    version=3,
    stage="Production"
)

# Archive old version
client.transition_model_version_stage(
    name="churn-classifier",
    version=2,
    stage="Archived"
)

# Load production model
model_uri = "models:/churn-classifier/Production"
model = mlflow.sklearn.load_model(model_uri)

# Model comparison
def compare_model_versions(model_name: str, versions: list[int]) -> dict:
    results = {}
    for version in versions:
        run_id = client.get_model_version(model_name, str(version)).run_id
        run = client.get_run(run_id)
        results[version] = run.data.metrics
    return results
```
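`compare_model_versions` returns a `{version: metrics}` mapping; a small hypothetical helper (`select_best_version` is not an MLflow API) shows one way to act on it when deciding which version to promote:

```python
def select_best_version(results: dict[int, dict[str, float]], metric: str) -> int:
    """Pick the version with the highest value for `metric`."""
    return max(results, key=lambda v: results[v].get(metric, float("-inf")))

# Hypothetical metrics in the shape compare_model_versions returns
results = {
    2: {"accuracy": 0.84, "f1_score": 0.81},
    3: {"accuracy": 0.89, "f1_score": 0.87},
}
best = select_best_version(results, "f1_score")
print(best)  # 3
```

Note: newer MLflow releases deprecate registry stages in favor of model version aliases (URIs like `models:/churn-classifier@champion`), so check which scheme your server supports before relying on `transition_model_version_stage`.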

2. Feature Store Pattern

```python
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource
from datetime import timedelta

# Define feature store
store = FeatureStore(repo_path="feature_repo/")

# Get training features
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_purchases",
        "customer_features:days_since_last_order",
        "customer_features:avg_order_value"
    ]
).to_df()

# Get online features for inference
feature_vector = store.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:days_since_last_order"
    ],
    entity_rows=[{"customer_id": "12345"}]
).to_dict()
```
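`get_historical_features` performs a point-in-time join: each training row only sees the feature values that were known at its `event_timestamp`, which is what prevents label leakage. A minimal pandas illustration of that semantics (a sketch of the idea, not the Feast API; the data is made up):

```python
import pandas as pd

# Label events we want to train on
entity_df = pd.DataFrame({
    "customer_id": ["12345", "12345"],
    "event_timestamp": pd.to_datetime(["2025-01-10", "2025-02-10"]),
})

# Feature values with the time they became known
features = pd.DataFrame({
    "customer_id": ["12345", "12345"],
    "event_timestamp": pd.to_datetime(["2025-01-01", "2025-02-01"]),
    "total_purchases": [10, 14],
})

# For each entity row, take the latest feature value at or before its timestamp
training_df = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="customer_id",
)
print(training_df["total_purchases"].tolist())  # [10, 14]
```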

3. Model Serving with FastAPI

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
import numpy as np

app = FastAPI()

# Load model at startup
model = mlflow.sklearn.load_model("models:/churn-classifier/Production")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        X = np.array(request.features).reshape(1, -1)
        prediction = model.predict(X)[0]
        probability = model.predict_proba(X)[0].max()
        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            model_version="v3"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}
```
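Pydantic does the input validation before `predict` ever runs. A standalone sketch of how the `PredictionRequest` model above treats payloads (numeric strings are coerced; non-numeric input is rejected, which FastAPI would surface as a 422 response):

```python
from pydantic import BaseModel, ValidationError

class PredictionRequest(BaseModel):
    features: list[float]

# Coercible values become floats
ok = PredictionRequest(features=[1, "2.5", 3.0])
print(ok.features)  # [1.0, 2.5, 3.0]

# Non-numeric input fails validation before the endpoint body executes
try:
    PredictionRequest(features=["not-a-number"])
except ValidationError as e:
    print(f"{len(e.errors())} validation error(s)")
```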

4. CI/CD for ML

```yaml
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'data/**'

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run tests
        run: pytest tests/

      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python src/train.py

      - name: Evaluate model
        run: python src/evaluate.py --threshold 0.85

      - name: Register model
        if: success()
        run: python src/register_model.py

  deploy:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          kubectl set image deployment/model-server \
            model-server=gcr.io/$PROJECT/model:${{ github.sha }}
```
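The `Evaluate model` step only gates the pipeline if `src/evaluate.py` exits non-zero when the metric misses `--threshold`. A hypothetical sketch of that script's core logic (the score source is a placeholder; in the real script it would come from the latest MLflow run):

```python
def gate(score: float, threshold: float = 0.85) -> int:
    """Exit code for the CI step: 0 passes, 1 blocks the deploy job."""
    if score < threshold:
        print(f"FAIL: score {score:.3f} below threshold {threshold}")
        return 1
    print(f"PASS: score {score:.3f} meets threshold {threshold}")
    return 0

# In src/evaluate.py this would end with sys.exit(gate(score, args.threshold))
exit_code = gate(0.91)
print(exit_code)  # 0
```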

Tools & Technologies

| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| MLflow | Experiment tracking | 2.10+ |
| Feast | Feature store | 0.36+ |
| BentoML | Model serving | 1.2+ |
| Seldon | K8s model serving | 1.17+ |
| DVC | Data versioning | 3.40+ |
| Weights & Biases | Experiment tracking | Latest |
| Evidently | Model monitoring | 0.4+ |

Troubleshooting Guide

| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Model Drift | Accuracy drops | Data distribution change | Monitor, retrain |
| Slow Inference | High latency | Large model, no optimization | Quantize, distill |
| Version Mismatch | Prediction errors | Wrong model version | Pin versions |
| Feature Skew | Train/serve mismatch | Different preprocessing | Use feature store |
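For the model-drift row, "monitor" needs a concrete statistic. The population stability index (PSI) is one common choice; a minimal numpy sketch (the usual rule of thumb treats PSI above roughly 0.2 as significant drift, but that cutoff is a convention, not a standard):

```python
import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Population stability index between a reference and a live sample."""
    edges = np.histogram_bin_edges(expected, bins=bins)
    e_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    a_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    # Floor the proportions to avoid log(0) on empty bins
    e_pct = np.clip(e_pct, 1e-6, None)
    a_pct = np.clip(a_pct, 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

rng = np.random.default_rng(42)
baseline = rng.normal(0, 1, 10_000)   # training distribution
drifted = rng.normal(0.5, 1, 10_000)  # production feature with a mean shift

print(psi(baseline, baseline))  # 0.0: identical distributions
print(psi(baseline, drifted))   # substantially larger: drift detected
```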

Best Practices

```python
# ✅ DO: Version everything
mlflow.log_artifact("data/train.csv")
mlflow.log_params({"data_version": "v2.3"})

# ✅ DO: Test model before deployment
def test_model_performance(model, threshold=0.85):
    score = evaluate_model(model)
    assert score >= threshold, f"Model score {score} below threshold"

# ✅ DO: Monitor in production
# ✅ DO: A/B test new models

# ❌ DON'T: Deploy without validation
# ❌ DON'T: Skip rollback strategy
```
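One simple way to implement the A/B-testing practice above is deterministic hash-based routing, so a given user always hits the same model variant and the two groups stay comparable. A hypothetical sketch (the bucket scheme and percentages are illustrative choices):

```python
import hashlib

def route(user_id: str, treatment_pct: int = 10) -> str:
    """Deterministically assign a user to the candidate or production model."""
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return "candidate" if bucket < treatment_pct else "production"

# Same user always gets the same assignment
assert route("user-42") == route("user-42")

# Roughly treatment_pct of users land on the candidate model
share = sum(route(f"user-{i}") == "candidate" for i in range(10_000)) / 10_000
print(f"candidate share: {share:.1%}")  # ≈ 10%
```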

Resources

Skill Certification Checklist:
  • Can track experiments with MLflow
  • Can manage model registry
  • Can deploy models with FastAPI/BentoML
  • Can set up CI/CD for ML
  • Can monitor models in production