ML project reference notes: feature engineering, model training, hyperparameter search, serving, batch inference, and monitoring.
ml-project/
├── data/
│ ├── raw/
│ ├── processed/
│ └── features/
├── notebooks/
│ ├── 01_exploration.ipynb
│ ├── 02_feature_engineering.ipynb
│ └── 03_modeling.ipynb
├── src/
│ ├── data/
│ │ ├── ingestion.py
│ │ └── preprocessing.py
│ ├── features/
│ │ ├── engineering.py
│ │ └── store.py
│ ├── models/
│ │ ├── train.py
│ │ ├── evaluate.py
│ │ └── predict.py
│ └── serving/
│ ├── api.py
│ └── batch.py
├── tests/
├── configs/
├── mlruns/
└── requirements.txt
├── data/
│ ├── raw/
│ ├── processed/
│ └── features/
├── notebooks/
│ ├── 01_exploration.ipynb
│ ├── 02_feature_engineering.ipynb
│ └── 03_modeling.ipynb
├── src/
│ ├── data/
│ │ ├── ingestion.py
│ │ └── preprocessing.py
│ ├── features/
│ │ ├── engineering.py
│ │ └── store.py
│ ├── models/
│ │ ├── train.py
│ │ ├── evaluate.py
│ │ └── predict.py
│ └── serving/
│ ├── api.py
│ └── batch.py
├── tests/
├── configs/
├── mlruns/
def engineer_numerical_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add scaled, log-transformed, bucketed, and interaction amount features.

    Mutates and returns ``df`` with new columns: ``amount_scaled``,
    ``amount_log``, ``amount_bucket``, ``amount_per_transaction``.
    Requires columns ``amount``, ``total_amount``, ``transaction_count``.
    """
    amount = df['amount']
    # Population z-score (ddof=0), the same statistic StandardScaler computes;
    # guard zero variance by scaling with 1.0, matching sklearn's behavior.
    std = amount.std(ddof=0)
    df['amount_scaled'] = (amount - amount.mean()) / (std if std else 1.0)
    # Log transform for skewed data (log1p is defined at amount == 0).
    df['amount_log'] = np.log1p(amount)
    # Binning. Bug fix: include_lowest=True so amount == 0 lands in 'low'
    # instead of silently becoming NaN (pd.cut's left edge is exclusive).
    df['amount_bucket'] = pd.cut(
        amount,
        bins=[0, 100, 500, 1000, np.inf],
        labels=['low', 'medium', 'high', 'very_high'],
        include_lowest=True,
    )
    # Interaction. Bug fix: a zero transaction_count previously produced inf;
    # map it to NaN so downstream aggregation is not poisoned.
    df['amount_per_transaction'] = df['total_amount'] / df['transaction_count'].replace(0, np.nan)
    return df
def engineer_categorical_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add target, frequency, and one-hot encodings for the 'category' column.

    Bug fix: target and frequency encodings are computed BEFORE one-hot
    encoding — pd.get_dummies removes the source 'category' column, so the
    original order raised KeyError on the subsequent groupby/map calls.

    NOTE(review): the target encoding is fit on the same frame it transforms,
    which leaks the target; fit on a training fold in real pipelines.
    """
    # Target encoding: per-category mean of the label.
    target_means = df.groupby('category')['target'].mean()
    df['category_target_enc'] = df['category'].map(target_means)
    # Frequency encoding: share of rows per category.
    freq = df['category'].value_counts(normalize=True)
    df['category_freq'] = df['category'].map(freq)
    # One-hot encoding last (drops the raw 'category' column).
    df = pd.get_dummies(df, columns=['category'], prefix='cat')
    return df
