Source code for qumphy.models.minirocket

"""
File: qumphy/models/minirocket.py
Project: 22HLT01 QUMPHY
Contact: oskar.pfeffer@ptb.de
Gitlab: https://gitlab.com/qumphy
Description: MiniRocKeT implementation as from [...].
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import OrderedDict


class MiniRocketFeatures(nn.Module):
    """PyTorch implementation of the MiniRocket feature transform.

    This is a Pytorch implementation of MiniRocket developed by Malcolm
    McLean and Ignacio Oguiza.

    The module extracts MiniRocket features from time-series data using
    fixed convolutional kernels, multiple dilations, and proportion of
    positive values (PPV) features. The bias thresholds are estimated from
    the first batch passed through :meth:`forward` and stored in buffers;
    later calls reuse them.

    MiniRocket paper citation::

        @article{dempster_etal_2020,
          author  = {Dempster, Angus and Schmidt, Daniel F and Webb, Geoffrey I},
          title   = {{MINIROCKET}: A Very Fast (Almost) Deterministic Transform
                     for Time Series Classification},
          year    = {2020},
          journal = {arXiv:2012.08791}
        }

    Original paper: https://arxiv.org/abs/2012.08791
    Original code:  https://github.com/angus924/minirocket
    """

    kernel_size, num_kernels, fitting = 9, 84, False

    def __init__(
        self,
        c_in,
        seq_len,
        num_features=10_000,
        max_dilations_per_kernel=32,
        random_state=None,
    ):
        """Initialize the MiniRocket feature extractor.

        Parameters
        ----------
        c_in : int
            Number of input channels.
        seq_len : int
            Length of the input time series.
        num_features : int
            Number of MiniRocket features to extract. It is rounded down to
            a multiple of ``num_kernels`` (84).
        max_dilations_per_kernel : int
            Maximum number of dilations used for each kernel.
        random_state : int
            Seed used for random channel combinations and bias selection.

        Raises
        ------
        ValueError
            If ``seq_len`` is shorter than the kernel size. Such inputs would
            otherwise produce a zero dilation or empty (NaN) features.
        """
        super().__init__()
        if seq_len < self.kernel_size:
            raise ValueError(
                f"seq_len must be at least {self.kernel_size}, got {seq_len}"
            )
        self.c_in, self.seq_len = c_in, seq_len
        self.num_features = num_features // self.num_kernels * self.num_kernels
        self.max_dilations_per_kernel = max_dilations_per_kernel
        self.random_state = random_state

        # Convolution: one kernel per 3-combination of the 9 positions,
        # weight 2 at the chosen positions and -1 elsewhere.
        indices = torch.combinations(torch.arange(self.kernel_size), 3).unsqueeze(1)
        kernels = (-torch.ones(self.num_kernels, 1, self.kernel_size)).scatter_(
            2, indices, 2
        )
        self.kernels = nn.Parameter(kernels.repeat(c_in, 1, 1), requires_grad=False)

        # Dilations & padding
        self._set_dilations(seq_len)

        # Channel combinations (multivariate)
        if c_in > 1:
            self._set_channel_combinations(c_in)

        # Bias: placeholders, filled on the first forward pass.
        for i in range(self.num_dilations):
            self.register_buffer(
                f"biases_{i}",
                torch.empty((self.num_kernels, self.num_features_per_dilation[i])),
            )
        self.register_buffer("prefit", torch.BoolTensor([False]))

    def forward(self, x):
        """Extract MiniRocket features from an input tensor.

        On the first call (``prefit`` is False) the biases are estimated
        from ``x`` and stored. If the ``fitting`` attribute is true, only
        the biases are estimated and ``None`` is returned.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, c_in, seq_len).

        Returns
        -------
        torch.Tensor
            Extracted MiniRocket features of shape (batch_size, num_features).
        """
        _features = []
        last_dilation = self.num_dilations - 1
        for i, (dilation, padding) in enumerate(zip(self.dilations, self.padding)):
            # Plain Python ints: the stored values are NumPy integers.
            dilation, padding = int(dilation), int(padding)
            _padding1 = i % 2

            # Convolution
            C = F.conv1d(
                x, self.kernels, padding=padding, dilation=dilation, groups=self.c_in
            )
            if self.c_in > 1:  # multivariate
                C = C.reshape(x.shape[0], self.c_in, self.num_kernels, -1)
                channel_combination = getattr(self, f"channel_combinations_{i}")
                C = torch.mul(C, channel_combination)
                C = C.sum(1)

            # Bias
            if not self.prefit or self.fitting:
                num_features_this_dilation = self.num_features_per_dilation[i]
                bias_this_dilation = self._get_bias(C, num_features_this_dilation)
                setattr(self, f"biases_{i}", bias_this_dilation)
                if self.fitting:
                    if i < last_dilation:
                        continue
                    # In-place update keeps the buffer on its current device.
                    self.prefit.fill_(True)
                    return
                elif i == last_dilation:
                    self.prefit.fill_(True)
            else:
                bias_this_dilation = getattr(self, f"biases_{i}")

            # Features: alternate kernels use the padded and the unpadded
            # (valid) part of the convolution output.
            _features.append(
                self._get_PPVs(C[:, _padding1::2], bias_this_dilation[_padding1::2])
            )
            _features.append(
                self._get_PPVs(
                    C[:, 1 - _padding1 :: 2, padding:-padding],
                    bias_this_dilation[1 - _padding1 :: 2],
                )
            )
        return torch.cat(_features, dim=1)

    def _get_PPVs(self, C, bias):
        """Calculate proportion of positive values features.

        Parameters
        ----------
        C : torch.Tensor
            Convolution output tensor.
        bias : torch.Tensor
            Bias values used as thresholds.

        Returns
        -------
        torch.Tensor
            Proportion of positive values features.
        """
        C = C.unsqueeze(-1)
        bias = bias.view(1, bias.shape[0], 1, bias.shape[1])
        return (C > bias).float().mean(2).flatten(1)

    def _set_dilations(self, input_length):
        """Set dilations and padding values for MiniRocket kernels.

        Parameters
        ----------
        input_length : int
            Length of the input time series.

        Returns
        -------
        None
            The function stores dilation, padding, and feature allocation
            values as attributes.
        """
        num_features_per_kernel = self.num_features // self.num_kernels
        true_max_dilations_per_kernel = min(
            num_features_per_kernel, self.max_dilations_per_kernel
        )
        multiplier = num_features_per_kernel / true_max_dilations_per_kernel
        max_exponent = np.log2((input_length - 1) / (self.kernel_size - 1))
        dilations, num_features_per_dilation = np.unique(
            np.logspace(0, max_exponent, true_max_dilations_per_kernel, base=2).astype(
                np.int32
            ),
            return_counts=True,
        )
        num_features_per_dilation = (num_features_per_dilation * multiplier).astype(
            np.int32
        )

        # Hand out features lost to integer truncation, round-robin.
        remainder = num_features_per_kernel - num_features_per_dilation.sum()
        i = 0
        while remainder > 0:
            num_features_per_dilation[i] += 1
            remainder -= 1
            i = (i + 1) % len(num_features_per_dilation)

        self.num_features_per_dilation = num_features_per_dilation
        self.num_dilations = len(dilations)
        self.dilations = dilations
        self.padding = [
            ((self.kernel_size - 1) * dilation) // 2 for dilation in dilations
        ]

    def _set_channel_combinations(self, num_channels):
        """Set random channel combinations for multivariate input.

        Parameters
        ----------
        num_channels : int
            Number of input channels.

        Returns
        -------
        None
            The function registers channel combination tensors as buffers.
        """
        num_combinations = self.num_kernels * self.num_dilations
        max_num_channels = min(num_channels, 9)
        max_exponent_channels = np.log2(max_num_channels + 1)

        # Local generator: same draws as seeding the global NumPy RNG, but
        # without overwriting the caller's global random state.
        rng = np.random.RandomState(self.random_state)
        num_channels_per_combination = (
            2 ** rng.uniform(0, max_exponent_channels, num_combinations)
        ).astype(np.int32)

        channel_combinations = torch.zeros((1, num_channels, num_combinations, 1))
        for i in range(num_combinations):
            channel_combinations[
                :,
                rng.choice(num_channels, num_channels_per_combination[i], False),
                i,
            ] = 1

        channel_combinations = torch.split(
            channel_combinations, self.num_kernels, 2
        )  # split by dilation
        for i, channel_combination in enumerate(channel_combinations):
            self.register_buffer(
                f"channel_combinations_{i}", channel_combination
            )  # per dilation

    def get_quantiles(self, num_quantiles):
        """Calculate quantile values using the golden ratio.

        Parameters
        ----------
        num_quantiles : int
            Number of quantile values to calculate.

        Returns
        -------
        list
            List containing the calculated quantile values.
        """
        golden_ratio = (np.sqrt(5) + 1) / 2
        return [(i * golden_ratio) % 1 for i in range(1, num_quantiles + 1)]

    def _get_bias(self, input_matrix, num_features_per_dilation):
        """Calculate biases for a convolution output.

        Parameters
        ----------
        input_matrix : torch.Tensor
            Convolution output; its first dimension is indexed by sample.
        num_features_per_dilation : int
            Number of features per dilation.

        Returns
        -------
        torch.Tensor
            Bias values used as thresholds for MiniRocket features.
        """
        # Local generator: same draws as seeding the global NumPy RNG, but
        # without overwriting the caller's global random state.
        rng = np.random.RandomState(self.random_state)
        random_indices = rng.choice(input_matrix.shape[0], self.num_kernels)
        # Pair kernel k with its k-th randomly drawn sample.
        selected_samples = input_matrix[random_indices].diagonal().T
        quantiles = torch.tensor(
            self.get_quantiles(num_features_per_dilation),
            device=input_matrix.device,
            dtype=input_matrix.dtype,
        )
        biases = torch.quantile(
            selected_samples.to(torch.float),
            quantiles.to(torch.float),
            dim=1,
        ).T
        return biases

    def extract_features(self, data):
        """Extract MiniRocket features from input data.

        Parameters
        ----------
        data : torch.Tensor
            Input data of shape (batch_size, seq_len) or
            (batch_size, c_in, seq_len).

        Returns
        -------
        torch.Tensor
            Extracted MiniRocket features.
        """
        if data.ndim == 2:
            # Univariate input given without a channel axis.
            data = torch.unsqueeze(data, 1)
        features = self(data)
        return features
def get_minirocket_features(o, model, chunksize=1024, use_cuda=None, to_np=True):
    """Extract MiniRocket features from a large dataset in chunks.

    Parameters
    ----------
    o : np.ndarray or torch.Tensor
        Input dataset. It is split along its first dimension. Other
        array-likes accepted by ``torch.as_tensor`` also work.
    model : nn.Module
        MiniRocket feature extraction model.
    chunksize : int
        Number of samples processed in each chunk.
    use_cuda : bool
        If True, use CUDA. If False, use CPU. If None, CUDA is used when
        available.
    to_np : bool
        If True, return the features as a NumPy array. If False, return
        them as a torch.Tensor.

    Returns
    -------
    np.ndarray or torch.Tensor
        Extracted MiniRocket features with a trailing singleton dimension.
    """
    use = torch.cuda.is_available() if use_cuda is None else use_cuda
    if use:
        device = torch.device("cuda", torch.cuda.current_device())
    else:
        device = torch.device("cpu")
    model = model.to(device)

    # Move every input kind to the model's device, not only ndarrays, so a
    # CPU tensor paired with a CUDA model does not fail with a device mismatch.
    o = torch.as_tensor(o).to(device)

    _features = [model(oi) for oi in torch.split(o, chunksize)]
    features = torch.cat(_features).unsqueeze(-1)
    if to_np:
        # detach() first: numpy() refuses tensors that require grad.
        return features.detach().cpu().numpy()
    return features
class MiniRocketHead(nn.Sequential):
    """Prediction head applied to pre-computed MiniRocket features.

    The module is a two-stage ``nn.Sequential``: an empty ``backbone`` and
    a ``head`` made of flatten, optional batch normalization, optional
    dropout, and a zero-initialized linear layer.
    """

    def __init__(self, c_in, c_out, bn=True, fc_dropout=0.0):
        """Initialize the MiniRocket prediction head.

        Parameters
        ----------
        c_in : int
            Number of input features.
        c_out : int
            Number of output classes or output values.
        bn : bool
            If True, apply batch normalization before the linear layer.
        fc_dropout : float
            Dropout probability applied before the linear layer.
        """
        stages = [nn.Flatten()]
        if bn:
            stages.append(nn.BatchNorm1d(c_in))
        if fc_dropout:
            stages.append(nn.Dropout(fc_dropout))

        # The linear layer starts with all-zero weights and bias.
        classifier = nn.Linear(c_in, c_out)
        for tensor in (classifier.weight.data, classifier.bias.data):
            nn.init.constant_(tensor, 0)
        stages.append(classifier)

        named_stages = OrderedDict()
        named_stages["backbone"] = nn.Sequential()
        named_stages["head"] = nn.Sequential(*stages)
        super().__init__(named_stages)
class MiniRocket(nn.Module):
    """MiniRocket model with feature extractor and prediction head."""

    def __init__(
        self,
        c_in,
        c_out,
        seq_len,
        output_activation=nn.Identity(),
        num_features=10_000,
        max_dilations_per_kernel=32,
        random_state=None,
        bn=True,
        fc_dropout=0,
    ):
        """Initialize the MiniRocket torch Module.

        Parameters
        ----------
        c_in : int
            Number of input channels.
        c_out : int
            Output size.
        seq_len : int
            Length of the input time series.
        output_activation : nn.Module, optional
            Module applied to the head output, by default ``nn.Identity()``.
        num_features : int, optional
            Number of features to extract, by default 10_000.
        max_dilations_per_kernel : int, optional
            Maximum number of dilations per kernel, by default 32.
        random_state : int, optional
            Seed of the random number generator, by default None.
        bn : bool, optional
            Use batch normalization, by default True.
        fc_dropout : float, optional
            Dropout probability applied before the linear layer,
            by default 0.
        """
        super().__init__()

        # Backbone: fixed-kernel feature extractor.
        feature_extractor = MiniRocketFeatures(
            c_in,
            seq_len,
            num_features=num_features,
            max_dilations_per_kernel=max_dilations_per_kernel,
            random_state=random_state,
        )
        # The extractor may round the requested feature count down.
        n_feats = feature_extractor.num_features
        self.head_nf = n_feats

        # Head: flatten -> [batch norm] -> [dropout] -> zero-initialized linear.
        stages = [nn.Flatten()]
        if bn:
            stages.append(nn.BatchNorm1d(n_feats))
        if fc_dropout:
            stages.append(nn.Dropout(fc_dropout))
        classifier = nn.Linear(n_feats, c_out)
        for tensor in (classifier.weight.data, classifier.bias.data):
            nn.init.constant_(tensor, 0)
        stages.append(classifier)

        self.backbone = feature_extractor
        self.head = nn.Sequential(*stages)
        self.output_activation = output_activation

    def forward(self, x):
        """Run a forward pass through the MiniRocket model.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, c_in, seq_len).

        Returns
        -------
        torch.Tensor
            Model output after feature extraction, prediction head, and
            output activation.
        """
        return self.output_activation(self.head(self.backbone(x)))