Advanced Topics#
This tutorial covers advanced features of DeepTimeSeries: probabilistic forecasting, multi-head models, custom models, and related workflows such as multiple time series and advanced preprocessing.
Probabilistic Forecasting#
DeepTimeSeries supports probabilistic forecasting through DistributionHead,
which allows models to predict probability distributions instead of point values.
Why Probabilistic Forecasting?#
Probabilistic forecasting provides:

- Uncertainty quantification: Understand prediction confidence
- Risk assessment: Evaluate worst-case scenarios
- Better decision making: Make informed decisions under uncertainty
Using DistributionHead#
To use probabilistic forecasting, create a DistributionHead with a PyTorch distribution:
import torch
import torch.nn as nn
import torch.distributions as dist
from deep_time_series.core import DistributionHead
from deep_time_series.model import MLP
# Create DistributionHead for Normal distribution
prob_head = DistributionHead(
    tag='targets',
    distribution=dist.Normal,
    in_features=64,   # Output size from model body
    out_features=1,   # Number of target features
)

# Use with MLP model
model = MLP(
    hidden_size=64,
    encoding_length=20,
    decoding_length=10,
    target_names=['target'],
    nontarget_names=[],
    n_hidden_layers=2,
    head=prob_head,  # Use probabilistic head
)
Supported Distributions#
You can use any PyTorch distribution that supports the operations DistributionHead relies on (in particular, a differentiable log_prob for likelihood-based training):
# Normal distribution (mean and std)
dist.Normal
# Laplace distribution (loc and scale)
dist.Laplace
# Student's t-distribution (df, loc, scale)
dist.StudentT
# Log-normal distribution (loc and scale)
dist.LogNormal
# And many more...
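For example, a head for the heavier-tailed Student's t-distribution is built the same way (a sketch; it assumes DistributionHead derives the distribution's parameters, here df, loc, and scale, from the body's output, as it does for Normal):

import torch.distributions as dist
from deep_time_series.core import DistributionHead

# A sketch: a Student's t head is more robust to outliers than a Normal head.
# Assumes DistributionHead handles the df, loc, and scale parameters.
robust_head = DistributionHead(
    tag='targets',
    distribution=dist.StudentT,
    in_features=64,   # Output size from model body
    out_features=1,   # Number of target features
)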
Example: Normal Distribution#
Here’s a complete example using the Normal distribution:
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.distributions as dist
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler
import deep_time_series as dts
from deep_time_series.core import DistributionHead
from deep_time_series.model import MLP
# Prepare data
data = pd.DataFrame({
    'target': np.sin(np.arange(100)) + 0.1 * np.random.randn(100)
})

transformer = dts.ColumnTransformer(
    transformer_tuples=[(StandardScaler(), ['target'])]
)
data = transformer.fit_transform(data)

# Create probabilistic head
prob_head = DistributionHead(
    tag='targets',
    distribution=dist.Normal,
    in_features=64,
    out_features=1,
)

# Create model
model = MLP(
    hidden_size=64,
    encoding_length=20,
    decoding_length=10,
    target_names=['target'],
    nontarget_names=[],
    n_hidden_layers=2,
    head=prob_head,
)

# Train
dataset = dts.TimeSeriesDataset(
    data_frames=data,
    chunk_specs=model.make_chunk_specs()
)
dataloader = DataLoader(dataset, batch_size=32)
trainer = pl.Trainer(max_epochs=10)
trainer.fit(model, train_dataloaders=dataloader)

# Predict
model.eval()
batch = next(iter(dataloader))
with torch.no_grad():
    outputs = model(batch)

# Extract distribution parameters
mean = outputs['head.targets.loc']    # Mean predictions
std = outputs['head.targets.scale']   # Standard deviations

# Sample from the predicted distribution
samples = torch.distributions.Normal(mean, std).sample()
Accessing Distribution Parameters#
When using DistributionHead, the outputs dictionary contains:
- head.{tag}: Sampled values
- head.{tag}.{param}: Distribution parameters (e.g., head.targets.loc, head.targets.scale)
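For a Normal head, for example, loc and scale can be turned into a symmetric prediction interval (a small sketch using only the output keys described above):

# Approximate 95% prediction interval: mean ± 1.96 standard deviations
mean = outputs['head.targets.loc']
std = outputs['head.targets.scale']
lower = mean - 1.96 * std
upper = mean + 1.96 * std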
Multi-Head Models#
Multi-head models allow predicting multiple targets or using different loss functions for different outputs.
Use Cases#
- Multiple targets: Predict several related variables simultaneously
- Different loss functions: Use different losses for different outputs
- Auxiliary tasks: Predict auxiliary information alongside the main target
Creating Multi-Head Models#
import torch.nn as nn
from deep_time_series.core import Head
from deep_time_series.model import MLP
# Create multiple heads
heads = [
    Head(
        tag='target1',
        output_module=nn.Linear(64, 1),
        loss_fn=nn.MSELoss(),
        loss_weight=1.0,
    ),
    Head(
        tag='target2',
        output_module=nn.Linear(64, 1),
        loss_fn=nn.L1Loss(),
        loss_weight=0.5,  # Lower weight for this head
    ),
]

# Create model
model = MLP(
    hidden_size=64,
    encoding_length=20,
    decoding_length=10,
    target_names=['target1', 'target2'],  # Multiple targets
    nontarget_names=[],
    n_hidden_layers=2,
)

# Set multiple heads
model.heads = heads
Chunk Specifications for Multi-Head#
When using multiple heads, you need to define chunk specifications for each target:
import numpy as np

from deep_time_series.chunk import EncodingChunkSpec, LabelChunkSpec

# The model will automatically create specs for all targets
chunk_specs = model.make_chunk_specs()

# Or create them manually
chunk_specs = [
    EncodingChunkSpec(tag='target1', names=['target1'], range_=(0, 20), dtype=np.float32),
    LabelChunkSpec(tag='target1', names=['target1'], range_=(20, 30), dtype=np.float32),
    EncodingChunkSpec(tag='target2', names=['target2'], range_=(0, 20), dtype=np.float32),
    LabelChunkSpec(tag='target2', names=['target2'], range_=(20, 30), dtype=np.float32),
]
Loss Weighting#
Each head can have a different loss weight:
heads = [
    Head(..., loss_weight=1.0),  # Main target
    Head(..., loss_weight=0.5),  # Secondary target
    Head(..., loss_weight=0.1),  # Auxiliary task
]
The total loss is the weighted sum loss = w1*loss1 + w2*loss2 + w3*loss3, where each wi is the corresponding head's loss_weight.
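Conceptually (this is a sketch, not the library's internal code), the combination works like this, with hypothetical per-head loss values:

# Hypothetical per-head losses and the loss weights configured above
losses = {'target1': 0.80, 'target2': 0.30, 'aux': 1.20}
weights = {'target1': 1.0, 'target2': 0.5, 'aux': 0.1}

total_loss = sum(weights[tag] * losses[tag] for tag in losses)
print(total_loss)  # 0.8 + 0.15 + 0.12 = 1.07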
Custom Models#
You can create custom models by inheriting from ForecastingModule.
Basic Custom Model#
Here’s a minimal example:
import numpy as np
import torch
import torch.nn as nn
from deep_time_series.chunk import EncodingChunkSpec, LabelChunkSpec
from deep_time_series.core import ForecastingModule, Head
class SimpleCustomModel(ForecastingModule):
    def __init__(
        self,
        hidden_size,
        encoding_length,
        decoding_length,
        target_names,
        nontarget_names,
    ):
        super().__init__()
        # Store arguments in self.hparams; encode() and
        # make_chunk_specs() below rely on this.
        self.save_hyperparameters()

        self.encoding_length = encoding_length
        self.decoding_length = decoding_length

        n_outputs = len(target_names)
        n_features = len(nontarget_names) + n_outputs

        # Encoder: flattens the encoding window into a context vector
        self.encoder = nn.Sequential(
            nn.Linear(n_features * encoding_length, hidden_size),
            nn.ReLU(),
        )

        # Decoder: updates the hidden context once per decoding step
        self.decoder = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
        )

        # Head: maps the hidden context to the target features
        self.head = Head(
            tag='targets',
            output_module=nn.Linear(hidden_size, n_outputs),
            loss_fn=nn.MSELoss(),
        )

    def encode(self, inputs):
        # Concatenate target and non-target features
        if len(self.hparams.nontarget_names) > 0:
            x = torch.cat([
                inputs['encoding.targets'],
                inputs['encoding.nontargets'],
            ], dim=2)
        else:
            x = inputs['encoding.targets']

        # Flatten the time dimension and encode
        B = x.size(0)
        x_flat = x.view(B, -1)
        encoded = self.encoder(x_flat)
        return {'context': encoded}

    def decode_eval(self, inputs):
        context = inputs['context']
        self.head.reset()
        for _ in range(self.decoding_length):
            # Update the context and emit one step's prediction;
            # the head accumulates the per-step outputs internally.
            context = self.decoder(context)
            self.head(context.unsqueeze(1))
        return self.head.get_outputs()

    def make_chunk_specs(self):
        E = self.encoding_length
        D = self.decoding_length

        specs = [
            EncodingChunkSpec(
                tag='targets',
                names=self.hparams.target_names,
                range_=(0, E),
                dtype=np.float32,
            ),
            LabelChunkSpec(
                tag='targets',
                names=self.hparams.target_names,
                range_=(E, E + D),
                dtype=np.float32,
            ),
        ]

        if len(self.hparams.nontarget_names) > 0:
            # Note: the non-target window is shifted one step ahead
            # of the target window.
            specs.append(
                EncodingChunkSpec(
                    tag='nontargets',
                    names=self.hparams.nontarget_names,
                    range_=(1, E + 1),
                    dtype=np.float32,
                ),
            )

        return specs
Using Custom Models#
Use your custom model just like built-in models:
model = SimpleCustomModel(
    hidden_size=64,
    encoding_length=20,
    decoding_length=10,
    target_names=['target'],
    nontarget_names=['feature'],
)

dataset = dts.TimeSeriesDataset(
    data_frames=data,
    chunk_specs=model.make_chunk_specs()
)
dataloader = DataLoader(dataset, batch_size=32)

trainer = pl.Trainer(max_epochs=10)
trainer.fit(model, train_dataloaders=dataloader)
Custom Optimizers#
All models support custom optimizers:
import torch.optim as optim
model = MLP(
    ...,
    optimizer=optim.SGD,
    optimizer_options={'momentum': 0.9, 'weight_decay': 1e-4},
    lr=0.01,
)
Or override configure_optimizers():
class CustomModel(ForecastingModule):
    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
        }
Custom Metrics#
Add custom metrics to track model performance:
from torchmetrics import MeanAbsoluteError, MeanSquaredError, R2Score
model = MLP(
    ...,
    metrics=[
        MeanAbsoluteError(),
        MeanSquaredError(),
        R2Score(),
    ],
)
Metrics are automatically logged during training and validation.
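The same torchmetrics objects can also be called directly, which is handy for sanity-checking logged values (a small sketch with made-up tensors):

import torch
from torchmetrics import MeanAbsoluteError

mae = MeanAbsoluteError()
preds = torch.tensor([1.0, 2.0, 3.0])
labels = torch.tensor([1.5, 2.0, 2.0])
print(mae(preds, labels))  # tensor(0.5000)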
Working with Multiple Time Series#
DeepTimeSeries supports multiple time series in a single dataset:
# Create multiple time series
series1 = pd.DataFrame({'target': np.sin(np.arange(100))})
series2 = pd.DataFrame({'target': np.cos(np.arange(50))})
series3 = pd.DataFrame({'target': np.random.randn(75)})
# Pass as list
dataset = dts.TimeSeriesDataset(
    data_frames=[series1, series2, series3],
    chunk_specs=model.make_chunk_specs()
)
# Dataset automatically handles different lengths
print(f"Total samples: {len(dataset)}")
The dataset will:

- Handle different lengths automatically
- Create separate chunk extractors for each series
- Combine all samples for training
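The sample count follows from the window arithmetic: assuming the standard sliding-window scheme, a series of length N yields N - (encoding_length + decoding_length) + 1 samples. A quick check for the three series above with a 20-step encoder and 10-step decoder:

# Sliding-window sample counts (an assumption about the windowing scheme)
encoding_length, decoding_length = 20, 10
window = encoding_length + decoding_length  # 30
lengths = [100, 50, 75]                     # series1, series2, series3
samples = [n - window + 1 for n in lengths]
print(samples)       # [71, 21, 46]
print(sum(samples))  # 138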
Advanced Data Preprocessing#
Using Multiple Transformers#
Apply different transformations to different columns:
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
transformer = dts.ColumnTransformer(
    transformer_tuples=[
        (StandardScaler(), ['target']),
        (MinMaxScaler(), ['feature1']),
        (RobustScaler(), ['feature2']),
    ]
)
Using Transformer Dictionary#
You can also use a dictionary:
transformer = dts.ColumnTransformer(
    transformer_dict={
        'target': StandardScaler(),
        'feature1': MinMaxScaler(),
        'feature2': RobustScaler(),
    }
)
Inverse Transformation#
To inverse transform predictions:
# Get transformer for specific columns
target_transformer = transformer.named_transformers_['target']
# Inverse transform predictions
predictions_original_scale = target_transformer.inverse_transform(
    predictions_df.values
)
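For probabilistic outputs, only location-like quantities can be inverse transformed directly. With a StandardScaler, a predicted scale must instead be multiplied by the scaler's scale_ (a sketch; mean_df and std_df are hypothetical DataFrames holding the loc and scale outputs):

# Means were shifted and scaled, so inverse_transform applies directly
mean_original = target_transformer.inverse_transform(mean_df.values)

# Standard deviations are affected only by the scaling, not the shift:
# undo the division by scale_ but do not add the mean back
std_original = std_df.values * target_transformer.scale_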
Best Practices#
- Data Preprocessing: Always normalize/scale your data appropriately
- Chunk Specifications: Ensure chunk ranges don’t overlap incorrectly
- Head Reset: Remember to call head.reset() at the start of decoding
- Batch Size: Adjust the batch size based on sequence length and model size
- Learning Rate: Use learning rate scheduling for better convergence
- Validation: Always use validation data to monitor overfitting (see the split sketch after this list)
- Device Management: PyTorch Lightning handles device placement automatically
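A minimal sketch for the validation point above, assuming a simple chronological 80/20 split of a single preprocessed series:

# Chronological split: earlier part for training, later part for validation
n_train = int(len(data) * 0.8)
train_dataset = dts.TimeSeriesDataset(
    data_frames=data.iloc[:n_train],
    chunk_specs=model.make_chunk_specs(),
)
val_dataset = dts.TimeSeriesDataset(
    data_frames=data.iloc[n_train:],
    chunk_specs=model.make_chunk_specs(),
)

trainer.fit(
    model,
    train_dataloaders=DataLoader(train_dataset, batch_size=32, shuffle=True),
    val_dataloaders=DataLoader(val_dataset, batch_size=32),
)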
Debugging Tips#
- Check Chunk Specifications: Use dataset.plot_chunks() to visualize chunk ranges
- Inspect Outputs: Print the output dictionary keys to verify head tags
- Monitor Loss: Check that the loss decreases as expected
- Shape Debugging: Print tensor shapes at each step
- Gradient Flow: Use torch.autograd.detect_anomaly() to find gradient issues (see the sketch after this list)
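For the last tip, anomaly detection is standard PyTorch and can be wrapped around the training call; it slows training considerably, so enable it only while debugging:

import torch

# Raises an error at the operation whose backward pass produced a NaN,
# together with a traceback of the forward pass that created it
with torch.autograd.detect_anomaly():
    trainer.fit(model, train_dataloaders=dataloader)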
Example: Complete Advanced Workflow#
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.distributions as dist
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler
from torchmetrics import MeanAbsoluteError, MeanSquaredError
import deep_time_series as dts
from deep_time_series.core import DistributionHead
from deep_time_series.model import MLP
from deep_time_series.chunk import ChunkInverter
# 1. Prepare multiple time series
series1 = pd.DataFrame({
    'target': np.sin(np.arange(100)) + 0.1 * np.random.randn(100),
    'feature': np.cos(np.arange(100)),
})
series2 = pd.DataFrame({
    'target': np.cos(np.arange(80)) + 0.1 * np.random.randn(80),
    'feature': np.sin(np.arange(80)),
})

# 2. Preprocess
transformer = dts.ColumnTransformer(
    transformer_tuples=[
        (StandardScaler(), ['target', 'feature'])
    ]
)
series1_transformed = transformer.fit_transform(series1)
series2_transformed = transformer.transform(series2)

# 3. Create probabilistic head with metrics
prob_head = DistributionHead(
    tag='targets',
    distribution=dist.Normal,
    in_features=64,
    out_features=1,
    metrics=[MeanAbsoluteError(), MeanSquaredError()],
)

# 4. Create model
model = MLP(
    hidden_size=64,
    encoding_length=20,
    decoding_length=10,
    target_names=['target'],
    nontarget_names=['feature'],
    n_hidden_layers=2,
    head=prob_head,
    optimizer=torch.optim.AdamW,
    optimizer_options={'weight_decay': 1e-4},
    lr=1e-3,
)

# 5. Create dataset with multiple series
dataset = dts.TimeSeriesDataset(
    data_frames=[series1_transformed, series2_transformed],
    chunk_specs=model.make_chunk_specs()
)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# 6. Train with callbacks
trainer = pl.Trainer(
    max_epochs=50,
    callbacks=[
        pl.callbacks.EarlyStopping(monitor='val/loss', patience=5),
        pl.callbacks.ModelCheckpoint(monitor='val/loss'),
    ],
)
trainer.fit(model, train_dataloaders=dataloader, val_dataloaders=dataloader)

# 7. Predict and extract uncertainty
model.eval()
batch = next(iter(dataloader))
with torch.no_grad():
    outputs = model(batch)

mean = outputs['head.targets.loc']
std = outputs['head.targets.scale']

# 8. Convert predictions to a DataFrame
inverter = ChunkInverter(model.make_chunk_specs())
predictions_df = inverter.invert('head.targets.loc', mean)

print("Predictions with uncertainty:")
print(predictions_df.head())