automl-pipeline-setup
Compare original and translation side by side
🇺🇸 Original (English)
🇨🇳 Translation (Chinese)
AutoML Pipeline Setup Expert
AutoML流水线搭建专家
Эксперт по проектированию и реализации автоматизированных систем машинного обучения.
专注于设计和实现自动化机器学习系统的专家。
Архитектура пайплайна
流水线架构
Модульные компоненты
模块化组件
Data Ingestion → Validation → Feature Engineering → Model Training → Evaluation → Deployment
Data Ingestion → Validation → Feature Engineering → Model Training → Evaluation → Deployment
Конфигурация через YAML
通过YAML配置
yaml
pipeline:
name: customer_churn_prediction
version: "1.0"
data:
source: "s3://bucket/data.parquet"
validation:
null_threshold: 0.1
duplicate_check: true
features:
numerical:
- age
- tenure
- monthly_charges
categorical:
- contract_type
- payment_method
target: churn
automl:
framework: h2o
max_runtime_secs: 3600
max_models: 20
stopping_metric: AUC
sort_metric: AUC
deployment:
platform: mlflow
model_registry: trueyaml
pipeline:
name: customer_churn_prediction
version: "1.0"
data:
source: "s3://bucket/data.parquet"
validation:
null_threshold: 0.1
duplicate_check: true
features:
numerical:
- age
- tenure
- monthly_charges
categorical:
- contract_type
- payment_method
target: churn
automl:
framework: h2o
max_runtime_secs: 3600
max_models: 20
stopping_metric: AUC
sort_metric: AUC
deployment:
platform: mlflow
model_registry: trueData Validation с Great Expectations
基于Great Expectations的数据验证
python
import great_expectations as gx
def validate_data(df, expectation_suite_name="default"):
context = gx.get_context()
# Создание expectation suite
suite = context.add_expectation_suite(expectation_suite_name)
# Определение expectations
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# Проверки качества данных
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=18, max_value=100)
validator.expect_column_values_to_be_in_set(
"contract_type",
["month-to-month", "one_year", "two_year"]
)
# Валидация
results = validator.validate()
return results.successpython
import great_expectations as gx
def validate_data(df, expectation_suite_name="default"):
context = gx.get_context()
# 创建期望套件
suite = context.add_expectation_suite(expectation_suite_name)
# 定义验证规则
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# 数据质量检查
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=18, max_value=100)
validator.expect_column_values_to_be_in_set(
"contract_type",
["month-to-month", "one_year", "two_year"]
)
# 执行验证
results = validator.validate()
return results.successFeature Engineering Pipeline
特征工程流水线
python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from feature_engine.creation import CyclicalFeatures
from feature_engine.selection import DropCorrelatedFeatures
def create_feature_pipeline(numerical_cols, categorical_cols):
numerical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer([
('num', numerical_transformer, numerical_cols),
('cat', categorical_transformer, categorical_cols)
])
feature_pipeline = Pipeline([
('preprocessor', preprocessor),
('drop_correlated', DropCorrelatedFeatures(threshold=0.95))
])
return feature_pipelinepython
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from feature_engine.creation import CyclicalFeatures
from feature_engine.selection import DropCorrelatedFeatures
def create_feature_pipeline(numerical_cols, categorical_cols):
numerical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer([
('num', numerical_transformer, numerical_cols),
('cat', categorical_transformer, categorical_cols)
])
feature_pipeline = Pipeline([
('preprocessor', preprocessor),
('drop_correlated', DropCorrelatedFeatures(threshold=0.95))
])
return feature_pipelineH2O AutoML
H2O AutoML
python
import h2o
from h2o.automl import H2OAutoML
def train_automl_model(train_df, target_col, config):
h2o.init()
# Конвертация в H2O Frame
h2o_train = h2o.H2OFrame(train_df)
# Определение типов колонок
h2o_train[target_col] = h2o_train[target_col].asfactor()
# Предикторы
predictors = [col for col in h2o_train.columns if col != target_col]
# AutoML
aml = H2OAutoML(
max_runtime_secs=config.get('max_runtime_secs', 3600),
max_models=config.get('max_models', 20),
stopping_metric=config.get('stopping_metric', 'AUC'),
sort_metric=config.get('sort_metric', 'AUC'),
seed=42,
exclude_algos=['DeepLearning'], # Опционально исключить алгоритмы
nfolds=5
)
aml.train(
x=predictors,
y=target_col,
training_frame=h2o_train
)
# Лидерборд
leaderboard = aml.leaderboard.as_data_frame()
print(leaderboard)
return aml.leaderpython
import h2o
from h2o.automl import H2OAutoML
def train_automl_model(train_df, target_col, config):
h2o.init()
# 转换为H2O Frame
h2o_train = h2o.H2OFrame(train_df)
# 设置目标列类型为因子
h2o_train[target_col] = h2o_train[target_col].asfactor()
# 确定特征列
predictors = [col for col in h2o_train.columns if col != target_col]
# 初始化AutoML
aml = H2OAutoML(
max_runtime_secs=config.get('max_runtime_secs', 3600),
max_models=config.get('max_models', 20),
stopping_metric=config.get('stopping_metric', 'AUC'),
sort_metric=config.get('sort_metric', 'AUC'),
seed=42,
exclude_algos=['DeepLearning'], # 可选:排除特定算法
nfolds=5
)
# 训练模型
aml.train(
x=predictors,
y=target_col,
training_frame=h2o_train
)
# 查看排行榜
leaderboard = aml.leaderboard.as_data_frame()
print(leaderboard)
return aml.leaderMLflow Experiment Tracking
MLflow实验跟踪
python
import mlflow
from mlflow.tracking import MlflowClient
class ExperimentTracker:
def __init__(self, experiment_name):
mlflow.set_experiment(experiment_name)
self.client = MlflowClient()
def log_automl_run(self, model, metrics, params, artifacts_path=None):
with mlflow.start_run():
# Логирование параметров
for key, value in params.items():
mlflow.log_param(key, value)
# Логирование метрик
for key, value in metrics.items():
mlflow.log_metric(key, value)
# Логирование модели
mlflow.h2o.log_model(model, "model")
# Логирование артефактов
if artifacts_path:
mlflow.log_artifacts(artifacts_path)
run_id = mlflow.active_run().info.run_id
return run_id
def register_best_model(self, run_id, model_name):
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, model_name)python
import mlflow
from mlflow.tracking import MlflowClient
class ExperimentTracker:
def __init__(self, experiment_name):
mlflow.set_experiment(experiment_name)
self.client = MlflowClient()
def log_automl_run(self, model, metrics, params, artifacts_path=None):
with mlflow.start_run():
# 记录参数
for key, value in params.items():
mlflow.log_param(key, value)
# 记录指标
for key, value in metrics.items():
mlflow.log_metric(key, value)
# 记录模型
mlflow.h2o.log_model(model, "model")
# 记录 artifacts
if artifacts_path:
mlflow.log_artifacts(artifacts_path)
run_id = mlflow.active_run().info.run_id
return run_id
def register_best_model(self, run_id, model_name):
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, model_name)Optuna для Hyperparameter Tuning
基于Optuna的超参数调优
python
import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
def objective(trial, X, y):
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 500),
'max_depth': trial.suggest_int('max_depth', 3, 20),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
}
model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
return scores.mean()
def run_optimization(X, y, n_trials=100):
study = optuna.create_study(direction='maximize')
study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)
print(f"Best trial: {study.best_trial.value}")
print(f"Best params: {study.best_params}")
return study.best_paramspython
import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
def objective(trial, X, y):
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 500),
'max_depth': trial.suggest_int('max_depth', 3, 20),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
}
model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
return scores.mean()
def run_optimization(X, y, n_trials=100):
study = optuna.create_study(direction='maximize')
study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)
print(f"最佳试验结果: {study.best_trial.value}")
print(f"最佳参数: {study.best_params}")
return study.best_paramsAirflow DAG для оркестрации
基于Airflow DAG的编排
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'automl_pipeline',
default_args=default_args,
description='AutoML Training Pipeline',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
)
validate_data_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
dag=dag
)
feature_engineering_task = PythonOperator(
task_id='feature_engineering',
python_callable=run_feature_engineering,
dag=dag
)
automl_training_task = PythonOperator(
task_id='automl_training',
python_callable=train_automl_model,
dag=dag
)
model_validation_task = PythonOperator(
task_id='model_validation',
python_callable=validate_model,
dag=dag
)
validate_data_task >> feature_engineering_task >> automl_training_task >> model_validation_taskpython
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'automl_pipeline',
default_args=default_args,
description='AutoML Training Pipeline',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
)
validate_data_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
dag=dag
)
feature_engineering_task = PythonOperator(
task_id='feature_engineering',
python_callable=run_feature_engineering,
dag=dag
)
automl_training_task = PythonOperator(
task_id='automl_training',
python_callable=train_automl_model,
dag=dag
)
model_validation_task = PythonOperator(
task_id='model_validation',
python_callable=validate_model,
dag=dag
)
validate_data_task >> feature_engineering_task >> automl_training_task >> model_validation_taskРекомендации по фреймворкам
框架推荐
| Сценарий | Рекомендация |
|---|---|
| Enterprise, табличные данные | H2O.ai |
| Cloud-native | Google Vertex AI, AWS SageMaker |
| Быстрое прототипирование | AutoGluon, FLAML |
| Кастомизация | MLflow + Optuna |
| Deep Learning | AutoKeras, Neural Architecture Search |
| 场景 | 推荐方案 |
|---|---|
| 企业级、表格数据 | H2O.ai |
| 云原生 | Google Vertex AI, AWS SageMaker |
| 快速原型开发 | AutoGluon, FLAML |
| 自定义需求 | MLflow + Optuna |
| 深度学习 | AutoKeras, Neural Architecture Search |
Лучшие практики
最佳实践
- Data sampling — для ускорения экспериментов
- Early stopping — прекращение неперспективных моделей
- Resource management — лимиты памяти и CPU
- Distributed training — Ray Tune, Dask для масштабирования
- Model versioning — отслеживание всех экспериментов
- Reproducibility — фиксация random seeds
- 数据采样 — 加速实验进程
- 早停机制 — 终止无前景的模型训练
- 资源管理 — 设置内存和CPU限制
- 分布式训练 — 使用Ray Tune、Dask进行扩展
- 模型版本控制 — 跟踪所有实验记录
- 可复现性 — 固定随机种子