# MLOps
Production machine learning systems with MLflow, model versioning, and deployment pipelines.
## Quick Start
```python
import mlflow
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer-churn-prediction")

# Training with experiment tracking
# (assumes X_train, X_test, y_train, y_test are already prepared)
with mlflow.start_run(run_name="rf-baseline"):
    # Log parameters
    params = {"n_estimators": 100, "max_depth": 10, "random_state": 42}
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate and log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted")
    }
    mlflow.log_metrics(metrics)

    # Log model to registry
    mlflow.sklearn.log_model(
        model, "model",
        registered_model_name="churn-classifier",
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )

    print(f"Run ID: {mlflow.active_run().info.run_id}")
```
## Core Concepts
### 1. Model Registry & Versioning
```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote model to production
client.transition_model_version_stage(
    name="churn-classifier",
    version=3,
    stage="Production"
)

# Archive old version
client.transition_model_version_stage(
    name="churn-classifier",
    version=2,
    stage="Archived"
)

# Load production model
model_uri = "models:/churn-classifier/Production"
model = mlflow.sklearn.load_model(model_uri)

# Model comparison
def compare_model_versions(model_name: str, versions: list[int]) -> dict:
    results = {}
    for version in versions:
        run_id = client.get_model_version(model_name, str(version)).run_id
        run = client.get_run(run_id)
        results[version] = run.data.metrics
    return results
```
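`compare_model_versions` returns the logged metrics keyed by model version. A minimal sketch of picking a promotion candidate from that output (the metric values below are made-up illustrations, not real runs):

```python
# Hypothetical output of compare_model_versions: {version: {metric: value}}
results = {
    2: {"accuracy": 0.87, "f1_score": 0.85},
    3: {"accuracy": 0.91, "f1_score": 0.90},
}

def best_version(results: dict, metric: str = "f1_score") -> int:
    # Version whose run logged the highest value for the chosen metric
    return max(results, key=lambda v: results[v][metric])

print(best_version(results))  # → 3
```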
undefined2. Feature Store Pattern
2. 特征存储模式
```python
from datetime import timedelta

from feast import FeatureStore, Entity, Feature, FeatureView, FileSource

# Define feature store
store = FeatureStore(repo_path="feature_repo/")

# Get training features
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_purchases",
        "customer_features:days_since_last_order",
        "customer_features:avg_order_value"
    ]
).to_df()

# Get online features for inference
feature_vector = store.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:days_since_last_order"
    ],
    entity_rows=[{"customer_id": "12345"}]
).to_dict()
```
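`get_historical_features` performs a point-in-time join: each training row receives the latest feature value at or before its event timestamp, which prevents future data leaking into training. A toy illustration of that rule (the history values are hypothetical):

```python
from datetime import datetime

# Feature values recorded over time for one customer (made-up data)
history = [
    (datetime(2025, 1, 1), 5),   # total_purchases as of Jan 1
    (datetime(2025, 2, 1), 8),   # ...as of Feb 1
    (datetime(2025, 3, 1), 12),  # ...as of Mar 1
]

def as_of(history, event_time):
    """Latest value at or before event_time (point-in-time join rule)."""
    valid = [v for t, v in history if t <= event_time]
    return valid[-1] if valid else None

print(as_of(history, datetime(2025, 2, 15)))  # → 8 (the Mar 1 value would leak)
```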
### 3. Model Serving with FastAPI
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
import numpy as np

app = FastAPI()

# Load model at startup
model = mlflow.sklearn.load_model("models:/churn-classifier/Production")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        X = np.array(request.features).reshape(1, -1)
        prediction = model.predict(X)[0]
        probability = model.predict_proba(X)[0].max()
        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            model_version="v3"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}
```
### 4. CI/CD for ML
```yaml
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'data/**'

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run tests
        run: pytest tests/
      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python src/train.py
      - name: Evaluate model
        run: python src/evaluate.py --threshold 0.85
      - name: Register model
        if: success()
        run: python src/register_model.py

  deploy:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          kubectl set image deployment/model-server \
            model-server=gcr.io/$PROJECT/model:${{ github.sha }}
```
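The `Evaluate model` step gates registration on a quality threshold: a failing score should make the CI job fail. A sketch of what `src/evaluate.py` might look like (the file layout and the placeholder score are assumptions; a real script would load and score the candidate model):

```python
import argparse

def main(argv=None) -> int:
    """Return 0 to pass the CI gate, 1 to fail it."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--threshold", type=float, default=0.85)
    args = parser.parse_args(argv)
    score = 0.91  # placeholder: load the candidate model and compute its real score
    if score < args.threshold:
        print(f"FAIL: score {score:.3f} below threshold {args.threshold}")
        return 1
    print(f"PASS: score {score:.3f}")
    return 0
```

In CI, ending the script with `raise SystemExit(main())` turns the return value into the process exit code, so a below-threshold model stops the pipeline before the register step.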
## Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| MLflow | Experiment tracking | 2.10+ |
| Feast | Feature store | 0.36+ |
| BentoML | Model serving | 1.2+ |
| Seldon | K8s model serving | 1.17+ |
| DVC | Data versioning | 3.40+ |
| Weights & Biases | Experiment tracking | Latest |
| Evidently | Model monitoring | 0.4+ |
## Troubleshooting Guide
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Model Drift | Accuracy drops | Data distribution change | Monitor, retrain |
| Slow Inference | High latency | Large model, no optimization | Quantize, distill |
| Version Mismatch | Prediction errors | Wrong model version | Pin versions |
| Feature Skew | Train/serve mismatch | Different preprocessing | Use feature store |
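The model-drift row above says "monitor, retrain": the usual trigger signal is a shift between the training-time and live feature distributions. Evidently (see the tools table) automates such reports; for intuition, here is a from-scratch sketch of the Population Stability Index, with the common rule of thumb PSI < 0.1 stable, > 0.25 drifted:

```python
import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between reference and live samples."""
    edges = np.quantile(expected, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf  # cover the whole real line
    e = np.histogram(expected, edges)[0] / len(expected)
    a = np.histogram(actual, edges)[0] / len(actual)
    e, a = np.clip(e, 1e-6, None), np.clip(a, 1e-6, None)  # avoid log(0)
    return float(np.sum((a - e) * np.log(a / e)))

rng = np.random.default_rng(42)
reference = rng.normal(0, 1, 10_000)                # training-time distribution
stable = psi(reference, rng.normal(0, 1, 10_000))   # same distribution: low PSI
drifted = psi(reference, rng.normal(1, 1, 10_000))  # mean shifted by 1σ: high PSI
```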
## Best Practices

```python
# ✅ DO: Version everything
mlflow.log_artifact("data/train.csv")
mlflow.log_params({"data_version": "v2.3"})

# ✅ DO: Test model before deployment
def test_model_performance(model, threshold=0.85):
    score = evaluate_model(model)
    assert score >= threshold, f"Model score {score} below threshold"
```

- ✅ DO: Monitor in production
- ✅ DO: A/B test new models
- ❌ DON'T: Deploy without validation
- ❌ DON'T: Skip rollback strategy
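A/B testing a new model needs a stable traffic split, so that a given user always sees the same variant across requests. A minimal hash-based router sketch (the rollout percentage and variant names are illustrative):

```python
import hashlib

def assign_variant(user_id: str, rollout_pct: int = 10) -> str:
    """Deterministically send rollout_pct% of users to the candidate model."""
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return "candidate" if bucket < rollout_pct else "production"

counts = {"candidate": 0, "production": 0}
for i in range(10_000):
    counts[assign_variant(f"user-{i}")] += 1
# counts["candidate"] is roughly 1,000 (10% of users); because the split
# is keyed on user_id, each user maps to the same variant every time
```

Hashing the user ID (rather than sampling per request) keeps sessions consistent and makes rollbacks trivial: drop `rollout_pct` to 0 and all traffic returns to production.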
## Resources
Skill Certification Checklist:
- Can track experiments with MLflow
- Can manage model registry
- Can deploy models with FastAPI/BentoML
- Can set up CI/CD for ML
- Can monitor models in production