
Time Series Analysis & Forecasting

Analyze temporal data patterns and build accurate forecasting models.

Quick Start


Data Preparation


python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Load time series data
df = pd.read_csv('sales.csv', parse_dates=['date'], index_col='date')

# Ensure proper datetime index
df.index = pd.DatetimeIndex(df.index, freq='D')

# Basic exploration
print(df.head())
print(df.describe())

# Visualize
df['value'].plot(figsize=(12, 4), title='Time Series')
plt.show()
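One practical note on the `freq='D'` line above: constructing a `DatetimeIndex` with an explicit frequency raises a `ValueError` when any dates are missing. A common workaround, sketched here on a hypothetical four-row series, is to reindex to a complete daily range with `asfreq` and interpolate the gaps:

```python
import pandas as pd

# Hypothetical series with one missing day (2024-01-03 is absent)
idx = pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-04', '2024-01-05'])
df = pd.DataFrame({'value': [1.0, 2.0, 4.0, 5.0]}, index=idx)

# asfreq('D') inserts the missing dates as NaN rows...
df = df.asfreq('D')

# ...which interpolate() then fills linearly
df['value'] = df['value'].interpolate()

print(df)
```

After this, `df.index.freq` is already daily, so the explicit `DatetimeIndex` construction is no longer needed.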

Decomposition


python
from statsmodels.tsa.seasonal import seasonal_decompose

# Additive decomposition
decomposition = seasonal_decompose(df['value'], model='additive', period=7)

fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomposition.observed.plot(ax=axes[0], title='Original')
decomposition.trend.plot(ax=axes[1], title='Trend')
decomposition.seasonal.plot(ax=axes[2], title='Seasonality')
decomposition.resid.plot(ax=axes[3], title='Residual')
plt.tight_layout()
plt.show()
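With `model='additive'`, the decomposition assumes observed = trend + seasonal + residual. The classical estimate behind it can be sketched in plain numpy on a synthetic weekly series (a toy reconstruction of the idea, not the statsmodels implementation):

```python
import numpy as np

period = 7
n = period * 20
t = np.arange(n)
rng = np.random.default_rng(1)

# Synthetic additive series: linear trend + weekly seasonality + noise
trend = 0.5 * t
seasonal = 10 * np.sin(2 * np.pi * t / period)
observed = trend + seasonal + rng.normal(size=n)

# 1) Estimate the trend with a centered moving average of one full period
est_trend = np.convolve(observed, np.ones(period) / period, mode='same')

# 2) Average the detrended values at each position within the period
#    (trim one period at each end to avoid edge effects)
detrended = (observed - est_trend)[period:-period]
est_seasonal = np.array([detrended[i::period].mean() for i in range(period)])

true_pattern = 10 * np.sin(2 * np.pi * np.arange(period) / period)
print(np.round(est_seasonal - true_pattern, 2))
```

The recovered weekly pattern closely tracks the true sinusoid, which is why a moving average of exactly one seasonal period is the standard trend filter.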

Statistical Tests


Stationarity Tests


python
from statsmodels.tsa.stattools import adfuller, kpss

def check_stationarity(series):
    """Test for stationarity using ADF and KPSS tests"""

    # ADF Test (H0: series has unit root, i.e., non-stationary)
    adf_result = adfuller(series, autolag='AIC')
    adf_pvalue = adf_result[1]

    # KPSS Test (H0: series is stationary)
    kpss_result = kpss(series, regression='c')
    kpss_pvalue = kpss_result[1]

    print("Stationarity Test Results:")
    print(f"ADF Statistic: {adf_result[0]:.4f}")
    print(f"ADF p-value: {adf_pvalue:.4f}")
    print(f"KPSS Statistic: {kpss_result[0]:.4f}")
    print(f"KPSS p-value: {kpss_pvalue:.4f}")

    if adf_pvalue < 0.05 and kpss_pvalue > 0.05:
        print("Conclusion: Series is STATIONARY")
        return True
    elif adf_pvalue > 0.05 and kpss_pvalue < 0.05:
        print("Conclusion: Series is NON-STATIONARY")
        return False
    else:
        print("Conclusion: Inconclusive - consider differencing")
        return None

is_stationary = check_stationarity(df['value'])
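Differencing works because it removes the unit root that the ADF test looks for. As a quick numpy illustration of the intuition (not the test itself): a random walk is non-stationary, but its first difference recovers the i.i.d. steps:

```python
import numpy as np

rng = np.random.default_rng(7)
steps = rng.normal(size=5000)

walk = np.cumsum(steps)   # unit-root process: variance grows over time
diffed = np.diff(walk)    # first difference: the white-noise steps again

# Differencing a cumulative sum recovers the original increments
print(np.allclose(diffed, steps[1:]))
print(f"var(walk) = {walk.var():.1f}, var(diffed) = {diffed.var():.2f}")
```

The walk's sample variance is orders of magnitude larger than the difference's, which is the pattern the ADF/KPSS pair above is designed to detect.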

Making Series Stationary


python
def make_stationary(series, max_diff=2):
    """Apply differencing to make series stationary"""
    diff_series = series.copy()
    d = 0

    while d < max_diff:
        if check_stationarity(diff_series.dropna()):
            break
        diff_series = diff_series.diff()
        d += 1

    return diff_series.dropna(), d

stationary_series, d_order = make_stationary(df['value'])
print(f"Differencing order: {d_order}")

ARIMA / SARIMA


ACF/PACF Analysis


python
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, axes = plt.subplots(1, 2, figsize=(14, 4))

plot_acf(stationary_series, lags=40, ax=axes[0])
axes[0].set_title('Autocorrelation (ACF)')

plot_pacf(stationary_series, lags=40, ax=axes[1])
axes[1].set_title('Partial Autocorrelation (PACF)')

plt.tight_layout()
plt.show()

Reading ACF/PACF:

- PACF cuts off after lag p -> AR(p)
- ACF cuts off after lag q -> MA(q)
- Both decay -> ARMA(p, q)
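These rules can be checked numerically as well as visually. A small numpy sketch on a simulated AR(1) process (illustrative only; for real data use the `plot_acf`/`plot_pacf` calls above):

```python
import numpy as np

rng = np.random.default_rng(42)

# Simulate AR(1): x_t = 0.7 * x_{t-1} + e_t
n = 5000
x = np.zeros(n)
for t in range(1, n):
    x[t] = 0.7 * x[t - 1] + rng.normal()

def sample_acf(series, lag):
    """Sample autocorrelation at one lag."""
    s = series - series.mean()
    return float(s[:-lag] @ s[lag:] / (s @ s))

# For AR(1) the ACF decays geometrically (about 0.7 ** lag),
# while the PACF would cut off after lag 1 -> AR(1).
for lag in (1, 2, 3):
    print(f"lag {lag}: acf = {sample_acf(x, lag):.2f}")
```

The estimated autocorrelations sit near 0.7, 0.49, and 0.34, i.e. geometric decay rather than a sharp cutoff, matching the "ACF decays" signature of an AR process.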

Auto ARIMA


python
from pmdarima import auto_arima

# Automatic order selection
auto_model = auto_arima(
    df['value'],
    start_p=0, max_p=5,
    start_q=0, max_q=5,
    d=None,              # Auto-detect differencing
    seasonal=True, m=7,  # Weekly seasonality
    start_P=0, max_P=2,
    start_Q=0, max_Q=2,
    D=None,
    trace=True,
    error_action='ignore',
    suppress_warnings=True,
    stepwise=True,
    information_criterion='aic'
)

print(auto_model.summary())
print(f"Best order: {auto_model.order}")
print(f"Seasonal order: {auto_model.seasonal_order}")

Manual SARIMA


python
from statsmodels.tsa.statespace.sarimax import SARIMAX

# Train/test split
train = df['value'][:-30]
test = df['value'][-30:]

# Fit SARIMA model
model = SARIMAX(
    train,
    order=(1, 1, 1),              # (p, d, q)
    seasonal_order=(1, 1, 1, 7),  # (P, D, Q, m)
    enforce_stationarity=False,
    enforce_invertibility=False
)
results = model.fit(disp=False)
print(results.summary())

# Diagnostics
results.plot_diagnostics(figsize=(12, 8))
plt.tight_layout()
plt.show()

# Forecast
forecast = results.get_forecast(steps=30)
forecast_mean = forecast.predicted_mean
forecast_ci = forecast.conf_int()

# Plot
plt.figure(figsize=(12, 5))
plt.plot(train.index, train, label='Training')
plt.plot(test.index, test, label='Actual')
plt.plot(test.index, forecast_mean, label='Forecast', color='red')
plt.fill_between(
    test.index,
    forecast_ci.iloc[:, 0],
    forecast_ci.iloc[:, 1],
    alpha=0.3
)
plt.legend()
plt.title('SARIMA Forecast')
plt.show()

Prophet


python
from prophet import Prophet

# Prepare data (Prophet requires 'ds' and 'y' columns)
prophet_df = df.reset_index()
prophet_df.columns = ['ds', 'y']

# Initialize and fit
model = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=True,
    daily_seasonality=False,
    seasonality_mode='multiplicative',
    changepoint_prior_scale=0.05,
    seasonality_prior_scale=10
)

# Add custom seasonality
model.add_seasonality(name='monthly', period=30.5, fourier_order=5)

# Add holidays (optional)
model.add_country_holidays(country_name='US')

model.fit(prophet_df)

# Create future dataframe
future = model.make_future_dataframe(periods=30)

# Predict
forecast = model.predict(future)

# Visualize
fig1 = model.plot(forecast)
plt.title('Prophet Forecast')
plt.show()

# Components
fig2 = model.plot_components(forecast)
plt.show()

# Cross-validation
from prophet.diagnostics import cross_validation, performance_metrics

df_cv = cross_validation(
    model,
    initial='365 days',
    period='30 days',
    horizon='30 days'
)
df_metrics = performance_metrics(df_cv)
print(df_metrics[['horizon', 'mape', 'rmse', 'mae']])

Exponential Smoothing


python
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.api import SimpleExpSmoothing

# Simple Exponential Smoothing
ses_model = SimpleExpSmoothing(train).fit(
    smoothing_level=0.3,
    optimized=True
)

# Holt-Winters (Triple Exponential Smoothing)
hw_model = ExponentialSmoothing(
    train,
    trend='add',        # 'add', 'mul', or None
    seasonal='add',     # 'add', 'mul', or None
    seasonal_periods=7
).fit(
    smoothing_level=0.3,
    smoothing_trend=0.1,
    smoothing_seasonal=0.1,
    optimized=True
)

# Forecast
hw_forecast = hw_model.forecast(steps=30)

# Compare
plt.figure(figsize=(12, 5))
plt.plot(train, label='Training')
plt.plot(test, label='Test')
plt.plot(test.index, hw_forecast, label='Holt-Winters', color='red')
plt.legend()
plt.show()

Deep Learning for Time Series


LSTM


python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

class LSTMForecaster(nn.Module):
    """LSTM for time series forecasting"""

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=0.2
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


def create_sequences(data, seq_length):
    """Create sequences for LSTM"""
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(X), np.array(y)

python
from sklearn.preprocessing import MinMaxScaler

# Prepare data
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df['value'].values.reshape(-1, 1))

seq_length = 30
X, y = create_sequences(scaled_data, seq_length)

# Convert to tensors
X_tensor = torch.FloatTensor(X)
y_tensor = torch.FloatTensor(y)

# Train/test split
train_size = int(0.8 * len(X))
X_train, X_test = X_tensor[:train_size], X_tensor[train_size:]
y_train, y_test = y_tensor[:train_size], y_tensor[train_size:]

# Training
model = LSTMForecaster()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item():.4f}')

Transformer for Time Series


python
class TimeSeriesTransformer(nn.Module):
    """Transformer for time series forecasting"""

    def __init__(self, input_size=1, d_model=64, nhead=4,
                 num_layers=2, output_size=1, seq_length=30):
        super().__init__()

        self.embedding = nn.Linear(input_size, d_model)
        self.pos_encoding = nn.Parameter(torch.randn(1, seq_length, d_model))

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=256,
            dropout=0.1,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        self.fc = nn.Linear(d_model, output_size)

    def forward(self, x):
        x = self.embedding(x) + self.pos_encoding
        x = self.transformer(x)
        x = self.fc(x[:, -1, :])
        return x

Anomaly Detection


python
from sklearn.ensemble import IsolationForest
from scipy import stats

def detect_anomalies(series, method='zscore', threshold=3):
    """Detect anomalies using various methods"""

    if method == 'zscore':
        z_scores = np.abs(stats.zscore(series))
        anomalies = z_scores > threshold

    elif method == 'iqr':
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        anomalies = (series < Q1 - 1.5 * IQR) | (series > Q3 + 1.5 * IQR)

    elif method == 'isolation_forest':
        model = IsolationForest(contamination=0.05, random_state=42)
        predictions = model.fit_predict(series.values.reshape(-1, 1))
        anomalies = predictions == -1

    elif method == 'rolling':
        rolling_mean = series.rolling(window=7).mean()
        rolling_std = series.rolling(window=7).std()
        upper = rolling_mean + threshold * rolling_std
        lower = rolling_mean - threshold * rolling_std
        anomalies = (series > upper) | (series < lower)

    return anomalies
python
# Detect and visualize
anomalies = detect_anomalies(df['value'], method='zscore')

plt.figure(figsize=(12, 5))
plt.plot(df.index, df['value'], label='Value')
plt.scatter(df.index[anomalies], df['value'][anomalies],
            color='red', label='Anomalies')
plt.legend()
plt.title('Anomaly Detection')
plt.show()

Feature Engineering


python
def create_time_features(df):
    """Create time-based features"""
    df = df.copy()

    # Calendar features
    df['hour'] = df.index.hour
    df['dayofweek'] = df.index.dayofweek
    df['dayofmonth'] = df.index.day
    df['month'] = df.index.month
    df['quarter'] = df.index.quarter
    df['year'] = df.index.year
    df['dayofyear'] = df.index.dayofyear
    df['weekofyear'] = df.index.isocalendar().week

    # Cyclical encoding (for periodic features)
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['dayofweek_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['dayofweek_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)

    # Boolean features
    df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
    df['is_month_start'] = df.index.is_month_start.astype(int)
    df['is_month_end'] = df.index.is_month_end.astype(int)

    return df


def create_lag_features(df, target_col, lags):
    """Create lag features"""
    df = df.copy()

    for lag in lags:
        df[f'{target_col}_lag_{lag}'] = df[target_col].shift(lag)

    return df


def create_rolling_features(df, target_col, windows):
    """Create rolling window features"""
    df = df.copy()

    for window in windows:
        df[f'{target_col}_rolling_mean_{window}'] = df[target_col].rolling(window).mean()
        df[f'{target_col}_rolling_std_{window}'] = df[target_col].rolling(window).std()
        df[f'{target_col}_rolling_min_{window}'] = df[target_col].rolling(window).min()
        df[f'{target_col}_rolling_max_{window}'] = df[target_col].rolling(window).max()

    return df
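The cyclical sin/cos encoding in `create_time_features` matters because raw periodic values put adjacent times far apart. A tiny self-contained check with hypothetical hour values:

```python
import numpy as np

def cyc(val, period):
    """Encode a periodic value as a point on the unit circle."""
    ang = 2 * np.pi * val / period
    return np.array([np.sin(ang), np.cos(ang)])

# Hour 23 and hour 0 are neighbours in time, yet |23 - 0| = 23 in raw units.
# On the circle they are nearly adjacent, while noon stays far away.
d_close = np.linalg.norm(cyc(23, 24) - cyc(0, 24))
d_far = np.linalg.norm(cyc(23, 24) - cyc(12, 24))

print(f"dist(23h, 0h)  = {d_close:.3f}")  # ~0.26
print(f"dist(23h, 12h) = {d_far:.3f}")    # ~1.98
```

Distance-based and linear models both benefit from this: the midnight wrap-around no longer looks like a 23-hour jump.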

python
# Apply feature engineering
df_features = create_time_features(df)
df_features = create_lag_features(df_features, 'value', [1, 7, 14, 30])
df_features = create_rolling_features(df_features, 'value', [7, 14, 30])

Evaluation Metrics


python
from sklearn.metrics import mean_absolute_error, mean_squared_error

def evaluate_forecast(actual, predicted):
    """Calculate forecasting metrics"""

    mae = mean_absolute_error(actual, predicted)
    mse = mean_squared_error(actual, predicted)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs((actual - predicted) / actual)) * 100

    # Symmetric MAPE (handles zeros better)
    smape = np.mean(2 * np.abs(actual - predicted) /
                    (np.abs(actual) + np.abs(predicted))) * 100

    print(f"MAE:   {mae:.4f}")
    print(f"RMSE:  {rmse:.4f}")
    print(f"MAPE:  {mape:.2f}%")
    print(f"sMAPE: {smape:.2f}%")

    return {'mae': mae, 'rmse': rmse, 'mape': mape, 'smape': smape}

metrics = evaluate_forecast(test, forecast_mean)

Common Issues & Solutions


Issue: Non-stationary series
Solutions:
- Apply differencing (1st or 2nd order)
- Log transformation for variance stabilization
- Detrending (subtract moving average)
- Seasonal differencing for seasonal data

Issue: Overfitting on training data
Solutions:
- Use proper cross-validation (TimeSeriesSplit)
- Reduce model complexity
- Regularization
- More training data

Issue: Poor forecast accuracy
Solutions:
- Check for data quality issues
- Try different models
- Include external regressors
- Ensemble multiple models
- Feature engineering
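The TimeSeriesSplit mentioned above keeps every training window strictly before its test window, unlike shuffled k-fold. A minimal sklearn sketch (the 100-point index array and split sizes are illustrative):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

y = np.arange(100)  # stand-in for 100 daily observations

tscv = TimeSeriesSplit(n_splits=4, test_size=10)
for fold, (train_idx, test_idx) in enumerate(tscv.split(y)):
    # The training window always ends before the test window begins
    print(f"fold {fold}: train 0..{train_idx[-1]}, "
          f"test {test_idx[0]}..{test_idx[-1]}")
```

Each successive fold grows the training window forward in time, so no fold ever trains on observations that come after its test period.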

Best Practices


  1. Always visualize your time series first
  2. Check stationarity before modeling
  3. Use appropriate train/test split (no shuffling!)
  4. Validate with time-based CV (TimeSeriesSplit)
  5. Include confidence intervals in forecasts
  6. Monitor model performance over time
  7. Update models as new data arrives
  8. Document seasonality and trends