# MLOps Workflows with MLflow
A comprehensive guide to production-grade MLOps workflows covering the complete machine learning lifecycle from experimentation to production deployment and monitoring.
## Table of Contents

- MLflow Components Overview
- Experiment Tracking
- Model Registry
- Deployment Patterns
- Monitoring and Observability
- A/B Testing
- Feature Stores
- CI/CD for ML
- Production Best Practices
- Summary
## MLflow Components Overview

MLflow consists of four primary components for managing the ML lifecycle:
### 1. MLflow Tracking

Track experiments, parameters, metrics, and artifacts during model development.

```python
import mlflow

# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")

# Create or set experiment
mlflow.set_experiment("production-models")

# Start a run
with mlflow.start_run(run_name="baseline-model"):
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("batch_size", 32)

    # Log metrics
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("loss", 0.05)

    # Log artifacts
    mlflow.log_artifact("model_plot.png")
```
### 2. MLflow Projects

Package ML code in a reusable, reproducible format.

```yaml
# MLproject file
name: my-ml-project
conda_env: conda.yaml

entry_points:
  main:
    parameters:
      learning_rate: {type: float, default: 0.01}
      epochs: {type: int, default: 100}
    command: "python train.py --lr {learning_rate} --epochs {epochs}"
  evaluate:
    parameters:
      model_uri: {type: string}
    command: "python evaluate.py --model-uri {model_uri}"
```
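Once the MLproject file is in place, entry points can be launched from Python as well as from the CLI. A minimal sketch using the projects API (the parameter values here are illustrative):

```python
import mlflow

# Run the "main" entry point of the project in the current directory.
# MLflow resolves conda.yaml and prepares the environment before running.
# CLI equivalent: mlflow run . -P learning_rate=0.05 -P epochs=50
submitted = mlflow.projects.run(
    uri=".",
    entry_point="main",
    parameters={"learning_rate": 0.05, "epochs": 50},
)
print(submitted.run_id, submitted.get_status())
```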
### 3. MLflow Models

Package models in a standard format for deployment across platforms.

```python
import mlflow.sklearn
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier

# Train model
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Log model with signature
signature = infer_signature(X_train, model.predict(X_train))
mlflow.sklearn.log_model(
    sk_model=model,
    name="random-forest-model",
    signature=signature,
    input_example=X_train[:5],
    registered_model_name="ProductionClassifier"
)
```

### 4. MLflow Registry
Centralized model store for managing model lifecycle and versioning.

```python
from mlflow import MlflowClient

client = MlflowClient()

# Register model
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="CustomerChurnModel"
)

# Set model alias for deployment
client.set_registered_model_alias(
    name="CustomerChurnModel",
    alias="production",
    version=registered_model.version
)
```

## Experiment Tracking
### Basic Experiment Tracking

```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

# Configure MLflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("house-price-prediction")

# Load and prepare data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Training with MLflow tracking
with mlflow.start_run(run_name="rf-baseline"):
    # Define parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }

    # Train model
    model = RandomForestRegressor(**params)
    model.fit(X_train, y_train)

    # Evaluate
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)

    # Log everything
    mlflow.log_params(params)
    mlflow.log_metrics({
        "mse": mse,
        "r2": r2,
        "rmse": mse ** 0.5
    })

    # Log model
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        registered_model_name="HousePricePredictor"
    )
```
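Logged runs can then be queried programmatically to compare candidates. A short sketch using the search API, assuming the experiment name from the example above:

```python
import mlflow

# Fetch runs from the experiment as a pandas DataFrame,
# filtered by a logged metric and sorted by another.
runs = mlflow.search_runs(
    experiment_names=["house-price-prediction"],
    filter_string="metrics.r2 > 0.8",
    order_by=["metrics.rmse ASC"],
)
print(runs[["run_id", "metrics.rmse", "metrics.r2"]].head())
```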
### Autologging
MLflow provides automatic logging for popular frameworks:

```python
import mlflow
from sklearn.ensemble import RandomForestClassifier

# Enable autologging for scikit-learn
mlflow.sklearn.autolog()

# Your training code - everything is logged automatically
with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100, max_depth=5)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
```
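Besides the framework-specific calls, there is a generic entry point that turns on autologging for every supported library at once; a one-line sketch:

```python
import mlflow

# Enable autologging across all supported frameworks
# (scikit-learn, XGBoost, PyTorch, and others) with a single call.
mlflow.autolog()
```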
### Nested Runs for Hyperparameter Tuning
```python
import mlflow
from sklearn.ensemble import GradientBoostingClassifier

mlflow.set_experiment("hyperparameter-tuning")

# Parent run for the entire tuning process
with mlflow.start_run(run_name="grid-search-parent"):
    param_grid = {
        'learning_rate': [0.01, 0.1, 0.3],
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 7]
    }

    # Log parent parameters
    mlflow.log_param("tuning_method", "grid_search")
    mlflow.log_param("cv_folds", 5)

    best_score = 0
    best_params = None

    # Nested runs for each parameter combination
    for lr in param_grid['learning_rate']:
        for n_est in param_grid['n_estimators']:
            for depth in param_grid['max_depth']:
                with mlflow.start_run(nested=True, run_name=f"lr{lr}_n{n_est}_d{depth}"):
                    params = {
                        'learning_rate': lr,
                        'n_estimators': n_est,
                        'max_depth': depth
                    }
                    model = GradientBoostingClassifier(**params)
                    model.fit(X_train, y_train)
                    score = model.score(X_test, y_test)

                    mlflow.log_params(params)
                    mlflow.log_metric("accuracy", score)

                    if score > best_score:
                        best_score = score
                        best_params = params

    # Log best results in parent run
    mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
    mlflow.log_metric("best_accuracy", best_score)
```

### Tracking Multiple Metrics Over Time
```python
import mlflow
import numpy as np

with mlflow.start_run():
    # Log metrics at different steps (epochs)
    for epoch in range(100):
        train_loss = np.random.random() * (1 - epoch/100)
        val_loss = np.random.random() * (1 - epoch/100) + 0.1

        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)
        mlflow.log_metric("learning_rate", 0.01 * (0.95 ** epoch), step=epoch)
```
### Logging Artifacts
```python
import mlflow
import matplotlib.pyplot as plt
import pandas as pd

with mlflow.start_run():
    # Log plot
    plt.figure(figsize=(10, 6))
    plt.plot(history['loss'], label='Training Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.legend()
    plt.savefig("loss_curve.png")
    mlflow.log_artifact("loss_curve.png")

    # Log dataframe as CSV
    feature_importance = pd.DataFrame({
        'feature': feature_names,
        'importance': model.feature_importances_
    })
    feature_importance.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")

    # Log entire directory
    mlflow.log_artifacts("output_dir/", artifact_path="outputs")
```

## Model Registry
### Registering Models
```python
from mlflow import MlflowClient
import mlflow.sklearn

client = MlflowClient()

# Method 1: Register during model logging
with mlflow.start_run():
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        registered_model_name="CustomerSegmentationModel"
    )

# Method 2: Register an existing model
run_id = "abc123"
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="CustomerSegmentationModel"
)
```

### Model Versioning and Aliases
```python
import mlflow.sklearn
from mlflow import MlflowClient

client = MlflowClient()

# Create registered model
client.create_registered_model(
    name="FraudDetectionModel",
    description="ML model for detecting fraudulent transactions"
)

# Register version 1
model_uri_v1 = "runs:/run1/model"
mv1 = client.create_model_version(
    name="FraudDetectionModel",
    source=model_uri_v1,
    run_id="run1"
)

# Set aliases for deployment management
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="champion",  # Production model
    version="1"
)
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="challenger",  # A/B testing model
    version="2"
)

# Load model by alias
champion_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@champion")
challenger_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@challenger")
```

### Model Lifecycle Management
```python
import mlflow
import mlflow.pytorch
from mlflow.entities import LoggedModelStatus

# Initialize model in PENDING state
model = mlflow.initialize_logged_model(
    name="neural_network_classifier",
    model_type="neural_network",
    tags={"architecture": "resnet", "dataset": "imagenet"}
)

try:
    # Training and validation
    train_model()
    validate_model()

    # Log model artifacts
    mlflow.pytorch.log_model(
        pytorch_model=model_instance,
        name="model",
        model_id=model.model_id
    )

    # Mark as ready
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.READY)
except Exception:
    # Mark as failed
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.FAILED)
    raise
```

### Model Metadata and Tags
```python
from mlflow import MlflowClient

client = MlflowClient()

# Set registered model tags
client.set_registered_model_tag(
    name="RecommendationModel",
    key="task",
    value="collaborative_filtering"
)
client.set_registered_model_tag(
    name="RecommendationModel",
    key="business_unit",
    value="ecommerce"
)

# Set model version tags
client.set_model_version_tag(
    name="RecommendationModel",
    version="3",
    key="validation_status",
    value="approved"
)
client.set_model_version_tag(
    name="RecommendationModel",
    version="3",
    key="approval_date",
    value="2024-01-15"
)

# Update model description
client.update_registered_model(
    name="RecommendationModel",
    description="Collaborative filtering model for product recommendations. Trained on user-item interaction data."
)
```

### Searching and Filtering Models
```python
from mlflow import MlflowClient

client = MlflowClient()

# Search registered models
models = client.search_registered_models(
    filter_string="name LIKE 'Production%'",
    max_results=10
)

# Search model versions
versions = client.search_model_versions(
    filter_string="name='CustomerChurnModel' AND tags.validation_status='approved'"
)

# Get specific model version
model_version = client.get_model_version(
    name="CustomerChurnModel",
    version="5"
)

# Get model by alias
champion = client.get_model_version_by_alias(
    name="CustomerChurnModel",
    alias="champion"
)
```

## Deployment Patterns
### Local Model Serving
```python
import mlflow.pyfunc

# Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Make predictions
predictions = model.predict(data)
```

### REST API Deployment
```bash
# Serve model as REST API
mlflow models serve \
  --model-uri models:/CustomerChurnModel@production \
  --host 0.0.0.0 \
  --port 5001 \
  --workers 4
```

```python
# Client code to call the REST API
import requests
import json

url = "http://localhost:5001/invocations"
headers = {"Content-Type": "application/json"}
data = {
    "dataframe_split": {
        "columns": ["feature1", "feature2", "feature3"],
        "data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
    }
}

response = requests.post(url, headers=headers, data=json.dumps(data))
predictions = response.json()
```
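Before routing traffic to the server, it can help to verify liveness; a quick sketch, assuming the server above is listening on port 5001 (the MLflow scoring server exposes a GET `/ping` health endpoint):

```python
import requests

# Returns HTTP 200 when the scoring server is up and the model is loaded.
resp = requests.get("http://localhost:5001/ping")
assert resp.status_code == 200, "model server is not healthy"
```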
### Docker Deployment
```bash
# Build Docker image
mlflow models build-docker \
  --model-uri models:/CustomerChurnModel@production \
  --name customer-churn-model

# Run container
docker run -p 8080:8080 customer-churn-model
```

### AWS SageMaker Deployment
```python
import mlflow.sagemaker

# Deploy to SageMaker
mlflow.sagemaker.deploy(
    app_name="customer-churn-predictor",
    model_uri="models:/CustomerChurnModel@production",
    region_name="us-east-1",
    mode="create",
    execution_role_arn="arn:aws:iam::123456789:role/SageMakerRole",
    instance_type="ml.m5.xlarge",
    instance_count=2
)
```

### Azure ML Deployment
```python
import mlflow.azureml
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice

# Configure workspace
ws = Workspace.from_config()

# Deploy to Azure Container Instance
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=2,
    memory_gb=4,
    tags={"model": "churn-predictor"},
    description="Customer churn prediction model"
)

mlflow.azureml.deploy(
    model_uri="models:/CustomerChurnModel@production",
    workspace=ws,
    deployment_config=aci_config,
    service_name="churn-predictor-service"
)
```

### GCP Vertex AI Deployment
```python
from google.cloud import aiplatform
import mlflow

# Initialize Vertex AI
aiplatform.init(project="my-project", location="us-central1")

# Register the model in MLflow
model = mlflow.register_model(
    model_uri="runs:/run-id/model",
    name="CustomerChurnModel"
)

# Create Vertex AI endpoint
endpoint = aiplatform.Endpoint.create(display_name="churn-prediction-endpoint")

# Deploy model
endpoint.deploy(
    model=model,
    deployed_model_display_name="churn-v1",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5
)
```

### Batch Inference
```python
import mlflow
import pandas as pd

# Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Load batch data
batch_data = pd.read_csv("customer_batch.csv")

# Process in chunks
chunk_size = 1000
predictions = []
for i in range(0, len(batch_data), chunk_size):
    chunk = batch_data[i:i+chunk_size]
    chunk_predictions = model.predict(chunk)
    predictions.extend(chunk_predictions)

# Save results
results = pd.DataFrame({
    'customer_id': batch_data['customer_id'],
    'churn_probability': predictions
})
results.to_csv("churn_predictions.csv", index=False)
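```

For batches too large for a single process, the same registered model can be applied in parallel with Spark; a sketch assuming pyspark is installed and that the feature columns match the model signature:

```python
from pyspark.sql import SparkSession
import mlflow.pyfunc

spark = SparkSession.builder.appName("batch-scoring").getOrCreate()

# Wrap the registered model as a Spark UDF for distributed scoring.
predict_udf = mlflow.pyfunc.spark_udf(spark, "models:/CustomerChurnModel@production")

df = spark.read.csv("customer_batch.csv", header=True, inferSchema=True)
feature_cols = [c for c in df.columns if c != "customer_id"]

scored = df.withColumn("churn_probability", predict_udf(*feature_cols))
scored.select("customer_id", "churn_probability") \
      .write.mode("overwrite").parquet("churn_predictions.parquet")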
```

## Monitoring and Observability
### Model Performance Monitoring

```python
import mlflow
from datetime import datetime
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score

class ModelMonitor:
    def __init__(self, model_name, tracking_uri):
        self.model_name = model_name
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(f"{model_name}-monitoring")

    def log_prediction_metrics(self, y_true, y_pred, timestamp=None):
        """Log prediction metrics for monitoring"""
        if timestamp is None:
            timestamp = datetime.now()

        with mlflow.start_run(run_name=f"monitoring-{timestamp}"):
            # Calculate metrics
            metrics = {
                "accuracy": accuracy_score(y_true, y_pred),
                "precision": precision_score(y_true, y_pred, average='weighted'),
                "recall": recall_score(y_true, y_pred, average='weighted')
            }

            # Log metrics
            mlflow.log_metrics(metrics)
            mlflow.log_param("timestamp", timestamp.isoformat())
            mlflow.log_param("num_predictions", len(y_pred))

            # Alert on performance degradation
            if metrics["accuracy"] < 0.85:
                mlflow.set_tag("alert", "performance_degradation")

    def log_data_drift(self, reference_data, current_data):
        """Monitor for data drift"""
        with mlflow.start_run(run_name="data-drift-check"):
            # Calculate distribution statistics
            for col in reference_data.columns:
                ref_mean = reference_data[col].mean()
                curr_mean = current_data[col].mean()
                drift_percent = abs((curr_mean - ref_mean) / ref_mean) * 100

                mlflow.log_metric(f"{col}_drift_percent", drift_percent)
                if drift_percent > 20:
                    mlflow.set_tag(f"{col}_drift_alert", "high")

# Usage
monitor = ModelMonitor("CustomerChurnModel", "http://localhost:5000")
monitor.log_prediction_metrics(y_true, y_pred)
```

### Prediction Logging
```python
import mlflow
from datetime import datetime
import json

def log_predictions(model_name, inputs, predictions, metadata=None):
    """Log predictions for auditing and monitoring"""
    mlflow.set_experiment(f"{model_name}-predictions")

    with mlflow.start_run(run_name=f"prediction-{datetime.now().isoformat()}"):
        # Log prediction data
        mlflow.log_param("num_predictions", len(predictions))
        mlflow.log_param("model_name", model_name)

        # Log metadata
        if metadata:
            mlflow.log_params(metadata)

        # Log input/output samples
        sample_data = {
            "inputs": inputs[:5].tolist() if hasattr(inputs, 'tolist') else inputs[:5],
            "predictions": predictions[:5].tolist() if hasattr(predictions, 'tolist') else predictions[:5]
        }
        with open("prediction_sample.json", "w") as f:
            json.dump(sample_data, f)
        mlflow.log_artifact("prediction_sample.json")
```

### Model Explainability Tracking
```python
import mlflow
import shap
import matplotlib.pyplot as plt

def log_model_explanations(model, X_test, feature_names):
    """Log SHAP explanations for model interpretability"""
    with mlflow.start_run():
        # Calculate SHAP values
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test)

        # Create summary plot
        plt.figure()
        shap.summary_plot(shap_values, X_test, feature_names=feature_names, show=False)
        plt.savefig("shap_summary.png", bbox_inches='tight')
        mlflow.log_artifact("shap_summary.png")

        # Log feature importance
        feature_importance = dict(zip(feature_names, model.feature_importances_))
        mlflow.log_params({f"importance_{k}": v for k, v in feature_importance.items()})
```

## A/B Testing
### A/B Test Framework

```python
import mlflow
import numpy as np
from datetime import datetime
from scipy import stats

class ABTestFramework:
    def __init__(self, model_a_uri, model_b_uri, traffic_split=0.5):
        self.model_a = mlflow.pyfunc.load_model(model_a_uri)
        self.model_b = mlflow.pyfunc.load_model(model_b_uri)
        self.traffic_split = traffic_split
        mlflow.set_experiment("ab-testing")

    def predict(self, data, user_id=None):
        """Route traffic between models and log results"""
        # Determine which model to use
        if user_id is None or hash(user_id) % 100 < self.traffic_split * 100:
            model_name = "model_a"
            prediction = self.model_a.predict(data)
        else:
            model_name = "model_b"
            prediction = self.model_b.predict(data)

        # Log the prediction
        with mlflow.start_run(run_name=f"ab-test-{datetime.now().isoformat()}"):
            mlflow.log_param("model_variant", model_name)
            mlflow.log_param("user_id", user_id)
            mlflow.log_metric("prediction", float(prediction[0]))

        return prediction

    def evaluate_test(self, results_a, results_b):
        """Evaluate A/B test results"""
        with mlflow.start_run(run_name="ab-test-evaluation"):
            # Calculate metrics for both variants
            metrics_a = {
                "mean_a": np.mean(results_a),
                "std_a": np.std(results_a),
                "count_a": len(results_a)
            }
            metrics_b = {
                "mean_b": np.mean(results_b),
                "std_b": np.std(results_b),
                "count_b": len(results_b)
            }

            # Statistical test
            t_stat, p_value = stats.ttest_ind(results_a, results_b)

            mlflow.log_metrics({**metrics_a, **metrics_b})
            mlflow.log_metric("t_statistic", t_stat)
            mlflow.log_metric("p_value", p_value)

            # Determine winner
            if p_value < 0.05:
                winner = "model_a" if np.mean(results_a) > np.mean(results_b) else "model_b"
                mlflow.set_tag("winner", winner)
                mlflow.set_tag("significant", "yes")
            else:
                mlflow.set_tag("significant", "no")

# Usage
ab_test = ABTestFramework(
    model_a_uri="models:/CustomerChurnModel@champion",
    model_b_uri="models:/CustomerChurnModel@challenger",
    traffic_split=0.5
)
prediction = ab_test.predict(customer_data, user_id="user123")
```

### Multi-Armed Bandit Testing
```python
import mlflow
import numpy as np

class MultiArmedBandit:
    def __init__(self, model_uris):
        self.models = [mlflow.pyfunc.load_model(uri) for uri in model_uris]
        self.successes = [1] * len(model_uris)  # Prior
        self.failures = [1] * len(model_uris)   # Prior
        mlflow.set_experiment("mab-testing")

    def select_model(self):
        """Thompson sampling to select model"""
        samples = [
            np.random.beta(s, f)
            for s, f in zip(self.successes, self.failures)
        ]
        return np.argmax(samples)

    def predict_and_update(self, data, actual_outcome=None):
        """Make prediction and update model performance"""
        model_idx = self.select_model()
        prediction = self.models[model_idx].predict(data)

        with mlflow.start_run(run_name="mab-prediction"):
            mlflow.log_param("selected_model", model_idx)
            mlflow.log_metric("prediction", float(prediction[0]))

            # Update based on outcome
            if actual_outcome is not None:
                if actual_outcome == prediction[0]:
                    self.successes[model_idx] += 1
                else:
                    self.failures[model_idx] += 1
                mlflow.log_metric(
                    "success_rate",
                    self.successes[model_idx] / (self.successes[model_idx] + self.failures[model_idx])
                )

        return prediction
```

## Feature Stores
### Feature Store Integration

```python
import mlflow
import numpy as np
import pandas as pd
from datetime import datetime

class FeatureStore:
    def __init__(self, storage_path):
        self.storage_path = storage_path
        mlflow.set_experiment("feature-store")

    def create_feature_set(self, name, df, description=None):
        """Create and version a feature set"""
        with mlflow.start_run(run_name=f"feature-set-{name}"):
            # Save features
            feature_path = f"{self.storage_path}/{name}_{datetime.now().isoformat()}.parquet"
            df.to_parquet(feature_path)

            # Log metadata
            mlflow.log_param("feature_set_name", name)
            mlflow.log_param("num_features", len(df.columns))
            mlflow.log_param("num_samples", len(df))
            mlflow.log_param("description", description or "")

            # Log feature statistics
            stats = df.describe().to_dict()
            mlflow.log_dict(stats, "feature_stats.json")

            # Log artifact
            mlflow.log_artifact(feature_path)

        return feature_path

    def get_features(self, run_id):
        """Retrieve feature set by run ID"""
        client = mlflow.MlflowClient()
        run = client.get_run(run_id)
        artifact_uri = run.info.artifact_uri

        # Download and load features
        local_path = mlflow.artifacts.download_artifacts(artifact_uri)
        df = pd.read_parquet(local_path)
        return df

# Usage
store = FeatureStore("s3://my-bucket/features")

# Create features
features = pd.DataFrame({
    'customer_id': range(1000),
    'lifetime_value': np.random.rand(1000) * 1000,
    'avg_purchase': np.random.rand(1000) * 100,
    'days_since_last_purchase': np.random.randint(0, 365, 1000)
})

feature_path = store.create_feature_set(
    name="customer_features",
    df=features,
    description="Customer behavioral features for churn prediction"
)
```

### Feature Engineering Pipeline
```python
import mlflow
import mlflow.sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def feature_engineering_pipeline(data, run_name="feature-engineering"):
    """Log feature engineering steps"""
    with mlflow.start_run(run_name=run_name):
        # Original features
        mlflow.log_param("original_features", len(data.columns))

        # Scaling
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)
        mlflow.sklearn.log_model(scaler, "scaler")

        # Dimensionality reduction
        pca = PCA(n_components=0.95)
        transformed_data = pca.fit_transform(scaled_data)
        mlflow.sklearn.log_model(pca, "pca")

        mlflow.log_param("final_features", transformed_data.shape[1])
        mlflow.log_metric("variance_explained", pca.explained_variance_ratio_.sum())

        return transformed_data

# Usage
transformed_features = feature_engineering_pipeline(raw_data)
```

## CI/CD for ML
### Training Pipeline

```python
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def training_pipeline(data_path, model_params, validation_threshold=0.85):
    """Automated training pipeline with validation gates"""
    mlflow.set_experiment("production-training-pipeline")

    with mlflow.start_run(run_name="pipeline-run"):
        # Load data
        data = pd.read_csv(data_path)
        X = data.drop('target', axis=1)
        y = data['target']

        # Log data version
        mlflow.log_param("data_version", data_path.split('/')[-1])
        mlflow.log_param("data_samples", len(data))

        # Train model
        model = RandomForestClassifier(**model_params)
        model.fit(X, y)

        # Cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5)
        mean_cv_score = cv_scores.mean()

        mlflow.log_params(model_params)
        mlflow.log_metric("cv_score_mean", mean_cv_score)
        mlflow.log_metric("cv_score_std", cv_scores.std())

        # Validation gate
        if mean_cv_score >= validation_threshold:
            # Log model
            mlflow.sklearn.log_model(
                sk_model=model,
                name="model",
                registered_model_name="ProductionModel"
            )
            mlflow.set_tag("status", "passed")
            return True
        else:
            mlflow.set_tag("status", "failed")
            mlflow.set_tag("failure_reason", "below_threshold")
            return False

# Usage in CI/CD
success = training_pipeline(
    data_path="data/training_data_v2.csv",
    model_params={'n_estimators': 100, 'max_depth': 10},
    validation_threshold=0.85
)
if not success:
    raise ValueError("Model did not meet validation criteria")
```

### Model Promotion Pipeline
```python
from datetime import datetime
from mlflow import MlflowClient

def promote_model_to_production(model_name, version, validation_results):
    """Promote model through stages with validation"""
    client = MlflowClient()

    # Validation checks
    required_metrics = ['accuracy', 'precision', 'recall']
    for metric in required_metrics:
        if metric not in validation_results:
            raise ValueError(f"Missing required metric: {metric}")
        if validation_results[metric] < 0.8:
            raise ValueError(f"{metric} below threshold: {validation_results[metric]}")

    # Set tags
    for metric, value in validation_results.items():
        client.set_model_version_tag(
            name=model_name,
            version=version,
            key=f"validation_{metric}",
            value=str(value)
        )

    # Promote to production
    client.set_registered_model_alias(
        name=model_name,
        alias="production",
        version=version
    )

    # Tag with promotion metadata
    client.set_model_version_tag(
        name=model_name,
        version=version,
        key="promoted_at",
        value=datetime.now().isoformat()
    )

    return True

# Usage
validation_results = {
    'accuracy': 0.92,
    'precision': 0.89,
    'recall': 0.91
}
promote_model_to_production(
    model_name="FraudDetectionModel",
    version="5",
    validation_results=validation_results
)
```
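Because consumers resolve the model through the alias, rolling back a bad promotion is just re-pointing the alias at the previous version; a minimal sketch (the version number here is illustrative):

```python
from mlflow import MlflowClient

client = MlflowClient()

# Roll back: point the production alias at the previous known-good version.
# Anything loading "models:/FraudDetectionModel@production" picks it up on next load.
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="production",
    version="4"  # hypothetical previous version
)
```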
### Automated Model Retraining
```python
import time
from datetime import datetime

import mlflow
import schedule
from mlflow import MlflowClient

class AutomatedRetrainer:
    def __init__(self, model_name, data_source, schedule_interval="daily"):
        self.model_name = model_name
        self.data_source = data_source
        self.schedule_interval = schedule_interval
        mlflow.set_experiment(f"{model_name}-retraining")

    def retrain(self):
        """Retrain model with latest data"""
        with mlflow.start_run(run_name=f"retrain-{datetime.now().isoformat()}"):
            # Load latest data
            data = self.load_latest_data()

            # Get current production model
            client = MlflowClient()
            current_model = client.get_model_version_by_alias(
                self.model_name, "production"
            )

            # Load and evaluate current model
            current_model_obj = mlflow.sklearn.load_model(
                f"models:/{self.model_name}@production"
            )
            current_score = current_model_obj.score(X_test, y_test)
            mlflow.log_metric("current_production_score", current_score)

            # Train new model
            new_model = self.train_model(data)
            new_score = new_model.score(X_test, y_test)
            mlflow.log_metric("new_model_score", new_score)

            # Compare and promote if better
            if new_score > current_score:
                mlflow.sklearn.log_model(
                    sk_model=new_model,
                    name="model",
                    registered_model_name=self.model_name
                )
                mlflow.set_tag("status", "promoted")
            else:
                mlflow.set_tag("status", "not_promoted")

    def start_scheduled_retraining(self):
        """Start scheduled retraining"""
        if self.schedule_interval == "daily":
            schedule.every().day.at("02:00").do(self.retrain)
        elif self.schedule_interval == "weekly":
            schedule.every().monday.at("02:00").do(self.retrain)

        while True:
            schedule.run_pending()
            time.sleep(3600)

# Usage
retrainer = AutomatedRetrainer(
    model_name="CustomerChurnModel",
    data_source="s3://bucket/data",
    schedule_interval="daily"
)
```

## Production Best Practices
### Model Signatures

```python
from mlflow.models import infer_signature, ModelSignature
from mlflow.types import Schema, ColSpec
import mlflow.sklearn

# Method 1: Infer signature from data
signature = infer_signature(X_train, model.predict(X_train))

# Method 2: Define explicit signature
input_schema = Schema([
    ColSpec("double", "age"),
    ColSpec("double", "income"),
    ColSpec("string", "customer_segment")
])
output_schema = Schema([ColSpec("double")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Log model with signature
mlflow.sklearn.log_model(
    sk_model=model,
    name="model",
    signature=signature,
    input_example=X_train[:5]
)
```
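Signatures recorded this way are used to validate inputs when the model is served, and they can be inspected after logging; a small sketch, where the model URI is a placeholder for one you have logged:

```python
import mlflow.models

# Inspect the metadata recorded with a logged model
# (the version in this URI is hypothetical; use any runs:/ or models:/ URI).
info = mlflow.models.get_model_info("models:/ProductionClassifier/1")
print(info.signature)       # input/output schema
print(info.flavors.keys())  # available flavors, e.g. python_function, sklearn
```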
### Model Validation Framework
```python
import mlflow
import json
from sklearn.metrics import classification_report

class ModelValidator:
    def __init__(self, thresholds):
        self.thresholds = thresholds

    def validate(self, model, X_test, y_test):
        """Comprehensive model validation"""
        results = {}

        with mlflow.start_run(run_name="model-validation"):
            # Performance metrics
            predictions = model.predict(X_test)
            report = classification_report(y_test, predictions, output_dict=True)

            # Check thresholds
            passed = True
            for metric, threshold in self.thresholds.items():
                value = report['weighted avg'][metric]
                results[metric] = value
                mlflow.log_metric(metric, value)

                if value < threshold:
                    passed = False
                    mlflow.set_tag(f"{metric}_failed", "true")

            # Detailed report
            with open("validation_report.json", "w") as f:
                json.dump(report, f, indent=2)
            mlflow.log_artifact("validation_report.json")

            mlflow.set_tag("validation_passed", str(passed))

        return passed, results

# Usage
validator = ModelValidator(thresholds={
    'precision': 0.85,
    'recall': 0.80,
    'f1-score': 0.82
})
passed, results = validator.validate(model, X_test, y_test)
```

### Error Handling and Logging
```python
import mlflow
import logging
import traceback
from functools import wraps
from sklearn.ensemble import RandomForestClassifier

def mlflow_error_handler(func):
    """Decorator for MLflow error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        with mlflow.start_run(run_name=f"{func.__name__}"):
            try:
                result = func(*args, **kwargs)
                mlflow.set_tag("status", "success")
                return result
            except Exception as e:
                # Log error
                mlflow.set_tag("status", "failed")
                mlflow.set_tag("error_type", type(e).__name__)
                mlflow.set_tag("error_message", str(e))

                # Log traceback
                tb = traceback.format_exc()
                with open("error_traceback.txt", "w") as f:
                    f.write(tb)
                mlflow.log_artifact("error_traceback.txt")

                logging.error(f"Error in {func.__name__}: {str(e)}")
                raise
    return wrapper

@mlflow_error_handler
def train_model_with_error_handling(data):
    # Training code
    model = RandomForestClassifier()
    model.fit(X, y)
    return model
```

### Model Performance Baseline
```python
import mlflow
import mlflow.sklearn
from sklearn.dummy import DummyClassifier

def establish_baseline(X_train, y_train, X_test, y_test):
    """Establish baseline model performance"""
    mlflow.set_experiment("baseline-models")

    strategies = ['most_frequent', 'stratified', 'uniform']
    for strategy in strategies:
        with mlflow.start_run(run_name=f"baseline-{strategy}"):
            baseline = DummyClassifier(strategy=strategy)
            baseline.fit(X_train, y_train)
            score = baseline.score(X_test, y_test)

            mlflow.log_param("strategy", strategy)
            mlflow.log_metric("accuracy", score)
            mlflow.sklearn.log_model(
                sk_model=baseline,
                name="baseline_model",
                registered_model_name=f"Baseline-{strategy}"
            )

# Usage
establish_baseline(X_train, y_train, X_test, y_test)
```

## Summary
This comprehensive guide covers production-grade MLOps workflows using MLflow:

- Experiment Tracking: Log parameters, metrics, and artifacts systematically
- Model Registry: Centralized model versioning and lifecycle management
- Deployment: Multiple deployment patterns for various platforms
- Monitoring: Track model performance and data drift in production
- A/B Testing: Compare model variants in production
- Feature Stores: Version and manage feature engineering
- CI/CD: Automated training, validation, and promotion pipelines
- Best Practices: Signatures, validation, error handling, and baselines

These patterns enable teams to build robust, scalable ML systems from experimentation through production deployment and monitoring.