def engineer_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add calendar, cyclical, lag, and rolling features from 'timestamp'.

    Requires columns ``timestamp``, ``user_id``, ``amount``. Row order of the
    returned frame is unchanged; lag/rolling values are index-aligned.
    """
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Basic calendar components.
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['month'] = df['timestamp'].dt.month
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    # Cyclical encoding so hour 23 and hour 0 are close in feature space.
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    # Bug fix: lags/rolling must follow chronological order within each user;
    # the original shifted in arbitrary row order. Sorting a copy and assigning
    # back aligns on the index, so the caller's row order is preserved.
    by_time = df.sort_values('timestamp').groupby('user_id')['amount']
    df['amount_lag_1'] = by_time.shift(1)
    df['amount_rolling_mean_7'] = by_time.transform(
        lambda x: x.rolling(7, min_periods=1).mean()
    )
    return df
def engineer_numerical_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add scaled, log-transformed, bucketed, and interaction amount features.

    Mutates and returns ``df``. Requires columns ``amount``, ``total_amount``,
    ``transaction_count``.
    """
    amount = df['amount']
    # Population z-score (ddof=0), identical to StandardScaler's statistic;
    # zero-variance input scales by 1.0 as sklearn does.
    std = amount.std(ddof=0)
    df['amount_scaled'] = (amount - amount.mean()) / (std if std else 1.0)
    # Log transform for skewed data.
    df['amount_log'] = np.log1p(amount)
    # Binning. Bug fix: include_lowest=True so amount == 0 gets 'low'
    # rather than NaN (pd.cut's leftmost edge is exclusive by default).
    df['amount_bucket'] = pd.cut(
        amount,
        bins=[0, 100, 500, 1000, np.inf],
        labels=['low', 'medium', 'high', 'very_high'],
        include_lowest=True,
    )
    # Interaction. Bug fix: zero counts previously produced inf; use NaN.
    df['amount_per_transaction'] = df['total_amount'] / df['transaction_count'].replace(0, np.nan)
    return df
def engineer_categorical_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add target, frequency, and one-hot encodings for 'category'.

    Bug fix: encodings that read the raw 'category' column run BEFORE
    pd.get_dummies, which removes that column; the original order raised
    KeyError at the groupby step.
    """
    # Target encoding (NOTE(review): fit on the same frame — leaks the target;
    # fit on a training fold in production).
    target_means = df.groupby('category')['target'].mean()
    df['category_target_enc'] = df['category'].map(target_means)
    # Frequency encoding.
    freq = df['category'].value_counts(normalize=True)
    df['category_freq'] = df['category'].map(freq)
    # One-hot encoding last.
    df = pd.get_dummies(df, columns=['category'], prefix='cat')
    return df
def engineer_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add calendar, cyclical, lag, and rolling features from 'timestamp'.

    Requires ``timestamp``, ``user_id``, ``amount``. Caller row order is
    preserved; lag/rolling results align on the index.
    """
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Basic components.
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['month'] = df['timestamp'].dt.month
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    # Cyclical encoding keeps hour 23 adjacent to hour 0.
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    # Bug fix: compute lags in chronological order per user; the original
    # shifted in arbitrary row order. Index alignment restores row order.
    chronological = df.sort_values('timestamp').groupby('user_id')['amount']
    df['amount_lag_1'] = chronological.shift(1)
    df['amount_rolling_mean_7'] = chronological.transform(
        lambda x: x.rolling(7, min_periods=1).mean()
    )
    return df
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
import mlflow
class Model(nn.Module):
    """Feed-forward classifier: two ReLU hidden layers with 0.3 dropout.

    Layer widths: input_dim -> hidden_dim -> hidden_dim // 2 -> output_dim.
    """

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        super().__init__()
        half_dim = hidden_dim // 2
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, half_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(half_dim, output_dim),
        )

    def forward(self, x):
        # The Sequential stack is the entire network.
        return self.layers(x)
