Source code for qumphy.models.basic_conv1d

"""
File: qumphy/models/basic_conv1d.py
Project: 22HLT01 QUMPHY
Contact: oskar.pfeffer@ptb.de
Gitlab: https://gitlab.com/qumphy
Description: Convolutional Neural Network.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import math

from typing import Iterable


[docs] class Flatten(nn.Module): """Flatten an input tensor to a single dimension. Parameters ---------- full : bool If True, flatten the complete tensor into a rank-1 tensor. If False, keep the batch dimension and flatten all remaining dimensions. """ def __init__(self, full: bool = False): super().__init__() self.full = full
[docs] def forward(self, x): """Run a forward pass through the flatten layer. Parameters ---------- x : torch.Tensor Input tensor to be flattened. Returns ------- torch.Tensor Flattened tensor. If full is True, the output has shape (-1,). Otherwise, the output has shape (batch_size, -1). """ return x.view(-1) if self.full else x.view(x.size(0), -1)
[docs] def bn_drop_lin( n_in, n_out, bn=True, p=0.0, actn=None, layer_norm=False, permute=False ): """Create batch normalization, dropout, linear, and activation layers. Parameters ---------- n_in : int Number of input features. n_out : int Number of output features. bn : bool If True, add a normalization layer before the linear layer. p : float Dropout probability. actn : nn.Module Activation function added after the linear layer. layer_norm : bool If True, use layer normalization instead of batch normalization. permute : bool If True, permute the input dimensions before and after normalization. This is useful for inputs of shape (batch_size, sequence, features). Returns ------- list List of PyTorch layers. """ layers = [] if permute: layers.append(LambdaLayer(lambda x: x.permute(0, 2, 1))) if bn: if layer_norm is False: layers.append(nn.BatchNorm1d(n_in)) else: layers.append(nn.LayerNorm(n_in)) if permute: layers.append(LambdaLayer(lambda x: x.permute(0, 2, 1))) if p != 0: layers.append(nn.Dropout(p)) layers.append(nn.Linear(n_in, n_out)) if actn is not None: layers.append(actn) return layers
[docs] class LambdaLayer(nn.Module): """Layer that applies a given lambda function to the input. Parameters ---------- lambd : callable Function applied to the input tensor. """ def __init__(self, lambd): super(LambdaLayer, self).__init__() self.lambd = lambd
[docs] def forward(self, x): """Run a forward pass through the lambda layer. Parameters ---------- x : torch.Tensor Input tensor. Returns ------- torch.Tensor Output tensor after applying the lambda function. """ return self.lambd(x)
def _conv1d( in_planes, out_planes, kernel_size=3, stride=1, dilation=1, act="relu", bn=True, drop_p=0, layer_norm=False, ): """Create a 1D convolutional block. Parameters ---------- in_planes : int Number of input channels. out_planes : int Number of output channels. kernel_size : int Size of the convolution kernel. stride : int Stride of the convolution. dilation : int Dilation factor of the convolution. act : str Activation function name. Supported values are "relu", "elu", "prelu", and "gelu". bn : bool If True, add a normalization layer after the convolution. drop_p : float Dropout probability applied before the convolution. layer_norm : bool If True, use layer normalization instead of batch normalization. Returns ------- nn.Sequential Sequential 1D convolutional block. """ lst = [] if drop_p > 0: lst.append(nn.Dropout(drop_p)) lst.append( nn.Conv1d( in_planes, out_planes, kernel_size=kernel_size, stride=stride, padding=(kernel_size - 1) // 2, dilation=dilation, bias=not (bn), ) ) if bn: if layer_norm: lst.append(LambdaLayer(lambda x: x.transpose(1, 2))) lst.append(nn.LayerNorm(out_planes)) lst.append(LambdaLayer(lambda x: x.transpose(1, 2))) else: lst.append(nn.BatchNorm1d(out_planes)) if act == "relu": lst.append(nn.ReLU(True)) if act == "elu": lst.append(nn.ELU(True)) if act == "prelu": lst.append(nn.PReLU(True)) if act == "gelu": lst.append(nn.GELU()) return nn.Sequential(*lst) def _fc(in_planes, out_planes, act="relu", bn=True): """Create a fully connected block. Parameters ---------- in_planes : int Number of input features. out_planes : int Number of output features. act : str Activation function name. Supported values are "relu", "elu", and "prelu". bn : bool If True, add batch normalization after the linear layer. Returns ------- nn.Sequential Sequential fully connected block. """ lst = [nn.Linear(in_planes, out_planes, bias=not (bn))] if bn: lst.append(nn.BatchNorm1d(out_planes)) if act == "relu": lst.append(nn.ReLU(True)) if act == "elu": lst.append(nn.ELU(True)) if act == "prelu": lst.append(nn.PReLU(True)) return nn.Sequential(*lst)
[docs] class AdaptiveConcatPool1d(nn.Module): """Concatenate adaptive average pooling and adaptive max pooling. Parameters ---------- sz : int Output size of each pooling layer. If None, the output size is 1. """ def __init__(self, sz=None): "Output will be 2*sz or 2 if sz is None" super().__init__() sz = sz or 1 self.ap, self.mp = nn.AdaptiveAvgPool1d(sz), nn.AdaptiveMaxPool1d(sz)
[docs] def forward(self, x): """Run a forward pass through the adaptive concatenated pooling layer. Parameters ---------- x : torch.Tensor Input tensor. Returns ------- torch.Tensor Tensor obtained by concatenating max-pooled and average-pooled features along the channel dimension. """ return torch.cat([self.mp(x), self.ap(x)], 1)
[docs] class SqueezeExcite1d(nn.Module): """Squeeze-and-excitation block for 1D inputs. Parameters ---------- channels : int Number of input channels. reduction : int Reduction factor used to compute the number of hidden channels. """ def __init__(self, channels, reduction=16): super().__init__() channels_reduced = channels // reduction self.w1 = torch.nn.Parameter( torch.randn(channels_reduced, channels).unsqueeze(0) ) self.w2 = torch.nn.Parameter( torch.randn(channels, channels_reduced).unsqueeze(0) )
[docs] def forward(self, x): """Run a forward pass through the squeeze-and-excitation block. Parameters ---------- x : torch.Tensor Input tensor of shape (batch_size, channels, sequence_length). Returns ------- torch.Tensor Reweighted tensor with the same shape as the input. """ z = torch.mean(x, dim=2, keepdim=True) # bs,ch intermed = F.relu(torch.matmul(self.w1, z)) s = F.sigmoid(torch.matmul(self.w2, intermed)) return s * x
[docs] def weight_init(m): """Initialize weights of supported model layers. Parameters ---------- m : nn.Module Module whose weights should be initialized. Returns ------- None The function modifies the module weights in place. """ if isinstance(m, nn.Conv1d) or isinstance(m, nn.Linear): nn.init.kaiming_normal_(m.weight) if m.bias is not None: nn.init.zeros_(m.bias) if isinstance(m, nn.BatchNorm1d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) if isinstance(m, SqueezeExcite1d): stdv1 = math.sqrt(2.0 / m.w1.size[0]) nn.init.normal_(m.w1, 0.0, stdv1) stdv2 = math.sqrt(1.0 / m.w2.size[1]) nn.init.normal_(m.w2, 0.0, stdv2)
[docs] def create_head1d( nf, nc, lin_ftrs=None, ps=0.5, bn: bool = True, act="relu", concat_pooling=True ): """Create a 1D model classification or regression head. Parameters ---------- nf : int Number of input features. nc : int Number of output classes or output values. lin_ftrs : list List of hidden layer sizes used in the head. ps : float or Iterable Dropout probability or list of dropout probabilities. bn : bool If True, use batch normalization in the linear blocks. act : str Activation function used between linear layers. concat_pooling : bool If True, use concatenated adaptive average and max pooling. If False, use adaptive average pooling only. Returns ------- nn.Sequential Sequential model head. """ lin_ftrs = ( [2 * nf if concat_pooling else nf, nc] if lin_ftrs is None else [2 * nf if concat_pooling else nf] + lin_ftrs + [nc] ) # was [nf, 512,nc] ps = [ps] if not isinstance(ps, Iterable) else ps if len(ps) == 1: ps = [ps[0] / 2] * (len(lin_ftrs) - 2) + ps actns = [nn.ReLU(inplace=True) if act == "relu" else nn.ELU(inplace=True)] * ( len(lin_ftrs) - 2 ) + [None] layers = [ AdaptiveConcatPool1d() if concat_pooling else nn.AdaptiveAvgPool1d(1), Flatten(), ] for ni, no, p, actn in zip(lin_ftrs[:-1], lin_ftrs[1:], ps, actns): layers += bn_drop_lin(ni, no, bn, p, actn) return nn.Sequential(*layers)
[docs] class Basic_Conv1d(nn.Sequential): """Basic configurable 1D convolutional neural network. Parameters ---------- filters : list List containing the number of filters for each convolutional layer. kernel_size : int or list Kernel size used for the convolutional layers. stride : int Stride used for the convolutional layers. dilation : int Dilation factor used for the convolutional layers. pool : int Max pooling kernel size. If 0, no max pooling is applied. pool_stride : int Stride used for max pooling. squeeze_excite_reduction : int Reduction factor for squeeze-and-excitation blocks. If 0, no squeeze-and-excitation block is applied. num_classes : int Number of output classes or output values. input_channels : int Number of input channels. act : str Activation function used in the convolutional blocks. bn : bool If True, use batch normalization in convolutional blocks. headless : bool If True, remove the final prediction head and return extracted features. split_first_layer : bool If True, split the first convolutional layer into two convolutional steps. drop_p : float Dropout probability used in convolutional blocks. lin_ftrs_head : list Hidden layer sizes used in the model head. ps_head : float or Iterable Dropout probability or probabilities used in the model head. bn_final_head : bool Indicates whether final batch normalization should be used in the head. bn_head : bool If True, use batch normalization in the model head. act_head : str Activation function used in the model head. concat_pooling : bool If True, use concatenated adaptive average and max pooling in the head. """ def __init__( self, filters=[128, 128, 128, 128], kernel_size=3, stride=2, dilation=1, pool=0, pool_stride=1, squeeze_excite_reduction=0, num_classes=2, input_channels=8, act="relu", bn=True, headless=False, split_first_layer=False, drop_p=0.0, lin_ftrs_head=None, ps_head=0.5, bn_final_head=False, bn_head=True, act_head="relu", concat_pooling=True, ): layers = [] if isinstance(kernel_size, int): kernel_size = [kernel_size] * len(filters) for i in range(len(filters)): layers_tmp = [] layers_tmp.append( _conv1d( input_channels if i == 0 else filters[i - 1], filters[i], kernel_size=kernel_size[i], stride=(1 if (split_first_layer is True and i == 0) else stride), dilation=dilation, act=( "none" if ( (headless is True and i == len(filters) - 1) or (split_first_layer is True and i == 0) ) else act ), bn=False if (headless is True and i == len(filters) - 1) else bn, drop_p=(0.0 if i == 0 else drop_p), ) ) if split_first_layer is True and i == 0: layers_tmp.append( _conv1d( filters[0], filters[0], kernel_size=1, stride=1, act=act, bn=bn, drop_p=0.0, ) ) # layers_tmp.append(nn.Linear(filters[0],filters[0],bias=not(bn))) # layers_tmp.append(_fc(filters[0],filters[0],act=act,bn=bn)) if pool > 0 and i < len(filters) - 1: layers_tmp.append( nn.MaxPool1d(pool, stride=pool_stride, padding=(pool - 1) // 2) ) if squeeze_excite_reduction > 0: layers_tmp.append(SqueezeExcite1d(filters[i], squeeze_excite_reduction)) layers.append(nn.Sequential(*layers_tmp)) # head # layers.append(nn.AdaptiveAvgPool1d(1)) # layers.append(nn.Linear(filters[-1],num_classes)) # head #inplace=True leads to a runtime error see ReLU+ dropout https://discuss.pytorch.org/t/relu-dropout-inplace/13467/5 self.headless = headless if headless is True: head = nn.Sequential(nn.AdaptiveAvgPool1d(1), Flatten()) else: head = create_head1d( filters[-1], nc=num_classes, lin_ftrs=lin_ftrs_head, ps=ps_head, bn_final=bn_final_head, bn=bn_head, act=act_head, concat_pooling=concat_pooling, ) layers.append(head) super().__init__(*layers)
[docs] def get_layer_groups(self): """Get layer groups of the model. Returns ------- tuple Tuple containing selected convolutional layers and the model head. """ return (self[2], self[-1])
[docs] def get_output_layer(self): """Get the output layer of the model. Returns ------- nn.Module or None Output layer if the model is not headless. Otherwise, None. """ if self.headless is False: return self[-1][-1] else: return None
[docs] def set_output_layer(self, x): """Set the output layer of the model. Parameters ---------- x : nn.Module New output layer. Returns ------- None The function modifies the output layer in place. """ if self.headless is False: self[-1][-1] = x
[docs] def fcn(filters=[128] * 5, num_classes=2, input_channels=8, **kwargs): """Create a fully convolutional 1D network. Parameters ---------- filters : list List containing the number of filters for the convolutional layers. num_classes : int Number of output classes or output values. input_channels : int Number of input channels. **kwargs Additional keyword arguments. Returns ------- Basic_Conv1d Fully convolutional 1D model. """ filters_in = filters + [num_classes] return Basic_Conv1d( filters=filters_in, kernel_size=3, stride=1, pool=2, pool_stride=2, input_channels=input_channels, act="relu", bn=True, headless=True, )
[docs] def fcn_wang( num_classes=2, input_channels=8, lin_ftrs_head=None, ps_head=0.5, bn_final_head=False, bn_head=True, act_head="relu", concat_pooling=True, **kwargs, ): """Create a Wang-style fully convolutional 1D network. Parameters ---------- num_classes : int Number of output classes or output values. input_channels : int Number of input channels. lin_ftrs_head : list Hidden layer sizes used in the model head. ps_head : float or Iterable Dropout probability or probabilities used in the model head. bn_final_head : bool Indicates whether final batch normalization should be used in the head. bn_head : bool If True, use batch normalization in the model head. act_head : str Activation function used in the model head. concat_pooling : bool If True, use concatenated adaptive average and max pooling in the head. **kwargs Additional keyword arguments. Returns ------- Basic_Conv1d Wang-style fully convolutional 1D model. """ return Basic_Conv1d( filters=[128, 256, 128], kernel_size=[8, 5, 3], stride=1, pool=0, pool_stride=2, num_classes=num_classes, input_channels=input_channels, act="relu", bn=True, lin_ftrs_head=lin_ftrs_head, ps_head=ps_head, bn_final_head=bn_final_head, bn_head=bn_head, act_head=act_head, concat_pooling=concat_pooling, )
[docs] def schirrmeister( num_classes=2, input_channels=8, kernel_size=10, lin_ftrs_head=None, ps_head=0.5, bn_final_head=False, bn_head=True, act_head="relu", concat_pooling=True, **kwargs, ): """Create a Schirrmeister-style 1D convolutional network. Parameters ---------- num_classes : int Number of output classes or output values. input_channels : int Number of input channels. kernel_size : int Kernel size used in the convolutional layers. lin_ftrs_head : list Hidden layer sizes used in the model head. ps_head : float or Iterable Dropout probability or probabilities used in the model head. bn_final_head : bool Indicates whether final batch normalization should be used in the head. bn_head : bool If True, use batch normalization in the model head. act_head : str Activation function used in the model head. concat_pooling : bool If True, use concatenated adaptive average and max pooling in the head. **kwargs Additional keyword arguments. Returns ------- Basic_Conv1d Schirrmeister-style 1D convolutional model. """ return Basic_Conv1d( filters=[25, 50, 100, 200], kernel_size=kernel_size, stride=3, pool=3, pool_stride=1, num_classes=num_classes, input_channels=input_channels, act="relu", bn=True, headless=False, split_first_layer=True, drop_p=0.5, lin_ftrs_head=lin_ftrs_head, ps_head=ps_head, bn_final_head=bn_final_head, bn_head=bn_head, act_head=act_head, concat_pooling=concat_pooling, )
[docs] def sen( filters=[128] * 5, num_classes=2, input_channels=8, kernel_size=3, squeeze_excite_reduction=16, drop_p=0.0, lin_ftrs_head=None, ps_head=0.5, bn_final_head=False, bn_head=True, act_head="relu", concat_pooling=True, **kwargs, ): """Create a squeeze-and-excitation 1D convolutional network. Parameters ---------- filters : list List containing the number of filters for each convolutional layer. num_classes : int Number of output classes or output values. input_channels : int Number of input channels. kernel_size : int or list Kernel size used in the convolutional layers. squeeze_excite_reduction : int Reduction factor used in squeeze-and-excitation blocks. drop_p : float Dropout probability used in convolutional blocks. lin_ftrs_head : list Hidden layer sizes used in the model head. ps_head : float or Iterable Dropout probability or probabilities used in the model head. bn_final_head : bool Indicates whether final batch normalization should be used in the head. bn_head : bool If True, use batch normalization in the model head. act_head : str Activation function used in the model head. concat_pooling : bool If True, use concatenated adaptive average and max pooling in the head. **kwargs Additional keyword arguments. Returns ------- Basic_Conv1d Squeeze-and-excitation 1D convolutional model. """ return Basic_Conv1d( filters=filters, kernel_size=kernel_size, stride=2, pool=0, pool_stride=0, input_channels=input_channels, act="relu", bn=True, num_classes=num_classes, squeeze_excite_reduction=squeeze_excite_reduction, drop_p=drop_p, lin_ftrs_head=lin_ftrs_head, ps_head=ps_head, bn_final_head=bn_final_head, bn_head=bn_head, act_head=act_head, concat_pooling=concat_pooling, )
[docs] def basic1d( filters=[128] * 5, kernel_size=3, stride=2, dilation=1, pool=0, pool_stride=1, squeeze_excite_reduction=0, num_classes=2, input_channels=8, act="relu", bn=True, headless=False, drop_p=0.0, lin_ftrs_head=None, ps_head=0.5, bn_final_head=False, bn_head=True, act_head="relu", concat_pooling=True, **kwargs, ): """Create a basic configurable 1D convolutional network. Parameters ---------- filters : list List containing the number of filters for each convolutional layer. kernel_size : int or list Kernel size used for the convolutional layers. stride : int Stride used for the convolutional layers. dilation : int Dilation factor used for the convolutional layers. pool : int Max pooling kernel size. If 0, no max pooling is applied. pool_stride : int Stride used for max pooling. squeeze_excite_reduction : int Reduction factor for squeeze-and-excitation blocks. If 0, no squeeze-and-excitation block is applied. num_classes : int Number of output classes or output values. input_channels : int Number of input channels. act : str Activation function used in the convolutional blocks. bn : bool If True, use batch normalization in convolutional blocks. headless : bool If True, remove the final prediction head and return extracted features. drop_p : float Dropout probability used in convolutional blocks. lin_ftrs_head : list Hidden layer sizes used in the model head. ps_head : float or Iterable Dropout probability or probabilities used in the model head. bn_final_head : bool Indicates whether final batch normalization should be used in the head. bn_head : bool If True, use batch normalization in the model head. act_head : str Activation function used in the model head. concat_pooling : bool If True, use concatenated adaptive average and max pooling in the head. **kwargs Additional keyword arguments. Returns ------- Basic_Conv1d Basic configurable 1D convolutional model. """ return Basic_Conv1d( filters=filters, kernel_size=kernel_size, stride=stride, dilation=dilation, pool=pool, pool_stride=pool_stride, squeeze_excite_reduction=squeeze_excite_reduction, num_classes=num_classes, input_channels=input_channels, act=act, bn=bn, headless=headless, drop_p=drop_p, lin_ftrs_head=lin_ftrs_head, ps_head=ps_head, bn_final_head=bn_final_head, bn_head=bn_head, act_head=act_head, concat_pooling=concat_pooling, )