Evaluates ML models for performance, fairness, and reliability. Use for metric selection, cross-validation strategies, overfitting/underfitting diagnosis, hyperparameter tuning, LLM evaluation, A/B testing, and production monitoring for model drift.
npx skill4agent add cosmix/claude-loom loom-model-evaluation/loom-debugging/loom-testing/loom-data-validation

import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, average_precision_score, confusion_matrix,
classification_report, roc_curve, precision_recall_curve
)
import matplotlib.pyplot as plt
class ClassificationEvaluator:
"""Comprehensive classification model evaluator."""
def __init__(self, y_true, y_pred, y_prob=None, class_names=None):
self.y_true = y_true
self.y_pred = y_pred
self.y_prob = y_prob
self.class_names = class_names or ['Negative', 'Positive']
def compute_metrics(self) -> dict:
"""Compute all classification metrics."""
metrics = {
'accuracy': accuracy_score(self.y_true, self.y_pred),
'precision': precision_score(self.y_true, self.y_pred, average='weighted'),
'recall': recall_score(self.y_true, self.y_pred, average='weighted'),
'f1': f1_score(self.y_true, self.y_pred, average='weighted'),
}
if self.y_prob is not None:
metrics['roc_auc'] = roc_auc_score(self.y_true, self.y_prob)
metrics['average_precision'] = average_precision_score(self.y_true, self.y_prob)
return metrics
def confusion_matrix_analysis(self) -> dict:
"""Analyze confusion matrix in detail."""
cm = confusion_matrix(self.y_true, self.y_pred)
tn, fp, fn, tp = cm.ravel()
return {
'confusion_matrix': cm,
'true_negatives': tn,
'false_positives': fp,
'false_negatives': fn,
'true_positives': tp,
'specificity': tn / (tn + fp),
'sensitivity': tp / (tp + fn),
'false_positive_rate': fp / (fp + tn),
'false_negative_rate': fn / (fn + tp),
}
def plot_roc_curve(self, save_path=None):
"""Plot ROC curve with AUC."""
if self.y_prob is None:
raise ValueError("Probabilities required for ROC curve")
fpr, tpr, thresholds = roc_curve(self.y_true, self.y_prob)
auc = roc_auc_score(self.y_true, self.y_prob)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC (AUC = {auc:.3f})')
plt.plot([0, 1], [0, 1], 'k--', label='Random')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend()
plt.grid(True, alpha=0.3)
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.show()
def generate_report(self) -> str:
"""Generate comprehensive evaluation report."""
metrics = self.compute_metrics()
cm_analysis = self.confusion_matrix_analysis()
# ROC AUC is only present when probabilities were supplied
roc_auc_str = f"{metrics['roc_auc']:.4f}" if 'roc_auc' in metrics else 'N/A'
report = f"""
# Classification Model Evaluation Report
## Overall Metrics
| Metric | Value |
|--------|-------|
| Accuracy | {metrics['accuracy']:.4f} |
| Precision | {metrics['precision']:.4f} |
| Recall | {metrics['recall']:.4f} |
| F1 Score | {metrics['f1']:.4f} |
| ROC AUC | {roc_auc_str} |
## Confusion Matrix Analysis
| Metric | Value |
|--------|-------|
| True Positives | {cm_analysis['true_positives']} |
| True Negatives | {cm_analysis['true_negatives']} |
| False Positives | {cm_analysis['false_positives']} |
| False Negatives | {cm_analysis['false_negatives']} |
| Sensitivity | {cm_analysis['sensitivity']:.4f} |
| Specificity | {cm_analysis['specificity']:.4f} |
## Detailed Classification Report
{classification_report(self.y_true, self.y_pred, target_names=self.class_names)}
"""
return report
# Usage
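# Note: y_true / y_pred / y_prob below are assumed to come from a fitted binary
# classifier, e.g. (hypothetical names): y_pred = clf.predict(X_test) and
# y_prob = clf.predict_proba(X_test)[:, 1] for the positive-class probability.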
evaluator = ClassificationEvaluator(y_true, y_pred, y_prob)
print(evaluator.generate_report())
evaluator.plot_roc_curve('roc_curve.png')

from sklearn.metrics import (
mean_squared_error, mean_absolute_error, r2_score,
mean_absolute_percentage_error, explained_variance_score
)
import numpy as np
import matplotlib.pyplot as plt
class RegressionEvaluator:
"""Comprehensive regression model evaluator."""
def __init__(self, y_true, y_pred):
self.y_true = np.array(y_true)
self.y_pred = np.array(y_pred)
self.residuals = self.y_true - self.y_pred
def compute_metrics(self) -> dict:
"""Compute all regression metrics."""
mse = mean_squared_error(self.y_true, self.y_pred)
return {
'mse': mse,
'rmse': np.sqrt(mse),
'mae': mean_absolute_error(self.y_true, self.y_pred),
'mape': mean_absolute_percentage_error(self.y_true, self.y_pred) * 100,
'r2': r2_score(self.y_true, self.y_pred),
'explained_variance': explained_variance_score(self.y_true, self.y_pred),
}
def residual_analysis(self) -> dict:
"""Analyze residual patterns."""
return {
'mean_residual': np.mean(self.residuals),
'std_residual': np.std(self.residuals),
'max_overestimate': np.min(self.residuals),
'max_underestimate': np.max(self.residuals),
'residual_skewness': self._skewness(self.residuals),
}
def _skewness(self, data):
"""Calculate the adjusted Fisher-Pearson skewness coefficient."""
n = len(data)
mean = np.mean(data)
std = np.std(data, ddof=1)  # sample standard deviation, matching the bias-corrected formula below
return (n / ((n-1) * (n-2))) * np.sum(((data - mean) / std) ** 3)
def plot_diagnostics(self, save_path=None):
"""Plot diagnostic plots for residual analysis."""
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Actual vs Predicted
ax1 = axes[0, 0]
ax1.scatter(self.y_true, self.y_pred, alpha=0.5)
ax1.plot([self.y_true.min(), self.y_true.max()],
[self.y_true.min(), self.y_true.max()], 'r--')
ax1.set_xlabel('Actual')
ax1.set_ylabel('Predicted')
ax1.set_title('Actual vs Predicted')
# Residuals vs Predicted
ax2 = axes[0, 1]
ax2.scatter(self.y_pred, self.residuals, alpha=0.5)
ax2.axhline(y=0, color='r', linestyle='--')
ax2.set_xlabel('Predicted')
ax2.set_ylabel('Residuals')
ax2.set_title('Residuals vs Predicted')
# Residual histogram
ax3 = axes[1, 0]
ax3.hist(self.residuals, bins=30, edgecolor='black')
ax3.set_xlabel('Residual')
ax3.set_ylabel('Frequency')
ax3.set_title('Residual Distribution')
# Q-Q plot
ax4 = axes[1, 1]
from scipy import stats
stats.probplot(self.residuals, dist="norm", plot=ax4)
ax4.set_title('Q-Q Plot')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.show()

import numpy as np
from sklearn.model_selection import (
cross_val_score, StratifiedKFold, TimeSeriesSplit,
GroupKFold, cross_validate
)
def evaluate_with_cv(model, X, y, cv_strategy='stratified', n_splits=5, groups=None):
"""
Evaluate model with appropriate cross-validation strategy.
Args:
model: Sklearn-compatible model
X: Features
y: Target
cv_strategy: 'stratified', 'timeseries', 'group', or 'kfold'
n_splits: Number of CV folds
groups: Group labels for GroupKFold
Returns:
Dictionary with CV results
"""
# Select CV strategy
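# Rule of thumb: stratified folds preserve class balance (imbalanced targets),
# TimeSeriesSplit avoids training on the future, and GroupKFold keeps all rows
# from one entity (e.g. one user) in a single fold to prevent leakage.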
if cv_strategy == 'stratified':
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
elif cv_strategy == 'timeseries':
cv = TimeSeriesSplit(n_splits=n_splits)
elif cv_strategy == 'group':
cv = GroupKFold(n_splits=n_splits)
else:
cv = n_splits
# Define scoring metrics
scoring = {
'accuracy': 'accuracy',
'precision': 'precision_weighted',
'recall': 'recall_weighted',
'f1': 'f1_weighted',
'roc_auc': 'roc_auc'
}
# Perform cross-validation
cv_results = cross_validate(
model, X, y,
cv=cv,
scoring=scoring,
groups=groups,
return_train_score=True,
n_jobs=-1
)
# Summarize results
summary = {}
for metric in scoring.keys():
test_scores = cv_results[f'test_{metric}']
train_scores = cv_results[f'train_{metric}']
summary[metric] = {
'test_mean': np.mean(test_scores),
'test_std': np.std(test_scores),
'train_mean': np.mean(train_scores),
'train_std': np.std(train_scores),
'overfit_gap': np.mean(train_scores) - np.mean(test_scores)
}
return summary
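# Related, hedged sketch for the hyperparameter-tuning use case from the skill
# description: RandomizedSearchCV reuses the same stratified CV idea as above.
# `model`, `param_distributions`, `X`, and `y` are assumed to already exist.
from sklearn.model_selection import RandomizedSearchCV
search = RandomizedSearchCV(
    model, param_distributions,
    n_iter=50, scoring='f1_weighted',
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
    n_jobs=-1, random_state=42
)
search.fit(X, y)
print(f"Best params: {search.best_params_}, best CV f1: {search.best_score_:.4f}")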
# Usage example
results = evaluate_with_cv(model, X, y, cv_strategy='stratified', n_splits=5)
for metric, values in results.items():
print(f"{metric}: {values['test_mean']:.4f} (+/- {values['test_std']:.4f})")
print(f"  Overfitting gap: {values['overfit_gap']:.4f}")

import numpy as np

def evaluate_fairness(y_true, y_pred, sensitive_attr, favorable_label=1):
"""
Evaluate model fairness across demographic groups.
Args:
y_true: True labels
y_pred: Predicted labels
sensitive_attr: Protected attribute values
favorable_label: The favorable outcome label
Returns:
Dictionary with fairness metrics
"""
# Convert to arrays so boolean-mask indexing works on any list-like input
y_true, y_pred, sensitive_attr = np.asarray(y_true), np.asarray(y_pred), np.asarray(sensitive_attr)
groups = np.unique(sensitive_attr)
results = {'group_metrics': {}}
for group in groups:
mask = sensitive_attr == group
group_true = y_true[mask]
group_pred = y_pred[mask]
# Calculate group-specific metrics
tp = np.sum((group_true == favorable_label) & (group_pred == favorable_label))
fp = np.sum((group_true != favorable_label) & (group_pred == favorable_label))
fn = np.sum((group_true == favorable_label) & (group_pred != favorable_label))
tn = np.sum((group_true != favorable_label) & (group_pred != favorable_label))
results['group_metrics'][group] = {
'selection_rate': np.mean(group_pred == favorable_label),
'tpr': tp / (tp + fn) if (tp + fn) > 0 else 0,
'fpr': fp / (fp + tn) if (fp + tn) > 0 else 0,
'accuracy': np.mean(group_true == group_pred),
'size': len(group_true)
}
# Calculate fairness metrics
selection_rates = [m['selection_rate'] for m in results['group_metrics'].values()]
tprs = [m['tpr'] for m in results['group_metrics'].values()]
fprs = [m['fpr'] for m in results['group_metrics'].values()]
results['fairness_metrics'] = {
'demographic_parity_diff': max(selection_rates) - min(selection_rates),
'equalized_odds_tpr_diff': max(tprs) - min(tprs),
'equalized_odds_fpr_diff': max(fprs) - min(fprs),
}
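# Interpretation: differences close to 0 indicate parity across groups; there is
# no universal cutoff, but larger gaps are usually worth a closer review.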
return results

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import learning_curve
def plot_learning_curves(model, X, y, cv=5, train_sizes=np.linspace(0.1, 1.0, 10)):
"""
Plot learning curves to diagnose overfitting/underfitting.
Args:
model: Sklearn-compatible model
X: Features
y: Target
cv: Cross-validation folds
train_sizes: Array of training set size fractions
"""
train_sizes, train_scores, val_scores = learning_curve(
model, X, y,
cv=cv,
train_sizes=train_sizes,
scoring='accuracy',
n_jobs=-1
)
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
val_mean = np.mean(val_scores, axis=1)
val_std = np.std(val_scores, axis=1)
plt.figure(figsize=(10, 6))
plt.plot(train_sizes, train_mean, label='Training score', color='blue', marker='o')
plt.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.15, color='blue')
plt.plot(train_sizes, val_mean, label='Validation score', color='red', marker='o')
plt.fill_between(train_sizes, val_mean - val_std, val_mean + val_std, alpha=0.15, color='red')
plt.xlabel('Training Set Size')
plt.ylabel('Accuracy')
plt.title('Learning Curves')
plt.legend(loc='lower right')
plt.grid(True, alpha=0.3)
# Add diagnostic annotations
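# The 0.1 gap and 0.7 score cutoffs below are heuristics; adjust them to the
# metric scale and difficulty of the task at hand.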
final_gap = train_mean[-1] - val_mean[-1]
if final_gap > 0.1:
plt.text(0.5, 0.05, 'HIGH OVERFITTING: Large gap between train and validation',
transform=plt.gca().transAxes, color='red', fontweight='bold')
elif val_mean[-1] < 0.7:
plt.text(0.5, 0.05, 'UNDERFITTING: Both train and validation scores are low',
transform=plt.gca().transAxes, color='orange', fontweight='bold')
plt.tight_layout()
plt.show()
return {
'final_train_score': train_mean[-1],
'final_val_score': val_mean[-1],
'overfit_gap': final_gap
}

import json
import numpy as np
from typing import List, Dict
import openai
def evaluate_llm_generation(
prompts: List[str],
references: List[str],
model: str,
judge_model: str = "gpt-4"
) -> Dict:
"""
Evaluate LLM generation quality using LLM-as-judge.
Args:
prompts: Input prompts
references: Reference outputs (if available)
model: Model to evaluate
judge_model: Model to use as judge
Returns:
Dictionary with evaluation scores
"""
results = []
for prompt, reference in zip(prompts, references):
# Generate response
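# Note: openai.ChatCompletion.create is the pre-1.0 openai-python interface;
# newer client versions expose the same call as OpenAI().chat.completions.create.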
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
generation = response.choices[0].message.content
# LLM-as-judge evaluation
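# The judge is asked for raw JSON; in practice the reply may include extra text,
# so production code should parse defensively or constrain the judge's output format.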
judge_prompt = f"""Evaluate the following AI-generated response on a scale of 1-5 for:
1. Accuracy: Is the information correct?
2. Relevance: Does it address the prompt?
3. Fluency: Is it well-written and coherent?
4. Helpfulness: Is it useful to the user?
Prompt: {prompt}
Reference (if available): {reference}
Response: {generation}
Provide scores in JSON format: {{"accuracy": X, "relevance": X, "fluency": X, "helpfulness": X, "overall": X}}
"""
judge_response = openai.ChatCompletion.create(
model=judge_model,
messages=[{"role": "user", "content": judge_prompt}]
)
scores = json.loads(judge_response.choices[0].message.content)
results.append({
'prompt': prompt,
'generation': generation,
'scores': scores
})
# Aggregate scores
avg_scores = {}
for key in ['accuracy', 'relevance', 'fluency', 'helpfulness', 'overall']:
avg_scores[key] = np.mean([r['scores'][key] for r in results])
return {
'individual_results': results,
'average_scores': avg_scores
}

import numpy as np
from scipy import stats
def analyze_ab_test(control_metric: np.ndarray, treatment_metric: np.ndarray, alpha: float = 0.05):
"""
Analyze A/B test results with statistical significance testing.
Args:
control_metric: Metric values for control group
treatment_metric: Metric values for treatment group
alpha: Significance level
Returns:
Dictionary with test results
"""
# Descriptive statistics
control_mean = np.mean(control_metric)
treatment_mean = np.mean(treatment_metric)
relative_lift = (treatment_mean - control_mean) / control_mean * 100
# Statistical test
t_stat, p_value = stats.ttest_ind(treatment_metric, control_metric)
is_significant = p_value < alpha
# Effect size (Cohen's d)
n_c, n_t = len(control_metric), len(treatment_metric)
pooled_std = np.sqrt(((n_c - 1) * np.var(control_metric, ddof=1) + (n_t - 1) * np.var(treatment_metric, ddof=1)) / (n_c + n_t - 2))
cohens_d = (treatment_mean - control_mean) / pooled_std
# Confidence interval for the difference in means (uses the standard error of the difference)
se_diff = np.sqrt(stats.sem(control_metric)**2 + stats.sem(treatment_metric)**2)
ci = stats.t.interval(
confidence=1-alpha,
df=n_c + n_t - 2,
loc=treatment_mean - control_mean,
scale=se_diff
)
return {
'control_mean': control_mean,
'treatment_mean': treatment_mean,
'relative_lift_pct': relative_lift,
'p_value': p_value,
'is_significant': is_significant,
'cohens_d': cohens_d,
'confidence_interval': ci,
'recommendation': 'LAUNCH' if is_significant and relative_lift > 0 else 'DO NOT LAUNCH'
}
# Usage
results = analyze_ab_test(control_conversions, treatment_conversions)
print(f"Relative Lift: {results['relative_lift_pct']:.2f}%")
print(f"P-value: {results['p_value']:.4f}")
print(f"Recommendation: {results['recommendation']}")

from typing import Dict
import numpy as np
from scipy.stats import ks_2samp
import pandas as pd
class ModelMonitor:
"""Monitor deployed model for drift and degradation."""
def __init__(self, baseline_data: pd.DataFrame, baseline_predictions: np.ndarray):
self.baseline_data = baseline_data
self.baseline_predictions = baseline_predictions
def detect_data_drift(self, current_data: pd.DataFrame, threshold: float = 0.05) -> Dict:
"""Detect feature distribution drift using KS test."""
drift_results = {}
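# Only numeric columns are tested below; categorical features would need a
# different test (e.g. chi-squared or a population stability index).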
for col in self.baseline_data.columns:
if pd.api.types.is_numeric_dtype(self.baseline_data[col]):
statistic, p_value = ks_2samp(
self.baseline_data[col].dropna(),
current_data[col].dropna()
)
drift_results[col] = {
'ks_statistic': statistic,
'p_value': p_value,
'drift_detected': p_value < threshold
}
return drift_results
def detect_prediction_drift(self, current_predictions: np.ndarray, threshold: float = 0.05) -> Dict:
"""Detect prediction distribution drift."""
statistic, p_value = ks_2samp(self.baseline_predictions, current_predictions)
return {
'ks_statistic': statistic,
'p_value': p_value,
'drift_detected': p_value < threshold,
'baseline_mean': np.mean(self.baseline_predictions),
'current_mean': np.mean(current_predictions),
'mean_shift': np.mean(current_predictions) - np.mean(self.baseline_predictions)
}
def performance_degradation_check(
self,
current_metric: float,
baseline_metric: float,
threshold_pct: float = 5.0
) -> Dict:
"""Check for performance degradation."""
degradation_pct = (baseline_metric - current_metric) / baseline_metric * 100
return {
'baseline_metric': baseline_metric,
'current_metric': current_metric,
'degradation_pct': degradation_pct,
'alert': degradation_pct > threshold_pct,
'recommendation': 'RETRAIN MODEL' if degradation_pct > threshold_pct else 'OK'
}
# Usage
monitor = ModelMonitor(baseline_df, baseline_preds)
drift_check = monitor.detect_data_drift(current_df)
pred_drift = monitor.detect_prediction_drift(current_preds)
perf_check = monitor.performance_degradation_check(current_accuracy, baseline_accuracy)
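# A small, hedged follow-on: collapse the monitoring checks above into a single
# alert decision (variable names reuse the usage example directly above).
drifted_features = [col for col, res in drift_check.items() if res['drift_detected']]
if drifted_features or pred_drift['drift_detected'] or perf_check['alert']:
    print(f"ALERT - drifted features: {drifted_features}, "
          f"prediction drift: {pred_drift['drift_detected']}, "
          f"recommendation: {perf_check['recommendation']}")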