def train_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    epochs: int,
    learning_rate: float
):
    """Train a classifier with AdamW + ReduceLROnPlateau, logging to MLflow.

    Saves the best-validation-loss weights to 'best_model.pt' and logs the
    model to MLflow whenever validation improves.

    Bug fix: the best checkpoint is reloaded before returning, so callers
    receive the best-performing model rather than the final epoch's weights.

    Returns the trained model (on the selected device).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    # Anneal LR when validation loss plateaus for 3 epochs.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3)
    best_val_loss = float('inf')
    with mlflow.start_run():
        mlflow.log_params({
            'epochs': epochs,
            'learning_rate': learning_rate,
            'batch_size': train_loader.batch_size
        })
        for epoch in range(epochs):
            # ---- training phase ----
            model.train()
            train_loss = 0
            for batch_x, batch_y in tqdm(train_loader, desc=f'Epoch {epoch+1}'):
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                optimizer.zero_grad()
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            # ---- validation phase ----
            model.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for batch_x, batch_y in val_loader:
                    batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                    outputs = model(batch_x)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()
            val_accuracy = correct / total
            # Summed val_loss is a valid plateau signal: the divisor
            # (loader length) is constant across epochs.
            scheduler.step(val_loss)
            mlflow.log_metrics({
                'train_loss': train_loss / len(train_loader),
                'val_loss': val_loss / len(val_loader),
                'val_accuracy': val_accuracy
            }, step=epoch)
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), 'best_model.pt')
                mlflow.pytorch.log_model(model, 'model')
    # Bug fix: restore the best checkpoint (if any epoch ran) before returning.
    if best_val_loss < float('inf'):
        model.load_state_dict(torch.load('best_model.pt', map_location=device))
    return model
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
import mlflow
class Model(nn.Module):
    """MLP classifier with two dropout-regularized ReLU hidden layers."""

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        super().__init__()
        # Assemble the hidden blocks, then the output projection; module
        # indices in the Sequential match the original layout (0..6).
        widths = [(input_dim, hidden_dim), (hidden_dim, hidden_dim // 2)]
        modules = []
        for fan_in, fan_out in widths:
            modules.extend([nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.3)])
        modules.append(nn.Linear(hidden_dim // 2, output_dim))
        self.layers = nn.Sequential(*modules)

    def forward(self, x):
        return self.layers(x)
def train_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    epochs: int,
    learning_rate: float
):
    """Train a classifier with AdamW + ReduceLROnPlateau, logging to MLflow.

    Checkpoints the best-validation-loss weights to 'best_model.pt'.
    Bug fix: reloads that checkpoint before returning so the caller gets the
    best model, not whatever the final epoch produced.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    # LR decays after 3 epochs without validation improvement.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3)
    best_val_loss = float('inf')
    with mlflow.start_run():
        mlflow.log_params({
            'epochs': epochs,
            'learning_rate': learning_rate,
            'batch_size': train_loader.batch_size
        })
        for epoch in range(epochs):
            # Training pass.
            model.train()
            train_loss = 0
            for batch_x, batch_y in tqdm(train_loader, desc=f'Epoch {epoch+1}'):
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                optimizer.zero_grad()
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            # Validation pass (no gradients).
            model.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for batch_x, batch_y in val_loader:
                    batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                    outputs = model(batch_x)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()
            val_accuracy = correct / total
            # Constant loader length makes the summed loss a valid signal.
            scheduler.step(val_loss)
            mlflow.log_metrics({
                'train_loss': train_loss / len(train_loader),
                'val_loss': val_loss / len(val_loader),
                'val_accuracy': val_accuracy
            }, step=epoch)
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), 'best_model.pt')
                mlflow.pytorch.log_model(model, 'model')
    # Bug fix: hand back the best checkpoint, not the last epoch's weights.
    if best_val_loss < float('inf'):
        model.load_state_dict(torch.load('best_model.pt', map_location=device))
    return model
import optuna
from sklearn.model_selection import cross_val_score


def objective(trial):
    """Optuna objective: mean 5-fold ROC-AUC of an XGBoost classifier.

    NOTE(review): relies on module-level X_train/y_train and XGBClassifier
    being defined elsewhere in this file.
    """
    search_space = {
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
    }
    candidate = XGBClassifier(**search_space, random_state=42)
    cv_scores = cross_val_score(candidate, X_train, y_train, cv=5, scoring='roc_auc')
    return cv_scores.mean()


# Maximize ROC-AUC over up to 100 trials or one hour, whichever comes first.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100, timeout=3600)
print(f"Best params: {study.best_params}")
print(f"Best score: {study.best_value}")
import optuna
from sklearn.model_selection import cross_val_score


def objective(trial):
    """Score one hyperparameter sample: 5-fold CV ROC-AUC of XGBoost.

    NOTE(review): X_train/y_train and XGBClassifier come from module scope.
    """
    # Sample each hyperparameter in the same order and ranges as before.
    model = XGBClassifier(
        n_estimators=trial.suggest_int('n_estimators', 100, 1000),
        max_depth=trial.suggest_int('max_depth', 3, 10),
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        min_child_weight=trial.suggest_int('min_child_weight', 1, 10),
        subsample=trial.suggest_float('subsample', 0.6, 1.0),
        colsample_bytree=trial.suggest_float('colsample_bytree', 0.6, 1.0),
        random_state=42,
    )
    return cross_val_score(model, X_train, y_train, cv=5, scoring='roc_auc').mean()


