Loading...
Loading...
Market regime identification using volatility clustering, trend detection, and statistical methods for adaptive trading
npx skill4agent add agiprolabs/claude-trading-skills regime-detection| Low Volatility | High Volatility | |
|---|---|---|
| Trending | Q1: Clean trend — best for trend following | Q2: Volatile trend — momentum with caution |
| Ranging | Q3: Quiet range — mean-reversion paradise | Q4: Choppy chaos — reduce or sit out |
import pandas as pd
import numpy as np
def atr_percentile(
high: pd.Series, low: pd.Series, close: pd.Series,
atr_period: int = 14, lookback: int = 100
) -> pd.Series:
"""ATR percentile rank over a rolling window."""
tr = pd.concat([
high - low,
(high - close.shift(1)).abs(),
(low - close.shift(1)).abs()
], axis=1).max(axis=1)
atr = tr.rolling(atr_period).mean()
return atr.rolling(lookback).apply(
lambda x: pd.Series(x).rank(pct=True).iloc[-1], raw=False
)def compute_adx(
high: pd.Series, low: pd.Series, close: pd.Series,
period: int = 14
) -> pd.Series:
"""Average Directional Index."""
plus_dm = high.diff().clip(lower=0)
minus_dm = (-low.diff()).clip(lower=0)
# Zero out when the other is larger
plus_dm[plus_dm < minus_dm] = 0
minus_dm[minus_dm < plus_dm] = 0
tr = pd.concat([
high - low,
(high - close.shift(1)).abs(),
(low - close.shift(1)).abs()
], axis=1).max(axis=1)
atr = tr.ewm(span=period, adjust=False).mean()
plus_di = 100 * plus_dm.ewm(span=period, adjust=False).mean() / atr
minus_di = 100 * minus_dm.ewm(span=period, adjust=False).mean() / atr
dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di)
return dx.ewm(span=period, adjust=False).mean()def trend_direction(close: pd.Series, period: int = 20) -> pd.Series:
"""Returns +1 (uptrend), -1 (downtrend), 0 (neutral)."""
ema = close.ewm(span=period, adjust=False).mean()
slope = ema.diff(5) # 5-bar slope
above = (close > ema).astype(int)
direction = pd.Series(0, index=close.index)
direction[(slope > 0) & (above == 1)] = 1
direction[(slope < 0) & (above == 0)] = -1
return directiondef bb_width_percentile(
close: pd.Series, period: int = 20,
std_dev: float = 2.0, lookback: int = 100
) -> pd.Series:
"""Bollinger Band width percentile."""
sma = close.rolling(period).mean()
std = close.rolling(period).std()
width = (2 * std_dev * std) / sma
return width.rolling(lookback).apply(
lambda x: pd.Series(x).rank(pct=True).iloc[-1], raw=False
)references/methodology.mddef hurst_exponent(series: pd.Series, max_lag: int = 50) -> float:
"""Estimate Hurst exponent using R/S method."""
lags = range(2, max_lag)
rs_values = []
for lag in lags:
chunks = [series.iloc[i:i+lag] for i in range(0, len(series) - lag, lag)]
rs_list = []
for chunk in chunks:
if len(chunk) < lag:
continue
mean_val = chunk.mean()
devs = chunk - mean_val
cumdev = devs.cumsum()
r = cumdev.max() - cumdev.min()
s = chunk.std(ddof=1)
if s > 0:
rs_list.append(r / s)
if rs_list:
rs_values.append(np.mean(rs_list))
else:
rs_values.append(np.nan)
valid = [(l, r) for l, r in zip(lags, rs_values) if not np.isnan(r)]
if len(valid) < 5:
return 0.5
log_lags = np.log([v[0] for v in valid])
log_rs = np.log([v[1] for v in valid])
coeffs = np.polyfit(log_lags, log_rs, 1)
return coeffs[0]def cusum_test(
returns: pd.Series, threshold: float = 2.0
) -> list[int]:
"""CUSUM change-point detection on returns.
Returns indices where regime changes are detected.
"""
mean_r = returns.mean()
std_r = returns.std()
if std_r == 0:
return []
s_pos, s_neg = 0.0, 0.0
changes = []
for i, r in enumerate(returns):
z = (r - mean_r) / std_r
s_pos = max(0, s_pos + z - 0.5)
s_neg = max(0, s_neg - z - 0.5)
if s_pos > threshold or s_neg > threshold:
changes.append(i)
s_pos, s_neg = 0.0, 0.0
return changeshmmlearn# Optional: requires `uv pip install hmmlearn`
from hmmlearn import hmm
def fit_hmm_regimes(
returns: np.ndarray, n_states: int = 2, n_iter: int = 100
) -> tuple[np.ndarray, object]:
"""Fit a Gaussian HMM to return series."""
X = returns.reshape(-1, 1)
model = hmm.GaussianHMM(
n_components=n_states, covariance_type="full", n_iter=n_iter
)
model.fit(X)
states = model.predict(X)
return states, modelreferences/methodology.md| Parameter | Equities | Crypto (large cap) | Crypto (micro cap / PumpFun) |
|---|---|---|---|
| ATR lookback | 100–200 bars | 50–100 bars | 20–50 bars |
| ADX period | 14–28 | 10–14 | 7–10 |
| Regime persistence | Weeks–months | Days–weeks | Hours–days |
| Hurst window | 200+ bars | 100 bars | 50 bars |
def classify_regime(
vol_percentile: float, adx: float, hurst: float,
trend_dir: int
) -> dict[str, str]:
"""Classify into the 4-quadrant model."""
vol_regime = (
"low" if vol_percentile < 0.30
else "high" if vol_percentile > 0.70
else "normal"
)
trend_regime = (
"trending" if adx > 25
else "ranging" if adx < 20
else "transitional"
)
direction = (
"up" if trend_dir > 0
else "down" if trend_dir < 0
else "neutral"
)
mr_regime = (
"mean_reverting" if hurst < 0.4
else "trending" if hurst > 0.6
else "random"
)
return {
"volatility": vol_regime,
"trend": trend_regime,
"direction": direction,
"mean_reversion": mr_regime,
"quadrant": f"{vol_regime}_vol_{trend_regime}",
}references/strategy_adaptation.md| Current Regime | Action |
|---|---|
| Low vol + trending up | Full size trend-following, tight stops |
| High vol + trending | Half size momentum, wide stops |
| Low vol + ranging | Mean-reversion / grid strategies |
| High vol + ranging | Reduce to 25% size or sit out |
| Regime transition | Flatten or reduce to minimum size |
pandas-tavolatility-modelingstrategy-frameworkposition-sizingrisk-managementreferences/methodology.mdreferences/strategy_adaptation.mdscripts/detect_regime.pyscripts/regime_backtest.py