"""
File: qumphy/models/minrocket.py
Project: 22HLT01 QUMPHY
Contact: oskar.pfeffer@ptb.de
Gitlab: https://gitlab.com/qumphy
Description: PyTorch implementation of MiniRocket (Dempster et al., 2020, arXiv:2012.08791).
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import OrderedDict
class MiniRocketFeatures(nn.Module):
    """This is a Pytorch implementation of MiniRocket developed by Malcolm McLean and Ignacio Oguiza

    This module extracts MiniRocket features from time-series data using fixed
    convolutional kernels, multiple dilations, and proportion of positive values
    (PPV) features.

    The bias thresholds used for the PPV features are not known at
    construction time: they are computed from the first batch passed to
    ``forward`` (or from every batch while ``fitting`` is True) and stored in
    the ``biases_{i}`` buffers. The ``prefit`` buffer records whether this has
    happened.

    MiniRocket paper citation:
    @article{dempster_etal_2020,
      author  = {Dempster, Angus and Schmidt, Daniel F and Webb, Geoffrey I},
      title   = {{MINIROCKET}: A Very Fast (Almost) Deterministic Transform for Time Series Classification},
      year    = {2020},
      journal = {arXiv:2012.08791}
    }
    Original paper: https://arxiv.org/abs/2012.08791
    Original code:  https://github.com/angus924/minirocket
    """

    # kernel_size: length of every convolution kernel.
    # num_kernels: number of fixed kernels (the 3-element subsets of 9 positions).
    # fitting: when True, ``forward`` only (re)computes biases and returns None.
    kernel_size, num_kernels, fitting = 9, 84, False

    def __init__(
        self,
        c_in,
        seq_len,
        num_features=10_000,
        max_dilations_per_kernel=32,
        random_state=None,
    ):
        """Initialize the MiniRocket feature extractor.

        Parameters
        ----------
        c_in : int
            Number of input channels.
        seq_len : int
            Length of the input time series.
        num_features : int
            Requested number of MiniRocket features. It is rounded down to a
            multiple of ``num_kernels``; the effective value is stored in
            ``self.num_features``.
        max_dilations_per_kernel : int
            Maximum number of dilations used for each kernel.
        random_state : int
            Seed used for random channel combinations and bias selection.

        Raises
        ------
        ValueError
            If ``seq_len`` is shorter than the kernel, ``num_features`` is
            smaller than the number of kernels, or
            ``max_dilations_per_kernel`` is smaller than 1.
        """
        super().__init__()

        # Fail early with a clear message instead of a dilation of 0 (which
        # conv1d rejects at forward time) or a ZeroDivisionError further down.
        if seq_len < self.kernel_size:
            raise ValueError(
                f"seq_len must be at least {self.kernel_size}, got {seq_len}"
            )
        if num_features < self.num_kernels:
            raise ValueError(
                f"num_features must be at least {self.num_kernels}, got {num_features}"
            )
        if max_dilations_per_kernel < 1:
            raise ValueError(
                "max_dilations_per_kernel must be at least 1, "
                f"got {max_dilations_per_kernel}"
            )

        self.c_in, self.seq_len = c_in, seq_len
        self.num_features = num_features // self.num_kernels * self.num_kernels
        self.max_dilations_per_kernel = max_dilations_per_kernel
        self.random_state = random_state

        # Convolution: every kernel is -1 everywhere except at three positions
        # (one kernel per 3-element combination of positions) where it is 2.
        indices = torch.combinations(torch.arange(self.kernel_size), 3).unsqueeze(1)
        kernels = (-torch.ones(self.num_kernels, 1, self.kernel_size)).scatter_(
            2, indices, 2
        )
        # The kernel bank is repeated per channel for the grouped convolution.
        self.kernels = nn.Parameter(kernels.repeat(c_in, 1, 1), requires_grad=False)

        # Dilations & padding
        self._set_dilations(seq_len)

        # Channel combinations (multivariate)
        if c_in > 1:
            self._set_channel_combinations(c_in)

        # Bias: placeholders that are overwritten on the first forward pass.
        for i in range(self.num_dilations):
            self.register_buffer(
                f"biases_{i}",
                torch.empty(
                    (self.num_kernels, int(self.num_features_per_dilation[i]))
                ),
            )
        self.register_buffer("prefit", torch.tensor([False]))

    def forward(self, x):
        """Extract MiniRocket features from an input tensor.

        If the biases have not been fitted yet (``prefit`` is False) they are
        computed from ``x`` and stored before the features are calculated.
        While ``self.fitting`` is True the call only refreshes the biases.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, c_in, seq_len).

        Returns
        -------
        torch.Tensor or None
            Extracted MiniRocket features of shape (batch_size, num_features),
            or None when ``self.fitting`` is True.
        """
        # ``prefit`` only flips at the very last dilation, so the decision is
        # the same for every iteration and can be taken once.
        needs_fit = self.fitting or not bool(self.prefit)
        last_dilation = self.num_dilations - 1

        features = []
        for i, (dilation, padding) in enumerate(zip(self.dilations, self.padding)):
            dilation, padding = int(dilation), int(padding)
            # Alternates per dilation which half of the kernels (even or odd
            # index) uses the full output and which uses the interior only.
            parity = i % 2

            # Convolution
            C = F.conv1d(
                x, self.kernels, padding=padding, dilation=dilation, groups=self.c_in
            )
            if self.c_in > 1:  # multivariate
                C = C.reshape(x.shape[0], self.c_in, self.num_kernels, -1)
                channel_combination = getattr(self, f"channel_combinations_{i}")
                # Zero out the channels not selected for a kernel, then sum.
                C = torch.mul(C, channel_combination).sum(1)

            # Bias
            if needs_fit:
                bias_this_dilation = self._get_bias(
                    C, int(self.num_features_per_dilation[i])
                )
                setattr(self, f"biases_{i}", bias_this_dilation)
                if self.fitting:
                    if i < last_dilation:
                        continue
                    # In-place update keeps the buffer on the module's device.
                    self.prefit.fill_(True)
                    return None
                if i == last_dilation:
                    self.prefit.fill_(True)
            else:
                bias_this_dilation = getattr(self, f"biases_{i}")

            # Features: one half of the kernels over the whole output ...
            features.append(
                self._get_PPVs(C[:, parity::2], bias_this_dilation[parity::2])
            )
            # ... the other half with ``padding`` positions cut from each end.
            features.append(
                self._get_PPVs(
                    C[:, 1 - parity :: 2, padding:-padding],
                    bias_this_dilation[1 - parity :: 2],
                )
            )
        return torch.cat(features, dim=1)

    def _get_PPVs(self, C, bias):
        """Calculate proportion of positive values features.

        Parameters
        ----------
        C : torch.Tensor
            Convolution output tensor, indexed as (batch, kernel, position).
        bias : torch.Tensor
            Bias values used as thresholds, indexed as (kernel, feature).

        Returns
        -------
        torch.Tensor
            For every kernel and bias, the fraction of positions at which the
            convolution output exceeds the bias, flattened to
            (batch, kernel * feature).
        """
        C = C.unsqueeze(-1)
        # Broadcast (kernel, feature) against (batch, kernel, position, 1).
        bias = bias[None, :, None, :]
        return (C > bias).float().mean(2).flatten(1)

    def _set_dilations(self, input_length):
        """Set dilations and padding values for MiniRocket kernels.

        Parameters
        ----------
        input_length : int
            Length of the input time series.

        Returns
        -------
        None
            The function stores ``dilations``, ``num_dilations``, ``padding``
            and ``num_features_per_dilation`` as attributes.
        """
        num_features_per_kernel = self.num_features // self.num_kernels
        true_max_dilations_per_kernel = min(
            num_features_per_kernel, self.max_dilations_per_kernel
        )
        multiplier = num_features_per_kernel / true_max_dilations_per_kernel

        # Largest exponent such that a dilated kernel still spans the input.
        max_exponent = np.log2((input_length - 1) / (self.kernel_size - 1))
        dilations, num_features_per_dilation = np.unique(
            np.logspace(0, max_exponent, true_max_dilations_per_kernel, base=2).astype(
                np.int32
            ),
            return_counts=True,
        )
        num_features_per_dilation = (num_features_per_dilation * multiplier).astype(
            np.int32
        )

        # Hand out the features lost to integer truncation one at a time.
        remainder = num_features_per_kernel - num_features_per_dilation.sum()
        i = 0
        while remainder > 0:
            num_features_per_dilation[i] += 1
            remainder -= 1
            i = (i + 1) % len(num_features_per_dilation)

        self.num_features_per_dilation = num_features_per_dilation
        self.num_dilations = len(dilations)
        self.dilations = dilations
        self.padding = [
            ((self.kernel_size - 1) * dilation) // 2 for dilation in dilations
        ]

    def _set_channel_combinations(self, num_channels):
        """Set random channel combinations for multivariate input.

        Parameters
        ----------
        num_channels : int
            Number of input channels.

        Returns
        -------
        None
            The function registers one ``channel_combinations_{i}`` buffer per
            dilation.
        """
        num_combinations = self.num_kernels * self.num_dilations
        max_num_channels = min(num_channels, 9)
        max_exponent_channels = np.log2(max_num_channels + 1)

        # A private generator yields the same stream as seeding the global one
        # but leaves the caller's NumPy random state untouched.
        rng = np.random.RandomState(self.random_state)
        num_channels_per_combination = (
            2 ** rng.uniform(0, max_exponent_channels, num_combinations)
        ).astype(np.int32)

        channel_combinations = torch.zeros((1, num_channels, num_combinations, 1))
        for i in range(num_combinations):
            chosen = rng.choice(
                num_channels, int(num_channels_per_combination[i]), False
            )
            channel_combinations[
                0, torch.as_tensor(chosen, dtype=torch.long), i, 0
            ] = 1

        channel_combinations = torch.split(
            channel_combinations, self.num_kernels, 2
        )  # split by dilation
        for i, channel_combination in enumerate(channel_combinations):
            self.register_buffer(
                f"channel_combinations_{i}", channel_combination
            )  # per dilation

    def get_quantiles(self, num_quantiles):
        """Calculate quantile values using the golden ratio.

        Parameters
        ----------
        num_quantiles : int
            Number of quantile values to calculate.

        Returns
        -------
        list
            The fractional parts of ``i * golden_ratio`` for
            ``i = 1 .. num_quantiles``.
        """
        golden_ratio = (np.sqrt(5) + 1) / 2
        return [(i * golden_ratio) % 1 for i in range(1, num_quantiles + 1)]

    def _get_bias(self, input_matrix, num_features_per_dilation):
        """
        Calculate biases for a given input matrix and number of features per dilation.

        For every kernel one sample of the batch is drawn at random (with
        replacement) and the quantiles of that kernel's convolution output on
        that sample are used as biases.

        Parameters
        ----------
        input_matrix : torch.Tensor
            Convolution output, indexed as (batch, kernel, position).
        num_features_per_dilation : int
            Number of features per dilation.

        Returns
        -------
        torch.Tensor
            Bias values of shape (num_kernels, num_features_per_dilation) used
            as thresholds for MiniRocket features.
        """
        # Private generator: same draws as the seeded global RNG, no side effect.
        rng = np.random.RandomState(self.random_state)
        random_indices = torch.as_tensor(
            rng.choice(input_matrix.shape[0], self.num_kernels),
            dtype=torch.long,
            device=input_matrix.device,
        )
        # Pair kernel k with its k-th drawn sample: result is (kernel, position).
        selected_samples = input_matrix[random_indices].diagonal().T
        quantiles = torch.tensor(
            self.get_quantiles(num_features_per_dilation),
            device=input_matrix.device,
            dtype=input_matrix.dtype,
        )
        biases = torch.quantile(
            selected_samples.to(torch.float),
            quantiles.to(torch.float),
            dim=1,
        ).T
        return biases
def get_minirocket_features(o, model, chunksize=1024, use_cuda=None, to_np=True):
    """Extract MiniRocket features from a large dataset in chunks.

    The model is moved to the selected device, the input is split along its
    first dimension into pieces of at most ``chunksize`` samples, each piece
    is moved to that device and passed through ``model``, and the outputs are
    concatenated. A trailing singleton dimension is appended to the result.

    Parameters
    ----------
    o : np.ndarray or torch.Tensor
        Input dataset; the first dimension indexes the samples.
    model : nn.Module
        MiniRocket feature extraction model.
    chunksize : int
        Number of samples processed in each chunk.
    use_cuda : bool
        If True, use CUDA. If False, use CPU. If None, CUDA is used when
        available.
    to_np : bool
        If True, return the features as a NumPy array. If False, return them
        as a torch.Tensor.

    Returns
    -------
    np.ndarray or torch.Tensor
        Extracted MiniRocket features with an extra trailing dimension of
        size 1.
    """
    use = torch.cuda.is_available() if use_cuda is None else use_cuda
    device = torch.device(torch.cuda.current_device()) if use else torch.device("cpu")
    model = model.to(device)

    if isinstance(o, np.ndarray):
        o = torch.from_numpy(o)

    # Move one chunk at a time: this also covers tensor inputs that live on a
    # different device than the model, and avoids holding the whole dataset
    # on the device at once.
    chunk_features = [model(chunk.to(device)) for chunk in torch.split(o, chunksize)]
    features = torch.cat(chunk_features).unsqueeze(-1)

    if to_np:
        # detach() is needed because .numpy() refuses tensors that require grad.
        return features.detach().cpu().numpy()
    return features
class MiniRocketHead(nn.Sequential):
    """Prediction head applied to pre-computed MiniRocket features.

    The module is a two-entry ``nn.Sequential``: an empty ``backbone``
    followed by ``head`` (flatten, optional batch norm, optional dropout and
    a zero-initialised linear layer).
    """

    def __init__(self, c_in, c_out, bn=True, fc_dropout=0.0):
        """Initialize the MiniRocket prediction head.

        Parameters
        ----------
        c_in : int
            Number of input features.
        c_out : int
            Number of output classes or output values.
        bn : bool
            If True, apply batch normalization before the linear layer.
        fc_dropout : float
            Dropout probability applied before the linear layer. No dropout
            layer is added when this is 0.
        """
        # Start from all-zero weights and bias in the output layer.
        output_layer = nn.Linear(c_in, c_out)
        nn.init.zeros_(output_layer.weight)
        nn.init.zeros_(output_layer.bias)

        stages = [nn.Flatten()]
        if bn:
            stages.append(nn.BatchNorm1d(c_in))
        if fc_dropout:
            stages.append(nn.Dropout(fc_dropout))
        stages.append(output_layer)

        named_modules = OrderedDict()
        named_modules["backbone"] = nn.Sequential()
        named_modules["head"] = nn.Sequential(*stages)
        super().__init__(named_modules)
class MiniRocket(nn.Module):
    """MiniRocket model with feature extractor and prediction head.

    The model chains three sub-modules: ``backbone`` (a
    ``MiniRocketFeatures`` instance), ``head`` (flatten, optional batch norm,
    optional dropout and a zero-initialised linear layer) and
    ``output_activation``.
    """

    def __init__(
        self,
        c_in,
        c_out,
        seq_len,
        output_activation=None,
        num_features=10_000,
        max_dilations_per_kernel=32,
        random_state=None,
        bn=True,
        fc_dropout=0,
    ):
        """Initializes the MiniRocket torch Module

        Parameters
        ----------
        c_in : int
            number of input channels
        c_out : int
            output size
        seq_len : int
            length of the input time series
        output_activation : nn.Module, optional
            module applied to the output of the head, by default None, in
            which case a new ``nn.Identity()`` is created for this model
        num_features : int, optional
            number of features to extract, by default 10_000
        max_dilations_per_kernel : int, optional
            maximum number of dilations per kernel, by default 32
        random_state : int, optional
            Seed of the random number generator, by default None
        bn : bool, optional
            use batch normalization, by default True
        fc_dropout : float, optional
            dropout probability applied before the linear layer, by default 0
            (no dropout layer is added)
        """
        super().__init__()

        # Backbone
        backbone = MiniRocketFeatures(
            c_in,
            seq_len,
            num_features=num_features,
            max_dilations_per_kernel=max_dilations_per_kernel,
            random_state=random_state,
        )
        # Use the feature count as stored by the backbone, not the requested one.
        num_features = backbone.num_features

        # Head
        self.head_nf = num_features
        layers = [nn.Flatten()]
        if bn:
            layers += [nn.BatchNorm1d(num_features)]
        if fc_dropout:
            layers += [nn.Dropout(fc_dropout)]
        linear = nn.Linear(num_features, c_out)
        nn.init.constant_(linear.weight.data, 0)
        nn.init.constant_(linear.bias.data, 0)
        layers += [linear]
        head = nn.Sequential(*layers)

        self.backbone = backbone
        self.head = head
        # Create the default per instance: a module in the signature would be
        # built once and shared by every model constructed with defaults.
        self.output_activation = (
            nn.Identity() if output_activation is None else output_activation
        )

    def forward(self, x):
        """Run a forward pass through the MiniRocket model.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, c_in, seq_len).

        Returns
        -------
        torch.Tensor
            Model output after feature extraction, prediction head, and output
            activation.
        """
        x = self.backbone(x)
        x = self.head(x)
        x = self.output_activation(x)
        return x