# Search budget: 100 trials, capped at one hour.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100, timeout=3600)
print(f"Best params: {study.best_params}")
print(f"Best score: {study.best_value}")
from pydantic import BaseModel
import torch
import numpy as np
app = FastAPI()from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
# NOTE(review): fragment of a FastAPI prediction endpoint — the enclosing
# `def` signature, `request` model, and `model` object are not visible in
# this extract, and the first line is fused with the app construction by
# the text extraction ("...()try:").
app = FastAPI()try:
# Wrap the request payload as a single-row float32 batch tensor.
features = torch.tensor([request.features], dtype=torch.float32)
# Inference only: disable gradient tracking.
with torch.no_grad():
logits = model(features)
# Convert logits to class probabilities over dim 1 (the class dimension).
probabilities = torch.softmax(logits, dim=1)
# Most probable class index as a plain Python int.
prediction = probabilities.argmax(dim=1).item()
return PredictionResponse(
prediction=prediction,
probability=probabilities[0][prediction].item(),
class_probabilities=probabilities[0].tolist()
)
# Surface any inference failure to the client as HTTP 400 with the message.
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))undefinedtry:
# NOTE(review): duplicate copy of the prediction-endpoint fragment; its
# opening `try:` is fused onto the previous line in the scraped text.
features = torch.tensor([request.features], dtype=torch.float32)
# No gradients during inference.
with torch.no_grad():
logits = model(features)
# Logits -> per-class probabilities.
probabilities = torch.softmax(logits, dim=1)
prediction = probabilities.argmax(dim=1).item()
return PredictionResponse(
prediction=prediction,
probability=probabilities[0][prediction].item(),
class_probabilities=probabilities[0].tolist()
)
# Any failure becomes an HTTP 400 carrying the error text.
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))undefinedimport ray
from typing import Iterator
@ray.remote
class BatchPredictor:
    """Ray actor that loads a trained Model checkpoint and predicts batches."""

    def __init__(self, model_path: str):
        # One model instance per actor process.
        self.model = self._load_model(model_path)

    def _load_model(self, path: str):
        """Build the network, load checkpoint weights, and freeze for eval."""
        model = Model(input_dim=10, hidden_dim=64, output_dim=2)
        # Bug fix: map_location='cpu' so a checkpoint saved on a GPU machine
        # still loads on CPU-only workers. NOTE(review): consider
        # weights_only=True (torch >= 1.13) since torch.load unpickles
        # arbitrary objects from the checkpoint file.
        model.load_state_dict(torch.load(path, map_location='cpu'))
        model.eval()
        return model

    def predict_batch(self, features: np.ndarray) -> np.ndarray:
        """Return the argmax class per row of `features`."""
        with torch.no_grad():
            inputs = torch.tensor(features, dtype=torch.float32)
            outputs = self.model(inputs)
            return torch.argmax(outputs, dim=1).numpy()
def run_batch_inference(data_path: str, output_path: str, batch_size: int = 1000):
    """Fan batch predictions out over Ray actors and write results to parquet.

    Reads a parquet file, splits it into `batch_size` slices handed to 4
    parallel BatchPredictor actors, and writes the frame back with a new
    'prediction' column.

    Bug fix: ray.shutdown() now runs in a finally block, so the Ray runtime
    is released even when reading, prediction, or writing raises.
    """
    ray.init()
    try:
        num_actors = 4
        actors = [BatchPredictor.remote('best_model.pt') for _ in range(num_actors)]
        df = pd.read_parquet(data_path)
        predictions = []
        # Each outer step dispatches one slice to every actor in parallel.
        for block_start in range(0, len(df), batch_size * num_actors):
            batch_futures = []
            for j, actor in enumerate(actors):
                start = block_start + j * batch_size
                end = min(start + batch_size, len(df))
                if start < len(df):
                    # NOTE(review): feature_columns is a module-level name not
                    # defined in this extract — confirm it is set before calling.
                    features = df.iloc[start:end][feature_columns].values
                    batch_futures.append(actor.predict_batch.remote(features))
            predictions.extend(np.concatenate(ray.get(batch_futures)))
        df['prediction'] = predictions
        df.to_parquet(output_path)
    finally:
        ray.shutdown()
from typing import Iterator
@ray.remote
class BatchPredictor:
    """Ray actor holding one loaded Model for parallel batch scoring."""

    def __init__(self, model_path: str):
        self.model = self._load_model(model_path)

    def _load_model(self, path: str):
        """Construct the network and load frozen checkpoint weights."""
        model = Model(input_dim=10, hidden_dim=64, output_dim=2)
        # Bug fix: map_location='cpu' lets GPU-saved checkpoints load on
        # CPU-only workers. NOTE(review): weights_only=True (torch >= 1.13)
        # would also avoid unpickling arbitrary objects.
        model.load_state_dict(torch.load(path, map_location='cpu'))
        model.eval()
        return model

    def predict_batch(self, features: np.ndarray) -> np.ndarray:
        """Argmax class per row."""
        with torch.no_grad():
            inputs = torch.tensor(features, dtype=torch.float32)
            outputs = self.model(inputs)
            return torch.argmax(outputs, dim=1).numpy()
