"""
File: qumphy/models/timesnet.py
Project: 22HLT01 QUMPHY
Contact: oskar.pfeffer@ptb.de
Gitlab: https://gitlab.com/qumphy
Description: Adapted from https://github.com/thuml/Time-Series-Library.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.fft
from qumphy.models.utils.embed import DataEmbedding
from qumphy.models.utils.conv_blocks import Inception_Block_V1
def FFT_for_Period(x, k=2):
    """Find dominant periods in a time-series batch using the FFT.

    The amplitude spectrum is averaged over the batch and channel axes and
    the ``k`` strongest non-DC frequency bins are selected. The zero-frequency
    (DC) bin is excluded from the ranking altogether, so a selected index is
    never 0 and the derived period never involves a division by zero, even
    for constant or all-zero inputs.

    Parameters
    ----------
    x : torch.Tensor
        Input tensor of shape (batch_size, sequence_length, channels).
    k : int
        Number of dominant frequencies to select. Must not exceed the number
        of non-DC bins (``sequence_length // 2``); otherwise ``torch.topk``
        raises.

    Returns
    -------
    tuple
        ``(period, weight)`` where ``period`` is a NumPy integer array of
        length ``k`` holding ``sequence_length // frequency_index`` for each
        selected bin, and ``weight`` is a tensor of shape (batch_size, k)
        with the channel-averaged amplitude of each selected bin per sample.
    """
    # Amplitude spectrum along the time axis; computed once and reused below.
    amplitude = torch.fft.rfft(x, dim=1).abs()
    # One score per frequency bin: average over batch, then over channels.
    frequency_list = amplitude.mean(0).mean(-1)
    # Rank bins 1.. only (skip DC), then shift indices back to the full
    # spectrum. This replaces zeroing bin 0 in place, which still allowed
    # bin 0 to be picked when the other amplitudes were also zero.
    _, top_list = torch.topk(frequency_list[1:], k)
    top_list = (top_list + 1).detach().cpu().numpy()
    period = x.shape[1] // top_list
    return period, amplitude.mean(-1)[:, top_list]
class TimesBlock(nn.Module):
    """TimesNet block for multi-period temporal feature extraction.

    For each of the ``top_k`` periods returned by ``FFT_for_Period`` the
    input sequence is zero-padded to a multiple of the period, folded into a
    2D (cycles x period) layout, passed through a shared stack of two
    Inception-style 2D convolution blocks, and unfolded again. The per-period
    results are combined with softmax-normalized spectral amplitudes as
    weights, and the block input is added back as a residual.
    """

    def __init__(self, seq_len, pred_len, top_k, d_model, d_ff, num_kernels):
        """Initialize the TimesBlock.

        Parameters
        ----------
        seq_len : int
            Length of the input sequence. Stored for reference; ``forward``
            derives the working length from its input tensor.
        pred_len : int
            Length of the prediction sequence. Stored for reference only.
        top_k : int
            Number of dominant periods selected by the FFT.
        d_model : int
            Model hidden dimension.
        d_ff : int
            Hidden dimension used inside the convolutional block.
        num_kernels : int
            Number of kernels used in the Inception-style convolution block.
        """
        super().__init__()
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.k = top_k
        # Parameter-efficient design: one conv stack shared by all periods.
        self.conv = nn.Sequential(
            Inception_Block_V1(d_model, d_ff, num_kernels=num_kernels),
            nn.GELU(),
            Inception_Block_V1(d_ff, d_model, num_kernels=num_kernels),
        )

    def forward(self, x):
        """Apply multi-period 2D convolution to a batch of sequences.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, length, d_model).

        Returns
        -------
        torch.Tensor
            Tensor with the same shape as ``x``.
        """
        B, T, N = x.size()
        period_list, period_weight = FFT_for_Period(x, self.k)

        res = []
        for i in range(self.k):
            period = int(period_list[i])
            # Round the actual sequence length up to a multiple of the period.
            # Using T (not seq_len + pred_len) keeps padding and reshape
            # consistent with the tensor that was really passed in.
            length = -(-T // period) * period
            if length != T:
                # new_zeros inherits device and dtype from x.
                padding = x.new_zeros(B, length - T, N)
                out = torch.cat([x, padding], dim=1)
            else:
                out = x
            # Fold 1D sequence into 2D: (B, N, cycles, period).
            out = (
                out.reshape(B, length // period, period, N)
                .permute(0, 3, 1, 2)
                .contiguous()
            )
            # 2D conv: from 1D variation to 2D variation.
            out = self.conv(out)
            # Unfold back to (B, length, N) and drop the padded tail.
            out = out.permute(0, 2, 3, 1).reshape(B, -1, N)
            res.append(out[:, :T, :])
        res = torch.stack(res, dim=-1)  # (B, T, N, k)

        # Adaptive aggregation: (B, k) weights broadcast as (B, 1, 1, k).
        period_weight = F.softmax(period_weight, dim=1)
        period_weight = period_weight.unsqueeze(1).unsqueeze(1)
        res = torch.sum(res * period_weight, -1)

        # Residual connection.
        return res + x
class TimesNet(nn.Module):
    """
    TimesNet model for time series forecasting, imputation, anomaly
    detection and classification.

    Implements the TimesNet architecture as described in:
    https://openreview.net/pdf?id=ju_Uqw384Oq

    Parameters
    ----------
    num_class : int
        Number of classes; used only when ``task_name`` is 'classification'.
    seq_len : int
        Length of the input sequence.
    label_len : int, optional
        Length of the label/start token sequence. Stored on the model but not
        used by any method of this class. Default is 1.
    pred_len : int, optional
        Length of the prediction horizon. Only the forecasting tasks extend
        the sequence by this amount. Default is 1.
    e_layers : int, optional
        Number of encoder layers (TimesBlocks). Default is 2.
    d_model : int, optional
        Model hidden dimension. Default is 16.
    d_ff : int, optional
        Hidden dimension inside each TimesBlock. Default is 32.
    num_kernels : int, optional
        Number of kernels for the Inception-like blocks. Default is 6.
    top_k : int, optional
        Top-k selection parameter for TimesBlock. Default is 5.
    enc_in : int, optional
        Number of input channels/features. Default is 1.
    c_out : int, optional
        Number of output channels/features for the forecasting, imputation
        and anomaly-detection heads. Default is 1.
    embed : str, optional
        Embedding type forwarded to ``DataEmbedding``; see that class for the
        accepted values. Default is 'fixed'.
    freq : str, optional
        Frequency string forwarded to ``DataEmbedding``. Default is 's'.
    dropout : float, optional
        Dropout rate. Default is 0.1.
    task_name : str, optional
        One of 'long_term_forecast', 'short_term_forecast', 'imputation',
        'anomaly_detection' or 'classification'. For any other value no
        output head is created and ``forward`` returns ``None``.
        Default is 'classification'.

    References
    ----------
    .. [1] https://openreview.net/pdf?id=ju_Uqw384Oq
    """

    # Task names that extend the sequence by ``pred_len`` before encoding.
    _FORECAST_TASKS = ("long_term_forecast", "short_term_forecast")

    def __init__(
        self,
        num_class,
        seq_len,
        label_len=1,
        pred_len=1,
        e_layers=2,
        d_model=16,
        d_ff=32,
        num_kernels=6,
        top_k=5,
        enc_in=1,
        c_out=1,
        embed="fixed",
        freq="s",
        dropout=0.1,
        task_name="classification",
    ):
        super().__init__()
        self.task_name = task_name
        self.seq_len = seq_len
        self.label_len = label_len
        self.pred_len = pred_len

        is_forecast = task_name in self._FORECAST_TASKS
        # Only the forecasting path lengthens the sequence (predict_linear);
        # every other task feeds sequences of length seq_len to the blocks,
        # so the blocks must not be told about a prediction horizon.
        block_pred_len = pred_len if is_forecast else 0

        self.model = nn.ModuleList(
            [
                TimesBlock(seq_len, block_pred_len, top_k, d_model, d_ff, num_kernels)
                for _ in range(e_layers)
            ]
        )
        self.enc_embedding = DataEmbedding(
            enc_in,
            d_model,
            embed,
            freq,
            dropout,
        )
        self.layer = e_layers
        self.layer_norm = nn.LayerNorm(d_model)

        if is_forecast:
            # Maps the time axis from seq_len to seq_len + pred_len.
            self.predict_linear = nn.Linear(self.seq_len, self.pred_len + self.seq_len)
            self.projection = nn.Linear(d_model, c_out, bias=True)
        elif task_name in ("imputation", "anomaly_detection"):
            self.projection = nn.Linear(d_model, c_out, bias=True)
        elif task_name == "classification":
            self.act = F.gelu
            self.dropout = nn.Dropout(dropout)
            self.projection = nn.Linear(d_model * seq_len, num_class)

    def _run_blocks(self, enc_out):
        """Pass ``enc_out`` through every TimesBlock, layer-normalizing each output."""
        for i in range(self.layer):
            enc_out = self.layer_norm(self.model[i](enc_out))
        return enc_out

    @staticmethod
    def _denormalize(dec_out, means, stdev):
        """Undo per-series normalization (from Non-stationary Transformer).

        ``means`` and ``stdev`` have shape (batch_size, 1, channels) and are
        broadcast over the time axis, so the result is correct for whatever
        length ``dec_out`` actually has.
        """
        return dec_out.mul(stdev).add(means)

    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        """Run forecasting with the TimesNet model.

        Parameters
        ----------
        x_enc : torch.Tensor
            Encoder input tensor of shape (batch_size, seq_len, enc_in).
        x_mark_enc : torch.Tensor
            Encoder time feature tensor.
        x_dec : torch.Tensor
            Decoder input tensor. Unused; kept for compatibility.
        x_mark_dec : torch.Tensor
            Decoder time feature tensor. Unused; kept for compatibility.

        Returns
        -------
        torch.Tensor
            Forecast output tensor of shape
            (batch_size, seq_len + pred_len, c_out).
        """
        # Per-series normalization over the time axis.
        means = x_enc.mean(1, keepdim=True).detach()
        x_enc = x_enc.sub(means)
        stdev = torch.sqrt(torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        x_enc = x_enc.div(stdev)

        # Embedding.
        enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B,T,C]
        # Align temporal dimension: seq_len -> seq_len + pred_len.
        enc_out = self.predict_linear(enc_out.permute(0, 2, 1)).permute(0, 2, 1)

        # TimesNet.
        enc_out = self._run_blocks(enc_out)
        # Project back.
        dec_out = self.projection(enc_out)
        return self._denormalize(dec_out, means, stdev)

    def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
        """Run imputation with the TimesNet model.

        Parameters
        ----------
        x_enc : torch.Tensor
            Encoder input tensor of shape (batch_size, seq_len, enc_in).
        x_mark_enc : torch.Tensor
            Encoder time feature tensor.
        x_dec : torch.Tensor
            Decoder input tensor. Unused; kept for compatibility.
        x_mark_dec : torch.Tensor
            Decoder time feature tensor. Unused; kept for compatibility.
        mask : torch.Tensor
            Mask tensor with the same shape as ``x_enc``; entries equal to 1
            count as observed, entries equal to 0 are zeroed out after
            centering.

        Returns
        -------
        torch.Tensor
            Imputed output tensor.
        """
        # Statistics are normalized by the number of observed entries only.
        observed = torch.sum(mask == 1, dim=1)
        means = (torch.sum(x_enc, dim=1) / observed).unsqueeze(1).detach()
        x_enc = x_enc.sub(means)
        x_enc = x_enc.masked_fill(mask == 0, 0)
        stdev = torch.sqrt(torch.sum(x_enc * x_enc, dim=1) / observed + 1e-5)
        stdev = stdev.unsqueeze(1).detach()
        x_enc = x_enc.div(stdev)

        # Embedding.
        enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B,T,C]
        # TimesNet.
        enc_out = self._run_blocks(enc_out)
        # Project back.
        dec_out = self.projection(enc_out)
        return self._denormalize(dec_out, means, stdev)

    def anomaly_detection(self, x_enc):
        """Run anomaly detection with the TimesNet model.

        Parameters
        ----------
        x_enc : torch.Tensor
            Encoder input tensor of shape (batch_size, seq_len, enc_in).

        Returns
        -------
        torch.Tensor
            Reconstructed output tensor.
        """
        # Per-series normalization over the time axis.
        means = x_enc.mean(1, keepdim=True).detach()
        x_enc = x_enc.sub(means)
        stdev = torch.sqrt(torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        x_enc = x_enc.div(stdev)

        # Embedding (no time features are passed).
        enc_out = self.enc_embedding(x_enc, None)  # [B,T,C]
        # TimesNet.
        enc_out = self._run_blocks(enc_out)
        # Project back.
        dec_out = self.projection(enc_out)
        return self._denormalize(dec_out, means, stdev)

    def classification(self, x_enc, x_mark_enc):
        """Run classification with the TimesNet model.

        Parameters
        ----------
        x_enc : torch.Tensor
            Input tensor of shape (batch_size, enc_in, seq_len).
        x_mark_enc : torch.Tensor
            Encoder time feature tensor. Unused; kept for compatibility.

        Returns
        -------
        torch.Tensor
            Classification logits of shape (batch_size, num_class).
        """
        # Channels-first input -> (batch_size, seq_len, enc_in).
        x_enc = x_enc.permute(0, 2, 1)
        # Embedding (no time features are passed).
        enc_out = self.enc_embedding(x_enc, None)  # [B,T,C]
        # TimesNet.
        enc_out = self._run_blocks(enc_out)

        # The encoder output carries no final non-linearity, so add one here.
        output = self.act(enc_out)
        output = self.dropout(output)
        # Flatten to (batch_size, seq_length * d_model).
        output = output.reshape(output.shape[0], -1)
        return self.projection(output)  # (batch_size, num_classes)

    def forward(self, x_enc, x_mark_enc=None, x_dec=None, x_mark_dec=None, mask=None):
        """Dispatch to the method matching ``task_name``.

        Parameters
        ----------
        x_enc : torch.Tensor
            Input tensor; see the task-specific method for its layout.
        x_mark_enc : torch.Tensor, optional
            Encoder time feature tensor.
        x_dec : torch.Tensor, optional
            Decoder input tensor. Unused by every task.
        x_mark_dec : torch.Tensor, optional
            Decoder time feature tensor. Unused by every task.
        mask : torch.Tensor, optional
            Observation mask; required for the 'imputation' task.

        Returns
        -------
        torch.Tensor or None
            The task output, or ``None`` when ``task_name`` is not one of
            the supported values.
        """
        if self.task_name in self._FORECAST_TASKS:
            dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
            # Keep only the last pred_len steps.
            return dec_out[:, -self.pred_len :, :]  # [B, L, D]
        if self.task_name == "imputation":
            return self.imputation(x_enc, x_mark_enc, x_dec, x_mark_dec, mask)  # [B, L, D]
        if self.task_name == "anomaly_detection":
            return self.anomaly_detection(x_enc)  # [B, L, D]
        if self.task_name == "classification":
            return self.classification(x_enc, x_mark_enc)  # [B, N]
        return None