
# MLOps Workflows with MLflow

A comprehensive guide to production-grade MLOps workflows covering the complete machine learning lifecycle from experimentation to production deployment and monitoring.

## Table of Contents

- [MLflow Components Overview](#mlflow-components-overview)
- [Experiment Tracking](#experiment-tracking)
- [Model Registry](#model-registry)
- [Deployment Patterns](#deployment-patterns)
- [Monitoring and Observability](#monitoring-and-observability)
- [A/B Testing](#ab-testing)
- [Feature Stores](#feature-stores)
- [CI/CD for ML](#cicd-for-ml)
- [Production Best Practices](#production-best-practices)

## MLflow Components Overview

MLflow consists of four primary components for managing the ML lifecycle:

### 1. MLflow Tracking

Track experiments, parameters, metrics, and artifacts during model development.

```python
import mlflow

# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")

# Create or set experiment
mlflow.set_experiment("production-models")

# Start a run
with mlflow.start_run(run_name="baseline-model"):
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("batch_size", 32)

    # Log metrics
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("loss", 0.05)

    # Log artifacts
    mlflow.log_artifact("model_plot.png")
```

### 2. MLflow Projects

Package ML code in a reusable, reproducible format.

```yaml
# MLproject file
name: my-ml-project
conda_env: conda.yaml

entry_points:
  main:
    parameters:
      learning_rate: {type: float, default: 0.01}
      epochs: {type: int, default: 100}
    command: "python train.py --lr {learning_rate} --epochs {epochs}"
  evaluate:
    parameters:
      model_uri: {type: string}
    command: "python evaluate.py --model-uri {model_uri}"
```

### 3. MLflow Models

Package models in a standard format for deployment across platforms.

```python
import mlflow.sklearn
from mlflow.models import infer_signature
from sklearn.ensemble import RandomForestClassifier

# Train model
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Log model with signature
signature = infer_signature(X_train, model.predict(X_train))

mlflow.sklearn.log_model(
    sk_model=model,
    name="random-forest-model",
    signature=signature,
    input_example=X_train[:5],
    registered_model_name="ProductionClassifier"
)
```

### 4. MLflow Model Registry

Centralized model store for managing model lifecycle and versioning.

```python
import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# Register model
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="CustomerChurnModel"
)

# Set model alias for deployment
client.set_registered_model_alias(
    name="CustomerChurnModel",
    alias="production",
    version=registered_model.version
)
```

## Experiment Tracking

### Basic Experiment Tracking

```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

# Configure MLflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("house-price-prediction")

# Load and prepare data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Training with MLflow tracking
with mlflow.start_run(run_name="rf-baseline"):
    # Define parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }

    # Train model
    model = RandomForestRegressor(**params)
    model.fit(X_train, y_train)

    # Evaluate
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)

    # Log everything
    mlflow.log_params(params)
    mlflow.log_metrics({
        "mse": mse,
        "r2": r2,
        "rmse": mse ** 0.5
    })

    # Log model
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        registered_model_name="HousePricePredictor"
    )
```

### Autologging

MLflow provides automatic logging for popular frameworks:

```python
import mlflow
from sklearn.ensemble import RandomForestClassifier

# Enable autologging for scikit-learn
mlflow.sklearn.autolog()

# Your training code - parameters, metrics, and the model are logged automatically
with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100, max_depth=5)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
```
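Autologging is not limited to scikit-learn. A minimal sketch of the other entry points (the exact set of supported integrations depends on your MLflow version and which libraries are installed):

```python
import mlflow

# Enable autologging for every supported, installed framework at once
mlflow.autolog()

# Or enable individual integrations explicitly
mlflow.xgboost.autolog()   # XGBoost training runs
mlflow.pytorch.autolog()   # PyTorch Lightning training loops
```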

### Nested Runs for Hyperparameter Tuning

```python
import mlflow
from sklearn.ensemble import GradientBoostingClassifier

mlflow.set_experiment("hyperparameter-tuning")

# Parent run for the entire tuning process
with mlflow.start_run(run_name="grid-search-parent"):
    param_grid = {
        'learning_rate': [0.01, 0.1, 0.3],
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 7]
    }

    # Log parent parameters
    mlflow.log_param("tuning_method", "grid_search")
    mlflow.log_param("cv_folds", 5)

    best_score = 0
    best_params = None

    # Nested runs for each parameter combination
    for lr in param_grid['learning_rate']:
        for n_est in param_grid['n_estimators']:
            for depth in param_grid['max_depth']:
                with mlflow.start_run(nested=True, run_name=f"lr{lr}_n{n_est}_d{depth}"):
                    params = {
                        'learning_rate': lr,
                        'n_estimators': n_est,
                        'max_depth': depth
                    }

                    model = GradientBoostingClassifier(**params)
                    model.fit(X_train, y_train)
                    score = model.score(X_test, y_test)

                    mlflow.log_params(params)
                    mlflow.log_metric("accuracy", score)

                    if score > best_score:
                        best_score = score
                        best_params = params

    # Log best results in parent run
    mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
    mlflow.log_metric("best_accuracy", best_score)
```

### Tracking Multiple Metrics Over Time

```python
import mlflow
import numpy as np

with mlflow.start_run():
    # Log metrics at different steps (epochs)
    for epoch in range(100):
        train_loss = np.random.random() * (1 - epoch/100)
        val_loss = np.random.random() * (1 - epoch/100) + 0.1

        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)
        mlflow.log_metric("learning_rate", 0.01 * (0.95 ** epoch), step=epoch)
```

### Logging Artifacts

```python
import mlflow
import matplotlib.pyplot as plt
import pandas as pd

with mlflow.start_run():
    # Log plot
    plt.figure(figsize=(10, 6))
    plt.plot(history['loss'], label='Training Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.legend()
    plt.savefig("loss_curve.png")
    mlflow.log_artifact("loss_curve.png")

    # Log dataframe as CSV
    feature_importance = pd.DataFrame({
        'feature': feature_names,
        'importance': model.feature_importances_
    })
    feature_importance.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")

    # Log entire directory
    mlflow.log_artifacts("output_dir/", artifact_path="outputs")
```

## Model Registry

### Registering Models

```python
from mlflow import MlflowClient
import mlflow.sklearn

client = MlflowClient()

# Method 1: Register during model logging
with mlflow.start_run():
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        registered_model_name="CustomerSegmentationModel"
    )

# Method 2: Register an existing model
run_id = "abc123"
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(
    model_uri=model_uri,
    name="CustomerSegmentationModel"
)
```

### Model Versioning and Aliases

```python
import mlflow.sklearn
from mlflow import MlflowClient

client = MlflowClient()

# Create registered model
client.create_registered_model(
    name="FraudDetectionModel",
    description="ML model for detecting fraudulent transactions"
)

# Register version 1
model_uri_v1 = "runs:/run1/model"
mv1 = client.create_model_version(
    name="FraudDetectionModel",
    source=model_uri_v1,
    run_id="run1"
)

# Set aliases for deployment management
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="champion",    # Production model
    version="1"
)
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="challenger",  # A/B testing model
    version="2"
)

# Load model by alias
champion_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@champion")
challenger_model = mlflow.sklearn.load_model("models:/FraudDetectionModel@challenger")
```

### Model Lifecycle Management

```python
import mlflow
import mlflow.pytorch
from mlflow.entities import LoggedModelStatus

# Initialize model in PENDING state
model = mlflow.initialize_logged_model(
    name="neural_network_classifier",
    model_type="neural_network",
    tags={"architecture": "resnet", "dataset": "imagenet"}
)

try:
    # Training and validation
    train_model()
    validate_model()

    # Log model artifacts
    mlflow.pytorch.log_model(
        pytorch_model=model_instance,
        name="model",
        model_id=model.model_id
    )

    # Mark as ready
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.READY)
except Exception:
    # Mark as failed
    mlflow.finalize_logged_model(model.model_id, LoggedModelStatus.FAILED)
    raise
```

### Model Metadata and Tags

```python
from mlflow import MlflowClient

client = MlflowClient()

# Set registered model tags
client.set_registered_model_tag(
    name="RecommendationModel",
    key="task",
    value="collaborative_filtering"
)
client.set_registered_model_tag(
    name="RecommendationModel",
    key="business_unit",
    value="ecommerce"
)

# Set model version tags
client.set_model_version_tag(
    name="RecommendationModel",
    version="3",
    key="validation_status",
    value="approved"
)
client.set_model_version_tag(
    name="RecommendationModel",
    version="3",
    key="approval_date",
    value="2024-01-15"
)

# Update model description
client.update_registered_model(
    name="RecommendationModel",
    description="Collaborative filtering model for product recommendations. Trained on user-item interaction data."
)
```

### Searching and Filtering Models

```python
from mlflow import MlflowClient

client = MlflowClient()

# Search registered models
models = client.search_registered_models(
    filter_string="name LIKE 'Production%'",
    max_results=10
)

# Search model versions
versions = client.search_model_versions(
    filter_string="name='CustomerChurnModel' AND tags.validation_status='approved'"
)

# Get specific model version
model_version = client.get_model_version(
    name="CustomerChurnModel",
    version="5"
)

# Get model by alias
champion = client.get_model_version_by_alias(
    name="CustomerChurnModel",
    alias="champion"
)
```
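The same filter syntax also applies to runs via `mlflow.search_runs`, which returns a pandas DataFrame. A sketch, assuming the house-price-prediction experiment from earlier:

```python
import mlflow

# Find the best runs in an experiment by a logged metric
runs = mlflow.search_runs(
    experiment_names=["house-price-prediction"],
    filter_string="metrics.r2 > 0.8",
    order_by=["metrics.r2 DESC"],
    max_results=5
)
print(runs[["run_id", "metrics.r2"]])
```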

## Deployment Patterns

### Local Model Serving

```python
import mlflow.pyfunc

# Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Make predictions
predictions = model.predict(data)
```
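A loaded pyfunc model carries the signature recorded at logging time, which is useful for checking what inputs it expects before calling `predict`. A minimal sketch using the pyfunc metadata accessors:

```python
import mlflow.pyfunc

model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Inspect the input/output schemas captured in the model signature
print(model.metadata.get_input_schema())
print(model.metadata.get_output_schema())
```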

### REST API Deployment

```bash
# Serve model as REST API
mlflow models serve \
  --model-uri models:/CustomerChurnModel@production \
  --host 0.0.0.0 \
  --port 5001 \
  --workers 4
```

```python
# Client code to call the REST API
import requests
import json

url = "http://localhost:5001/invocations"
headers = {"Content-Type": "application/json"}

data = {
    "dataframe_split": {
        "columns": ["feature1", "feature2", "feature3"],
        "data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
    }
}

response = requests.post(url, headers=headers, data=json.dumps(data))
predictions = response.json()
```

### Docker Deployment

```bash
# Build Docker image
mlflow models build-docker \
  --model-uri models:/CustomerChurnModel@production \
  --name customer-churn-model

# Run container
docker run -p 8080:8080 customer-churn-model
```

### AWS SageMaker Deployment

```python
import mlflow.sagemaker

# Deploy to SageMaker
mlflow.sagemaker.deploy(
    app_name="customer-churn-predictor",
    model_uri="models:/CustomerChurnModel@production",
    region_name="us-east-1",
    mode="create",
    execution_role_arn="arn:aws:iam::123456789:role/SageMakerRole",
    instance_type="ml.m5.xlarge",
    instance_count=2
)
```

### Azure ML Deployment

```python
import mlflow.azureml
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice

# Configure workspace
ws = Workspace.from_config()

# Deploy to Azure Container Instance
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=2,
    memory_gb=4,
    tags={"model": "churn-predictor"},
    description="Customer churn prediction model"
)

mlflow.azureml.deploy(
    model_uri="models:/CustomerChurnModel@production",
    workspace=ws,
    deployment_config=aci_config,
    service_name="churn-predictor-service"
)
```

### GCP Vertex AI Deployment

Vertex AI endpoints deploy Vertex AI `Model` resources, so the MLflow model is first uploaded as a serving container (the image URI below is a placeholder):

```python
from google.cloud import aiplatform

# Initialize Vertex AI
aiplatform.init(project="my-project", location="us-central1")

# Upload the MLflow model's serving container (e.g. an image built with
# `mlflow models build-docker`) as a Vertex AI Model
model = aiplatform.Model.upload(
    display_name="CustomerChurnModel",
    serving_container_image_uri="gcr.io/my-project/customer-churn-model"
)

# Create Vertex AI endpoint
endpoint = aiplatform.Endpoint.create(display_name="churn-prediction-endpoint")

# Deploy model
endpoint.deploy(
    model=model,
    deployed_model_display_name="churn-v1",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5
)
```

### Batch Inference

```python
import mlflow
import pandas as pd

# Load model
model = mlflow.pyfunc.load_model("models:/CustomerChurnModel@production")

# Load batch data
batch_data = pd.read_csv("customer_batch.csv")

# Process in chunks
chunk_size = 1000
predictions = []

for i in range(0, len(batch_data), chunk_size):
    chunk = batch_data[i:i+chunk_size]
    chunk_predictions = model.predict(chunk)
    predictions.extend(chunk_predictions)

# Save results
results = pd.DataFrame({
    'customer_id': batch_data['customer_id'],
    'churn_probability': predictions
})
results.to_csv("churn_predictions.csv", index=False)
```
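For datasets too large for a single process, the same pyfunc model can be distributed as a Spark UDF. A hedged sketch, assuming PySpark is installed and the feature columns match the model signature:

```python
import mlflow.pyfunc
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Wrap the registered model as a Spark UDF
predict_udf = mlflow.pyfunc.spark_udf(
    spark, model_uri="models:/CustomerChurnModel@production"
)

df = spark.read.csv("customer_batch.csv", header=True, inferSchema=True)
feature_cols = [c for c in df.columns if c != "customer_id"]

# Score every row in parallel across the cluster
scored = df.withColumn("churn_probability", predict_udf(*feature_cols))
scored.select("customer_id", "churn_probability") \
      .write.mode("overwrite").csv("churn_predictions", header=True)
```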

## Monitoring and Observability

### Model Performance Monitoring

```python
import mlflow
from datetime import datetime
from sklearn.metrics import accuracy_score, precision_score, recall_score

class ModelMonitor:
    def __init__(self, model_name, tracking_uri):
        self.model_name = model_name
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(f"{model_name}-monitoring")

    def log_prediction_metrics(self, y_true, y_pred, timestamp=None):
        """Log prediction metrics for monitoring"""
        if timestamp is None:
            timestamp = datetime.now()

        with mlflow.start_run(run_name=f"monitoring-{timestamp}"):
            # Calculate metrics
            metrics = {
                "accuracy": accuracy_score(y_true, y_pred),
                "precision": precision_score(y_true, y_pred, average='weighted'),
                "recall": recall_score(y_true, y_pred, average='weighted')
            }

            # Log metrics
            mlflow.log_metrics(metrics)
            mlflow.log_param("timestamp", timestamp.isoformat())
            mlflow.log_param("num_predictions", len(y_pred))

            # Flag performance degradation
            if metrics["accuracy"] < 0.85:
                mlflow.set_tag("alert", "performance_degradation")

    def log_data_drift(self, reference_data, current_data):
        """Monitor for data drift"""
        with mlflow.start_run(run_name="data-drift-check"):
            # Compare column means between reference and current data
            for col in reference_data.columns:
                ref_mean = reference_data[col].mean()
                curr_mean = current_data[col].mean()
                drift_percent = abs((curr_mean - ref_mean) / ref_mean) * 100

                mlflow.log_metric(f"{col}_drift_percent", drift_percent)

                if drift_percent > 20:
                    mlflow.set_tag(f"{col}_drift_alert", "high")

# Usage
monitor = ModelMonitor("CustomerChurnModel", "http://localhost:5000")
monitor.log_prediction_metrics(y_true, y_pred)
```
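Comparing means is a coarse drift signal; a population stability index (PSI) over binned distributions is a common, still-simple refinement. A minimal numpy sketch (the quantile binning and the 0.2 alert threshold are conventional choices, not MLflow APIs):

```python
import numpy as np

def population_stability_index(reference, current, bins=10):
    """PSI between two 1-D samples, using bins derived from the reference."""
    edges = np.unique(np.quantile(reference, np.linspace(0, 1, bins + 1)))
    edges[0], edges[-1] = -np.inf, np.inf  # cover out-of-range values
    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_frac = np.histogram(current, bins=edges)[0] / len(current)
    ref_frac = np.clip(ref_frac, 1e-6, None)  # avoid log(0)
    cur_frac = np.clip(cur_frac, 1e-6, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))

# Rule of thumb: PSI > 0.2 indicates significant drift
psi = population_stability_index(reference_data["age"], current_data["age"])
```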

### Prediction Logging

```python
import mlflow
from datetime import datetime
import json

def log_predictions(model_name, inputs, predictions, metadata=None):
    """Log predictions for auditing and monitoring"""
    mlflow.set_experiment(f"{model_name}-predictions")

    with mlflow.start_run(run_name=f"prediction-{datetime.now().isoformat()}"):
        # Log prediction data
        mlflow.log_param("num_predictions", len(predictions))
        mlflow.log_param("model_name", model_name)

        # Log metadata
        if metadata:
            mlflow.log_params(metadata)

        # Log input/output samples
        sample_data = {
            "inputs": inputs[:5].tolist() if hasattr(inputs, 'tolist') else inputs[:5],
            "predictions": predictions[:5].tolist() if hasattr(predictions, 'tolist') else predictions[:5]
        }

        with open("prediction_sample.json", "w") as f:
            json.dump(sample_data, f)
        mlflow.log_artifact("prediction_sample.json")
```

### Model Explainability Tracking

```python
import mlflow
import shap
import matplotlib.pyplot as plt

def log_model_explanations(model, X_test, feature_names):
    """Log SHAP explanations for model interpretability"""
    with mlflow.start_run():
        # Calculate SHAP values
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test)

        # Create summary plot
        plt.figure()
        shap.summary_plot(shap_values, X_test, feature_names=feature_names, show=False)
        plt.savefig("shap_summary.png", bbox_inches='tight')
        mlflow.log_artifact("shap_summary.png")

        # Log feature importance
        feature_importance = dict(zip(feature_names, model.feature_importances_))
        mlflow.log_params({f"importance_{k}": v for k, v in feature_importance.items()})
```

## A/B Testing

### A/B Test Framework

```python
import mlflow
import numpy as np
from datetime import datetime
from scipy import stats

class ABTestFramework:
    def __init__(self, model_a_uri, model_b_uri, traffic_split=0.5):
        self.model_a = mlflow.pyfunc.load_model(model_a_uri)
        self.model_b = mlflow.pyfunc.load_model(model_b_uri)
        self.traffic_split = traffic_split

        mlflow.set_experiment("ab-testing")

    def predict(self, data, user_id=None):
        """Route traffic between models and log results"""
        # Determine which model to use
        if user_id is None or hash(user_id) % 100 < self.traffic_split * 100:
            model_name = "model_a"
            prediction = self.model_a.predict(data)
        else:
            model_name = "model_b"
            prediction = self.model_b.predict(data)

        # Log the prediction
        with mlflow.start_run(run_name=f"ab-test-{datetime.now().isoformat()}"):
            mlflow.log_param("model_variant", model_name)
            mlflow.log_param("user_id", user_id)
            mlflow.log_metric("prediction", float(prediction[0]))

        return prediction

    def evaluate_test(self, results_a, results_b):
        """Evaluate A/B test results"""
        with mlflow.start_run(run_name="ab-test-evaluation"):
            # Calculate metrics for both variants
            metrics_a = {
                "mean_a": np.mean(results_a),
                "std_a": np.std(results_a),
                "count_a": len(results_a)
            }

            metrics_b = {
                "mean_b": np.mean(results_b),
                "std_b": np.std(results_b),
                "count_b": len(results_b)
            }

            # Statistical test
            t_stat, p_value = stats.ttest_ind(results_a, results_b)

            mlflow.log_metrics({**metrics_a, **metrics_b})
            mlflow.log_metric("t_statistic", t_stat)
            mlflow.log_metric("p_value", p_value)

            # Determine winner
            if p_value < 0.05:
                winner = "model_a" if np.mean(results_a) > np.mean(results_b) else "model_b"
                mlflow.set_tag("winner", winner)
                mlflow.set_tag("significant", "yes")
            else:
                mlflow.set_tag("significant", "no")

# Usage
ab_test = ABTestFramework(
    model_a_uri="models:/CustomerChurnModel@champion",
    model_b_uri="models:/CustomerChurnModel@challenger",
    traffic_split=0.5
)
prediction = ab_test.predict(customer_data, user_id="user123")
```

### Multi-Armed Bandit Testing

```python
import mlflow
import numpy as np

class MultiArmedBandit:
    def __init__(self, model_uris):
        self.models = [mlflow.pyfunc.load_model(uri) for uri in model_uris]
        self.successes = [1] * len(model_uris)  # Prior
        self.failures = [1] * len(model_uris)   # Prior

        mlflow.set_experiment("mab-testing")

    def select_model(self):
        """Thompson sampling to select model"""
        samples = [
            np.random.beta(s, f)
            for s, f in zip(self.successes, self.failures)
        ]
        return np.argmax(samples)

    def predict_and_update(self, data, actual_outcome=None):
        """Make prediction and update model performance"""
        model_idx = self.select_model()
        prediction = self.models[model_idx].predict(data)

        with mlflow.start_run(run_name="mab-prediction"):
            mlflow.log_param("selected_model", model_idx)
            mlflow.log_metric("prediction", float(prediction[0]))

            # Update based on outcome
            if actual_outcome is not None:
                if actual_outcome == prediction[0]:
                    self.successes[model_idx] += 1
                else:
                    self.failures[model_idx] += 1

                mlflow.log_metric("success_rate",
                    self.successes[model_idx] / (self.successes[model_idx] + self.failures[model_idx]))

        return prediction
```

## Feature Stores

### Feature Store Integration

```python
import mlflow
import numpy as np
import pandas as pd
from datetime import datetime

class FeatureStore:
    def __init__(self, storage_path):
        self.storage_path = storage_path
        mlflow.set_experiment("feature-store")

    def create_feature_set(self, name, df, description=None):
        """Create and version a feature set"""
        with mlflow.start_run(run_name=f"feature-set-{name}"):
            # Save features
            feature_path = f"{self.storage_path}/{name}_{datetime.now().isoformat()}.parquet"
            df.to_parquet(feature_path)

            # Log metadata
            mlflow.log_param("feature_set_name", name)
            mlflow.log_param("num_features", len(df.columns))
            mlflow.log_param("num_samples", len(df))
            mlflow.log_param("description", description or "")

            # Log feature statistics
            stats = df.describe().to_dict()
            mlflow.log_dict(stats, "feature_stats.json")

            # Log artifact
            mlflow.log_artifact(feature_path)

            return feature_path

    def get_features(self, run_id):
        """Retrieve feature set by run ID"""
        client = mlflow.MlflowClient()
        run = client.get_run(run_id)
        artifact_uri = run.info.artifact_uri

        # Download and load features
        local_path = mlflow.artifacts.download_artifacts(artifact_uri)
        df = pd.read_parquet(local_path)

        return df

# Usage
store = FeatureStore("s3://my-bucket/features")

# Create features
features = pd.DataFrame({
    'customer_id': range(1000),
    'lifetime_value': np.random.rand(1000) * 1000,
    'avg_purchase': np.random.rand(1000) * 100,
    'days_since_last_purchase': np.random.randint(0, 365, 1000)
})

feature_path = store.create_feature_set(
    name="customer_features",
    df=features,
    description="Customer behavioral features for churn prediction"
)
```

### Feature Engineering Pipeline

```python
import mlflow
import mlflow.sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def feature_engineering_pipeline(data, run_name="feature-engineering"):
    """Log feature engineering steps"""
    with mlflow.start_run(run_name=run_name):
        # Original features
        mlflow.log_param("original_features", len(data.columns))

        # Scaling
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)
        mlflow.sklearn.log_model(scaler, "scaler")

        # Dimensionality reduction
        pca = PCA(n_components=0.95)
        transformed_data = pca.fit_transform(scaled_data)
        mlflow.sklearn.log_model(pca, "pca")

        mlflow.log_param("final_features", transformed_data.shape[1])
        mlflow.log_metric("variance_explained", pca.explained_variance_ratio_.sum())

        return transformed_data

# Usage
transformed_features = feature_engineering_pipeline(raw_data)
```
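Since the fitted scaler and PCA are logged as models in the same run, they can be reloaded at inference time so serving applies exactly the transforms learned during training. A sketch, assuming `run_id` is the feature-engineering run and `raw_inference_data` is new input (both hypothetical names):

```python
import mlflow.sklearn

# Reload the preprocessing steps logged in the feature-engineering run
scaler = mlflow.sklearn.load_model(f"runs:/{run_id}/scaler")
pca = mlflow.sklearn.load_model(f"runs:/{run_id}/pca")

# Apply the same transforms to new data before prediction
features = pca.transform(scaler.transform(raw_inference_data))
```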

## CI/CD for ML

### Training Pipeline

```python
import mlflow
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def training_pipeline(data_path, model_params, validation_threshold=0.85):
    """Automated training pipeline with validation gates"""

    mlflow.set_experiment("production-training-pipeline")

    with mlflow.start_run(run_name="pipeline-run"):
        # Load data
        data = pd.read_csv(data_path)
        X = data.drop('target', axis=1)
        y = data['target']

        # Log data version
        mlflow.log_param("data_version", data_path.split('/')[-1])
        mlflow.log_param("data_samples", len(data))

        # Train model
        model = RandomForestClassifier(**model_params)
        model.fit(X, y)

        # Cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5)
        mean_cv_score = cv_scores.mean()

        mlflow.log_params(model_params)
        mlflow.log_metric("cv_score_mean", mean_cv_score)
        mlflow.log_metric("cv_score_std", cv_scores.std())

        # Validation gate
        if mean_cv_score >= validation_threshold:
            # Log model
            mlflow.sklearn.log_model(
                sk_model=model,
                name="model",
                registered_model_name="ProductionModel"
            )
            mlflow.set_tag("status", "passed")
            return True
        else:
            mlflow.set_tag("status", "failed")
            mlflow.set_tag("failure_reason", "below_threshold")
            return False

# Usage in CI/CD
success = training_pipeline(
    data_path="data/training_data_v2.csv",
    model_params={'n_estimators': 100, 'max_depth': 10},
    validation_threshold=0.85
)
if not success:
    raise ValueError("Model did not meet validation criteria")
```

### Model Promotion Pipeline

```python
from datetime import datetime
from mlflow import MlflowClient

def promote_model_to_production(model_name, version, validation_results):
    """Promote model through stages with validation"""

    client = MlflowClient()

    # Validation checks
    required_metrics = ['accuracy', 'precision', 'recall']
    for metric in required_metrics:
        if metric not in validation_results:
            raise ValueError(f"Missing required metric: {metric}")
        if validation_results[metric] < 0.8:
            raise ValueError(f"{metric} below threshold: {validation_results[metric]}")

    # Set tags
    for metric, value in validation_results.items():
        client.set_model_version_tag(
            name=model_name,
            version=version,
            key=f"validation_{metric}",
            value=str(value)
        )

    # Promote to production
    client.set_registered_model_alias(
        name=model_name,
        alias="production",
        version=version
    )

    # Tag with promotion metadata
    client.set_model_version_tag(
        name=model_name,
        version=version,
        key="promoted_at",
        value=datetime.now().isoformat()
    )

    return True

# Usage
validation_results = {
    'accuracy': 0.92,
    'precision': 0.89,
    'recall': 0.91
}

promote_model_to_production(
    model_name="FraudDetectionModel",
    version="5",
    validation_results=validation_results
)
```
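Because an alias is just a movable pointer, rolling back a bad promotion does not require re-deploying anything. A sketch, assuming version 4 was the previous known-good version (hypothetical):

```python
from mlflow import MlflowClient

client = MlflowClient()

# Point the production alias back at the previous known-good version
client.set_registered_model_alias(
    name="FraudDetectionModel",
    alias="production",
    version="4"  # hypothetical previous version
)
```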

Automated Model Retraining

自动化模型重训练

python
import mlflow
import schedule
import time

class AutomatedRetrainer:
    def __init__(self, model_name, data_source, schedule_interval="daily"):
        self.model_name = model_name
        self.data_source = data_source
        self.schedule_interval = schedule_interval

        mlflow.set_experiment(f"{model_name}-retraining")

    def retrain(self):
        """Retrain model with latest data"""
        with mlflow.start_run(run_name=f"retrain-{datetime.now().isoformat()}"):
            # Load latest data
            data = self.load_latest_data()

            # Get current production model
            client = MlflowClient()
            current_model = client.get_model_version_by_alias(
                self.model_name, "production"
            )

            # Load and evaluate current model
            current_model_obj = mlflow.sklearn.load_model(
                f"models:/{self.model_name}@production"
            )
            current_score = current_model_obj.score(X_test, y_test)

            mlflow.log_metric("current_production_score", current_score)

            # Train new model
            new_model = self.train_model(data)
            new_score = new_model.score(X_test, y_test)

            mlflow.log_metric("new_model_score", new_score)

            # Compare and promote if better
            if new_score > current_score:
                mlflow.sklearn.log_model(
                    sk_model=new_model,
                    name="model",
                    registered_model_name=self.model_name
                )
                mlflow.set_tag("status", "promoted")
            else:
                mlflow.set_tag("status", "not_promoted")

    def start_scheduled_retraining(self):
        """Start scheduled retraining"""
        if self.schedule_interval == "daily":
            schedule.every().day.at("02:00").do(self.retrain)
        elif self.schedule_interval == "weekly":
            schedule.every().monday.at("02:00").do(self.retrain)

        while True:
            schedule.run_pending()
            time.sleep(3600)

Usage

retrainer = AutomatedRetrainer(
    model_name="CustomerChurnModel",
    data_source="s3://bucket/data",
    schedule_interval="daily"
)
retrainer.start_scheduled_retraining()  # blocks; run in a dedicated process
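Note that retrain() registers a winning challenger in the Model Registry but does not move the "production" alias, so serving still resolves to the old champion. A minimal sketch of that final step, assuming the alias-based registry flow used throughout this guide:

python
from mlflow import MlflowClient

def promote_latest_version(model_name):
    """Point the 'production' alias at the newest registered version.

    Assumes the winning challenger was just registered, e.g. via
    log_model(..., registered_model_name=model_name).
    """
    client = MlflowClient()
    versions = client.search_model_versions(f"name='{model_name}'")
    newest = max(versions, key=lambda v: int(v.version))
    client.set_registered_model_alias(model_name, "production", newest.version)
    return newest.version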

Production Best Practices

Model Signatures

python
from mlflow.models import infer_signature, ModelSignature
from mlflow.types import Schema, ColSpec
import mlflow.sklearn
import numpy as np

Method 1: Infer signature from data

signature = infer_signature(X_train, model.predict(X_train))

Method 2: Define explicit signature

input_schema = Schema([
    ColSpec("double", "age"),
    ColSpec("double", "income"),
    ColSpec("string", "customer_segment")
])
output_schema = Schema([ColSpec("double")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

Log model with signature

mlflow.sklearn.log_model(
    sk_model=model,
    name="model",
    signature=signature,
    input_example=X_train[:5]
)
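A logged signature is more than documentation: when the model is loaded back through the pyfunc flavor, MLflow validates incoming data against the schema before the model is invoked. A minimal sketch, assuming the explicit schema above; the model URI and field values are illustrative:

python
import mlflow.pyfunc
import pandas as pd

# Illustrative URI; substitute your own registered model and alias
model = mlflow.pyfunc.load_model("models:/ProductionClassifier@production")

# Column names and dtypes must match the logged signature; a mismatch
# raises a schema enforcement error before the model ever runs
request = pd.DataFrame([
    {"age": 42.0, "income": 55000.0, "customer_segment": "premium"}
])
prediction = model.predict(request)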

Model Validation Framework

python
import mlflow
from sklearn.metrics import classification_report
import json

class ModelValidator:
    def __init__(self, thresholds):
        self.thresholds = thresholds

    def validate(self, model, X_test, y_test):
        """Comprehensive model validation"""
        results = {}

        with mlflow.start_run(run_name="model-validation"):
            # Performance metrics
            predictions = model.predict(X_test)
            report = classification_report(y_test, predictions, output_dict=True)

            # Check thresholds
            passed = True
            for metric, threshold in self.thresholds.items():
                value = report['weighted avg'][metric]
                results[metric] = value
                mlflow.log_metric(metric, value)

                if value < threshold:
                    passed = False
                    mlflow.set_tag(f"{metric}_failed", "true")

            # Detailed report
            with open("validation_report.json", "w") as f:
                json.dump(report, f, indent=2)
            mlflow.log_artifact("validation_report.json")

            mlflow.set_tag("validation_passed", str(passed))

            return passed, results

Usage

validator = ModelValidator(thresholds={
    'precision': 0.85,
    'recall': 0.80,
    'f1-score': 0.82
})
passed, results = validator.validate(model, X_test, y_test)
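The boolean returned by validate() slots naturally into a promotion gate. A minimal sketch, assuming the alias-based registry flow used earlier; the model name and version are placeholders:

python
from mlflow import MlflowClient

def gate_promotion(model_name, version, passed, results):
    """Advance the 'staging' alias only when validation passed."""
    client = MlflowClient()
    if passed:
        client.set_registered_model_alias(model_name, "staging", version)
    else:
        print(f"Holding version {version}; failing metrics: {results}")

gate_promotion("FraudDetectionModel", "6", passed, results)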

Error Handling and Logging

python
import logging
import traceback
from functools import wraps

import mlflow
from sklearn.ensemble import RandomForestClassifier

def mlflow_error_handler(func):
    """Decorator that records success or failure of a function as an MLflow run."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        with mlflow.start_run(run_name=f"{func.__name__}"):
            try:
                result = func(*args, **kwargs)
                mlflow.set_tag("status", "success")
                return result

            except Exception as e:
                # Tag the run with the failure details
                mlflow.set_tag("status", "failed")
                mlflow.set_tag("error_type", type(e).__name__)
                mlflow.set_tag("error_message", str(e))

                # Attach the full traceback as an artifact
                tb = traceback.format_exc()
                with open("error_traceback.txt", "w") as f:
                    f.write(tb)
                mlflow.log_artifact("error_traceback.txt")

                logging.error(f"Error in {func.__name__}: {str(e)}")
                raise

    return wrapper

@mlflow_error_handler
def train_model_with_error_handling(data):
    # Training code; `data` is expected to be an (X, y) pair
    X, y = data
    model = RandomForestClassifier()
    model.fit(X, y)
    return model
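Calling the decorated trainer looks like any other function call; the difference is that a failure leaves an auditable MLflow run behind. An illustrative invocation, reusing X_train and y_train as elsewhere in this guide:

python
# Any exception is tagged on the run and its traceback logged as an
# artifact before being re-raised to the caller
model = train_model_with_error_handling((X_train, y_train))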

Model Performance Baseline

python
import mlflow
import mlflow.sklearn
from sklearn.dummy import DummyClassifier

def establish_baseline(X_train, y_train, X_test, y_test):
    """Establish baseline model performance"""
    mlflow.set_experiment("baseline-models")

    strategies = ['most_frequent', 'stratified', 'uniform']

    for strategy in strategies:
        with mlflow.start_run(run_name=f"baseline-{strategy}"):
            baseline = DummyClassifier(strategy=strategy)
            baseline.fit(X_train, y_train)
            score = baseline.score(X_test, y_test)

            mlflow.log_param("strategy", strategy)
            mlflow.log_metric("accuracy", score)

            mlflow.sklearn.log_model(
                sk_model=baseline,
                name="baseline_model",
                registered_model_name=f"Baseline-{strategy}"
            )

Usage

establish_baseline(X_train, y_train, X_test, y_test)
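Baselines pay off when they gate later work: a candidate that cannot clearly beat a dummy classifier is not worth deploying. A minimal sketch of such a check; the margin and scores below are illustrative:

python
def beats_baseline(candidate_score, baseline_scores, margin=0.05):
    """True if the candidate clears the best naive baseline by `margin`."""
    return candidate_score >= max(baseline_scores) + margin

# e.g. the accuracies recorded by establish_baseline for the three strategies
baseline_scores = [0.52, 0.50, 0.34]
if not beats_baseline(model.score(X_test, y_test), baseline_scores):
    raise ValueError("Candidate does not meaningfully beat naive baselines")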

Summary

This comprehensive guide covers production-grade MLOps workflows using MLflow:
  1. Experiment Tracking: Log parameters, metrics, and artifacts systematically
  2. Model Registry: Centralized model versioning and lifecycle management
  3. Deployment: Multiple deployment patterns for various platforms
  4. Monitoring: Track model performance and data drift in production
  5. A/B Testing: Compare model variants in production
  6. Feature Stores: Version and manage feature engineering
  7. CI/CD: Automated training, validation, and promotion pipelines
  8. Best Practices: Signatures, validation, error handling, and baselines
These patterns enable teams to build robust, scalable ML systems from experimentation through production deployment and monitoring.