def run_batch_inference(data_path: str, output_path: str, batch_size: int = 1000):
    """Distribute parquet scoring across 4 Ray actors; write predictions back.

    Bug fix: Ray is shut down in a finally block so a failure mid-run no
    longer leaks the Ray runtime.
    """
    ray.init()
    try:
        num_actors = 4
        actors = [BatchPredictor.remote('best_model.pt') for _ in range(num_actors)]
        df = pd.read_parquet(data_path)
        predictions = []
        # One slice per actor per outer iteration.
        for block_start in range(0, len(df), batch_size * num_actors):
            batch_futures = []
            for j, actor in enumerate(actors):
                start = block_start + j * batch_size
                end = min(start + batch_size, len(df))
                if start < len(df):
                    # NOTE(review): feature_columns must be defined at module
                    # scope — not visible in this extract.
                    features = df.iloc[start:end][feature_columns].values
                    batch_futures.append(actor.predict_batch.remote(features))
            predictions.extend(np.concatenate(ray.get(batch_futures)))
        df['prediction'] = predictions
        df.to_parquet(output_path)
    finally:
        ray.shutdown()
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
def detect_drift(reference_data: pd.DataFrame, current_data: pd.DataFrame,
                 numerical_features=None, categorical_features=None):
    """Run Evidently data/target drift presets and summarize the result.

    Args:
        reference_data: baseline frame (e.g. training-time distribution).
        current_data: fresh production frame to compare against the baseline.
        numerical_features: numeric columns to check; defaults to the
            previously hard-coded list for backward compatibility.
        categorical_features: categorical columns to check; same default rule.

    Returns:
        dict with 'drift_detected' (bool), 'drifted_features' (columns whose
        per-column drift test fired), and the full Evidently 'report'.
    """
    # Generalization: callers may supply their own column lists; defaults
    # preserve the original hard-coded behavior.
    if numerical_features is None:
        numerical_features = ['feature_1', 'feature_2', 'feature_3']
    if categorical_features is None:
        categorical_features = ['category_1', 'category_2']
    column_mapping = ColumnMapping(
        target='target',
        numerical_features=numerical_features,
        categorical_features=categorical_features
    )
    report = Report(metrics=[
        DataDriftPreset(),
        TargetDriftPreset()
    ])
    report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )
    # metrics[0] is the DataDriftPreset result — order matches the list above.
    drift_results = report.as_dict()
    data_drift_detected = drift_results['metrics'][0]['result']['dataset_drift']
    drifted_features = [
        col for col, result in drift_results['metrics'][0]['result']['drift_by_columns'].items()
        if result['drift_detected']
    ]
    return {
        'drift_detected': data_drift_detected,
        'drifted_features': drifted_features,
        'report': report
    }
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
def detect_drift(reference_data: pd.DataFrame, current_data: pd.DataFrame):
    """Compare current data to the reference set with Evidently drift presets.

    Returns a dict with an overall drift flag, the list of drifted columns,
    and the full Evidently report object.
    """
    # Fixed column roles for this project's schema.
    mapping = ColumnMapping(
        target='target',
        numerical_features=['feature_1', 'feature_2', 'feature_3'],
        categorical_features=['category_1', 'category_2']
    )
    drift_report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
    drift_report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=mapping
    )
    # First metric entry corresponds to DataDriftPreset.
    summary = drift_report.as_dict()
    dataset_result = summary['metrics'][0]['result']
    per_column = dataset_result['drift_by_columns']
    drifted = [name for name, info in per_column.items() if info['drift_detected']]
    return {
        'drift_detected': dataset_result['dataset_drift'],
        'drifted_features': drifted,
        'report': drift_report
    }
import timefrom prometheus_client import Counter, Histogram, Gauge
# NOTE(review): fragment of an instrumented predict function — the enclosing
# `def`, `start_time`, `model_version`, and the Prometheus Counter/Histogram
# definitions are not visible in this extract; the first line is fused with
# an `import time` by the text extraction.
import timeprediction = model.predict(features)
# Record metrics
# Observe end-to-end latency (seconds since start_time) in the Histogram.
latency = time.time() - start_time
prediction_latency.observe(latency)
# Count predictions, labeled by model version and predicted class.
prediction_counter.labels(
model_version=model_version,
prediction_class=str(prediction)
).inc()
return predictionundefinedprediction = model.predict(features)
# NOTE(review): duplicate copy of the metrics-recording fragment; its opening
# `prediction = model.predict(features)` line is fused onto the previous line
# in the scraped text.
# Record metrics
# Histogram observation of request latency.
latency = time.time() - start_time
prediction_latency.observe(latency)
# Per-version, per-class prediction counter.
prediction_counter.labels(
model_version=model_version,
prediction_class=str(prediction)
).inc()
return prediction

References:
- references/feature_engineering.md
- references/mlops_guide.md
- references/model_serving.md
- references/monitoring.md