"""
File: qumphy/models/basic_conv1d.py
Project: 22HLT01 QUMPHY
Contact: oskar.pfeffer@ptb.de
Gitlab: https://gitlab.com/qumphy
Description: Convolutional Neural Network.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Iterable
[docs]
class Flatten(nn.Module):
    """Flatten an input tensor.

    Parameters
    ----------
    full : bool
        If True, flatten the complete tensor into a rank-1 tensor.
        If False, keep the first (batch) dimension and flatten all
        remaining dimensions.
    """

    def __init__(self, full: bool = False):
        super().__init__()
        self.full = full

    def forward(self, x):
        """Run a forward pass through the flatten layer.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor to be flattened.

        Returns
        -------
        torch.Tensor
            Flattened tensor. If ``full`` is True, the output has shape
            ``(-1,)``. Otherwise, the output has shape ``(x.size(0), -1)``.
        """
        # ``reshape`` instead of ``view``: ``view`` raises on non-contiguous
        # inputs (e.g. after ``permute``/``transpose``), whereas ``reshape``
        # returns a view when possible and copies only when it has to.
        if self.full:
            return x.reshape(-1)
        return x.reshape(x.size(0), -1)
[docs]
def bn_drop_lin(
    n_in, n_out, bn=True, p=0.0, actn=None, layer_norm=False, permute=False
):
    """Create normalization, dropout, linear, and activation layers.

    The layers are returned in the order: (permute), normalization,
    (permute), dropout, linear, activation. Optional layers are omitted
    when disabled.

    Parameters
    ----------
    n_in : int
        Number of input features.
    n_out : int
        Number of output features.
    bn : bool
        If True, add a normalization layer before the linear layer.
    p : float
        Dropout probability. No dropout layer is added when ``p`` is 0.
    actn : nn.Module
        Activation module appended after the linear layer. If None, no
        activation is appended.
    layer_norm : bool
        If True, use layer normalization instead of batch normalization.
    permute : bool
        If True, swap the last two input dimensions before and after the
        normalization layer. This lets ``nn.BatchNorm1d`` be applied to
        inputs of shape (batch_size, sequence, features). When layer
        normalization is used the swap is skipped, because ``nn.LayerNorm``
        already normalizes over the trailing feature dimension.

    Returns
    -------
    list
        List of PyTorch layers.
    """
    # nn.LayerNorm(n_in) normalizes the last dimension, which must have size
    # n_in. Swapping the last two dimensions first would present the sequence
    # axis to it instead, so the swap is only applied when LayerNorm is not
    # the active normalization.
    swap_dims = permute and not (bn and layer_norm)

    layers = []
    if swap_dims:
        layers.append(LambdaLayer(lambda x: x.permute(0, 2, 1)))
    if bn:
        if layer_norm:
            layers.append(nn.LayerNorm(n_in))
        else:
            layers.append(nn.BatchNorm1d(n_in))
    if swap_dims:
        layers.append(LambdaLayer(lambda x: x.permute(0, 2, 1)))
    if p != 0:
        layers.append(nn.Dropout(p))
    layers.append(nn.Linear(n_in, n_out))
    if actn is not None:
        layers.append(actn)
    return layers
[docs]
class LambdaLayer(nn.Module):
    """Module wrapper that turns a plain callable into a layer.

    Parameters
    ----------
    lambd : callable
        Function applied to the input tensor on every forward pass.
    """

    def __init__(self, lambd):
        super().__init__()
        # Stored as a plain attribute; the callable carries no parameters.
        self.lambd = lambd

    def forward(self, x):
        """Apply the wrapped callable to the input.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            Whatever the wrapped callable returns for ``x``.
        """
        fn = self.lambd
        return fn(x)
def _conv1d(
    in_planes,
    out_planes,
    kernel_size=3,
    stride=1,
    dilation=1,
    act="relu",
    bn=True,
    drop_p=0,
    layer_norm=False,
):
    """Create a 1D convolutional block.

    The block consists of (dropout), convolution, (normalization) and
    (activation), in that order.

    Parameters
    ----------
    in_planes : int
        Number of input channels.
    out_planes : int
        Number of output channels.
    kernel_size : int
        Size of the convolution kernel.
    stride : int
        Stride of the convolution.
    dilation : int
        Dilation factor of the convolution.
    act : str
        Activation function name. Supported values are "relu", "elu",
        "prelu", and "gelu". Any other value (e.g. "none") adds no
        activation.
    bn : bool
        If True, add a normalization layer after the convolution. The
        convolution then has no bias term.
    drop_p : float
        Dropout probability applied before the convolution. No dropout
        layer is added unless ``drop_p`` is greater than 0.
    layer_norm : bool
        If True, use layer normalization (over the channel dimension)
        instead of batch normalization.

    Returns
    -------
    nn.Sequential
        Sequential 1D convolutional block.
    """
    lst = []
    if drop_p > 0:
        lst.append(nn.Dropout(drop_p))
    # NOTE(review): the padding is derived from kernel_size only; with
    # dilation > 1 the output is shorter than the input even at stride 1.
    # Confirm this is intended before relying on dilation.
    lst.append(
        nn.Conv1d(
            in_planes,
            out_planes,
            kernel_size=kernel_size,
            stride=stride,
            padding=(kernel_size - 1) // 2,
            dilation=dilation,
            bias=not bn,
        )
    )
    if bn:
        if layer_norm:
            # LayerNorm normalizes the last dimension, so move the channels
            # there and back again.
            lst.append(LambdaLayer(lambda x: x.transpose(1, 2)))
            lst.append(nn.LayerNorm(out_planes))
            lst.append(LambdaLayer(lambda x: x.transpose(1, 2)))
        else:
            lst.append(nn.BatchNorm1d(out_planes))
    if act == "relu":
        lst.append(nn.ReLU(True))
    elif act == "elu":
        # The first positional argument of nn.ELU is ``alpha``, not
        # ``inplace``; use the defaults (alpha=1.0, not in-place) explicitly
        # instead of passing a bool as alpha.
        lst.append(nn.ELU())
    elif act == "prelu":
        # The first positional argument of nn.PReLU is ``num_parameters``;
        # a single shared slope is the default.
        lst.append(nn.PReLU())
    elif act == "gelu":
        lst.append(nn.GELU())
    return nn.Sequential(*lst)
def _fc(in_planes, out_planes, act="relu", bn=True):
    """Create a fully connected block.

    The block consists of a linear layer, (batch normalization) and
    (activation), in that order.

    Parameters
    ----------
    in_planes : int
        Number of input features.
    out_planes : int
        Number of output features.
    act : str
        Activation function name. Supported values are "relu", "elu",
        and "prelu". Any other value adds no activation.
    bn : bool
        If True, add batch normalization after the linear layer. The
        linear layer then has no bias term.

    Returns
    -------
    nn.Sequential
        Sequential fully connected block.
    """
    lst = [nn.Linear(in_planes, out_planes, bias=not bn)]
    if bn:
        lst.append(nn.BatchNorm1d(out_planes))
    if act == "relu":
        lst.append(nn.ReLU(True))
    elif act == "elu":
        # nn.ELU's first positional argument is ``alpha``, not ``inplace``;
        # use the defaults (alpha=1.0, not in-place) instead of a bool alpha.
        lst.append(nn.ELU())
    elif act == "prelu":
        # nn.PReLU's first positional argument is ``num_parameters``; a
        # single shared slope is the default.
        lst.append(nn.PReLU())
    return nn.Sequential(*lst)
[docs]
class AdaptiveConcatPool1d(nn.Module):
    """Adaptive max pooling and adaptive average pooling, concatenated.

    Parameters
    ----------
    sz : int
        Output size of each pooling layer. A falsy value (e.g. None)
        selects an output size of 1.
    """

    def __init__(self, sz=None):
        """Build both pooling layers with the same output size."""
        super().__init__()
        out_size = sz if sz else 1
        self.ap = nn.AdaptiveAvgPool1d(out_size)
        self.mp = nn.AdaptiveMaxPool1d(out_size)

    def forward(self, x):
        """Pool the input both ways and stack the results.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            Max-pooled features followed by average-pooled features,
            concatenated along dimension 1, so that dimension is twice as
            large as in the input.
        """
        pooled_max = self.mp(x)
        pooled_avg = self.ap(x)
        return torch.cat((pooled_max, pooled_avg), dim=1)
[docs]
class SqueezeExcite1d(nn.Module):
    """Squeeze-and-excitation block for 1D inputs.

    Parameters
    ----------
    channels : int
        Number of input channels.
    reduction : int
        Reduction factor used to compute the number of hidden channels
        (``channels // reduction``, but never fewer than 1).
    """

    def __init__(self, channels, reduction=16):
        super().__init__()
        # Keep at least one hidden channel: with ``channels < reduction`` the
        # integer division yields 0, which would create empty weights and
        # turn the gate into a constant sigmoid(0) = 0.5 that cannot learn.
        channels_reduced = max(1, channels // reduction)
        # Leading singleton dimension lets matmul broadcast over the batch.
        self.w1 = torch.nn.Parameter(
            torch.randn(channels_reduced, channels).unsqueeze(0)
        )
        self.w2 = torch.nn.Parameter(
            torch.randn(channels, channels_reduced).unsqueeze(0)
        )

    def forward(self, x):
        """Run a forward pass through the squeeze-and-excitation block.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor of shape (batch_size, channels, sequence_length).

        Returns
        -------
        torch.Tensor
            Reweighted tensor with the same shape as the input.
        """
        # Squeeze: average over the sequence axis -> (bs, ch, 1).
        z = torch.mean(x, dim=2, keepdim=True)
        # Excite: bottleneck -> (bs, ch_reduced, 1), then gate -> (bs, ch, 1).
        intermed = F.relu(torch.matmul(self.w1, z))
        s = torch.sigmoid(torch.matmul(self.w2, intermed))
        # The per-channel gate broadcasts over the sequence axis.
        return s * x
[docs]
def weight_init(m):
    """Initialize weights of supported model layers.

    Supported layers are ``nn.Conv1d``, ``nn.Linear``, ``nn.BatchNorm1d``
    and ``SqueezeExcite1d``; any other module is left untouched.

    Parameters
    ----------
    m : nn.Module
        Module whose weights should be initialized.

    Returns
    -------
    None
        The function modifies the module weights in place.
    """
    if isinstance(m, (nn.Conv1d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.BatchNorm1d):
        # weight/bias are None when the layer was built with affine=False.
        if m.weight is not None:
            nn.init.constant_(m.weight, 1)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
    elif isinstance(m, SqueezeExcite1d):
        # ``size`` is a method, so it must be called rather than subscripted.
        # Both weights are used as the left operand of a matmul in
        # SqueezeExcite1d.forward, so their fan-in is the last dimension.
        # NOTE(review): the original indices were ambiguous because of the
        # leading singleton dimension; fan-in is used here - confirm.
        stdv1 = math.sqrt(2.0 / m.w1.size(-1))  # followed by ReLU
        nn.init.normal_(m.w1, 0.0, stdv1)
        stdv2 = math.sqrt(1.0 / m.w2.size(-1))  # followed by sigmoid
        nn.init.normal_(m.w2, 0.0, stdv2)
[docs]
def create_head1d(
    nf,
    nc,
    lin_ftrs=None,
    ps=0.5,
    bn: bool = True,
    act="relu",
    concat_pooling=True,
    bn_final: bool = False,
):
    """Create a 1D model classification or regression head.

    The head pools over the sequence axis, flattens the result and applies
    a stack of (normalization, dropout, linear, activation) blocks.

    Parameters
    ----------
    nf : int
        Number of input features (channels of the incoming feature map).
    nc : int
        Number of output classes or output values.
    lin_ftrs : list
        Hidden layer sizes used in the head. If None, the head has a single
        linear layer.
    ps : float or Iterable
        Dropout probability or sequence of dropout probabilities, one per
        linear layer. A single value is used for the last linear layer and
        halved for every preceding one.
    bn : bool
        If True, use batch normalization in the linear blocks.
    act : str
        Activation function used between linear layers: "relu" selects
        ReLU, any other value selects ELU.
    concat_pooling : bool
        If True, use concatenated adaptive average and max pooling.
        If False, use adaptive average pooling only.
    bn_final : bool
        If True, append a batch normalization layer (momentum 0.01) after
        the final linear layer.

    Returns
    -------
    nn.Sequential
        Sequential model head.
    """
    # Concatenated pooling doubles the number of features entering the head.
    n_pooled = 2 * nf if concat_pooling else nf
    hidden = [] if lin_ftrs is None else list(lin_ftrs)
    lin_ftrs = [n_pooled] + hidden + [nc]
    n_hidden = len(lin_ftrs) - 2

    # Normalize ``ps`` to a list so tuples and scalars are both accepted.
    ps = list(ps) if isinstance(ps, Iterable) else [ps]
    if len(ps) == 1:
        ps = [ps[0] / 2] * n_hidden + ps

    # One activation instance per hidden layer; no activation after the
    # final linear layer.
    actns = [
        nn.ReLU(inplace=True) if act == "relu" else nn.ELU(inplace=True)
        for _ in range(n_hidden)
    ] + [None]

    layers = [
        AdaptiveConcatPool1d() if concat_pooling else nn.AdaptiveAvgPool1d(1),
        Flatten(),
    ]
    for ni, no, p, actn in zip(lin_ftrs[:-1], lin_ftrs[1:], ps, actns):
        layers += bn_drop_lin(ni, no, bn, p, actn)
    if bn_final:
        layers.append(nn.BatchNorm1d(lin_ftrs[-1], momentum=0.01))
    return nn.Sequential(*layers)
[docs]
class Basic_Conv1d(nn.Sequential):
    """Basic configurable 1D convolutional neural network.

    The model is a sequence of one convolutional stage per entry in
    ``filters`` followed by a head.

    Parameters
    ----------
    filters : list
        List containing the number of filters for each convolutional layer.
    kernel_size : int or list
        Kernel size used for the convolutional layers. An int is used for
        every layer; a list provides one kernel size per layer.
    stride : int
        Stride used for the convolutional layers.
    dilation : int
        Dilation factor used for the convolutional layers.
    pool : int
        Max pooling kernel size. If 0, no max pooling is applied. Pooling
        is never applied after the last convolutional stage.
    pool_stride : int
        Stride used for max pooling.
    squeeze_excite_reduction : int
        Reduction factor for squeeze-and-excitation blocks. If 0, no
        squeeze-and-excitation block is applied.
    num_classes : int
        Number of output classes or output values.
    input_channels : int
        Number of input channels.
    act : str
        Activation function used in the convolutional blocks.
    bn : bool
        If True, use batch normalization in convolutional blocks.
    headless : bool
        If True, remove the final prediction head and return extracted
        features (average-pooled and flattened). The last convolutional
        stage then has neither normalization nor activation.
    split_first_layer : bool
        If True, split the first convolutional layer into two convolutional
        steps: a stride-1 convolution without activation followed by a
        kernel-size-1 convolution.
    drop_p : float
        Dropout probability used in convolutional blocks (not applied in
        the first stage).
    lin_ftrs_head : list
        Hidden layer sizes used in the model head.
    ps_head : float or Iterable
        Dropout probability or probabilities used in the model head.
    bn_final_head : bool
        If True, append a batch normalization layer (momentum 0.01) after
        the final linear layer of the head.
    bn_head : bool
        If True, use batch normalization in the model head.
    act_head : str
        Activation function used in the model head.
    concat_pooling : bool
        If True, use concatenated adaptive average and max pooling in the head.
    """

    def __init__(
        self,
        filters=[128, 128, 128, 128],  # read-only here, never mutated
        kernel_size=3,
        stride=2,
        dilation=1,
        pool=0,
        pool_stride=1,
        squeeze_excite_reduction=0,
        num_classes=2,
        input_channels=8,
        act="relu",
        bn=True,
        headless=False,
        split_first_layer=False,
        drop_p=0.0,
        lin_ftrs_head=None,
        ps_head=0.5,
        bn_final_head=False,
        bn_head=True,
        act_head="relu",
        concat_pooling=True,
    ):
        layers = []
        if isinstance(kernel_size, int):
            kernel_size = [kernel_size] * len(filters)

        last = len(filters) - 1
        for i, n_filters in enumerate(filters):
            is_split_first = split_first_layer is True and i == 0
            is_headless_last = headless is True and i == last

            layers_tmp = [
                _conv1d(
                    input_channels if i == 0 else filters[i - 1],
                    n_filters,
                    kernel_size=kernel_size[i],
                    stride=1 if is_split_first else stride,
                    dilation=dilation,
                    # No activation on a raw feature output or on the first
                    # half of a split first layer.
                    act="none" if (is_headless_last or is_split_first) else act,
                    bn=False if is_headless_last else bn,
                    drop_p=0.0 if i == 0 else drop_p,
                )
            ]
            if is_split_first:
                layers_tmp.append(
                    _conv1d(
                        filters[0],
                        filters[0],
                        kernel_size=1,
                        stride=1,
                        act=act,
                        bn=bn,
                        drop_p=0.0,
                    )
                )
            if pool > 0 and i < last:
                layers_tmp.append(
                    nn.MaxPool1d(pool, stride=pool_stride, padding=(pool - 1) // 2)
                )
            if squeeze_excite_reduction > 0:
                layers_tmp.append(
                    SqueezeExcite1d(n_filters, squeeze_excite_reduction)
                )
            layers.append(nn.Sequential(*layers_tmp))

        # NOTE (from the original author): inplace=True leads to a runtime
        # error with ReLU + dropout, see
        # https://discuss.pytorch.org/t/relu-dropout-inplace/13467/5
        if headless is True:
            head = nn.Sequential(nn.AdaptiveAvgPool1d(1), Flatten())
        else:
            # create_head1d is called without ``bn_final``: its signature does
            # not declare that parameter, so passing it raised a TypeError
            # for every non-headless model. The optional final batch norm is
            # appended here instead.
            head = create_head1d(
                filters[-1],
                nc=num_classes,
                lin_ftrs=lin_ftrs_head,
                ps=ps_head,
                bn=bn_head,
                act=act_head,
                concat_pooling=concat_pooling,
            )
            if bn_final_head:
                head = nn.Sequential(
                    *head, nn.BatchNorm1d(num_classes, momentum=0.01)
                )
        layers.append(head)

        super().__init__(*layers)
        self.headless = headless

    def get_layer_groups(self):
        """Get layer groups of the model.

        Returns
        -------
        tuple
            Tuple ``(self[2], self[-1])``: the third entry of the model and
            the head. Requires the model to contain at least three entries.
        """
        return (self[2], self[-1])

    def get_output_layer(self):
        """Get the output layer of the model.

        Returns
        -------
        nn.Module or None
            Last module of the head if the model is not headless.
            Otherwise, None.
        """
        if self.headless is False:
            return self[-1][-1]
        else:
            return None

    def set_output_layer(self, x):
        """Set the output layer of the model.

        Parameters
        ----------
        x : nn.Module
            New output layer, replacing the last module of the head.

        Returns
        -------
        None
            The function modifies the output layer in place. It does
            nothing for a headless model.
        """
        if self.headless is False:
            self[-1][-1] = x
[docs]
def fcn(filters=[128] * 5, num_classes=2, input_channels=8, **kwargs):
    """Build a headless, fully convolutional 1D network.

    ``num_classes`` is appended to ``filters`` as the channel count of the
    final convolutional stage, and the model is created headless.

    Parameters
    ----------
    filters : list
        List containing the number of filters for the convolutional layers.
    num_classes : int
        Number of output classes or output values.
    input_channels : int
        Number of input channels.
    **kwargs
        Additional keyword arguments. They are accepted but not used.

    Returns
    -------
    Basic_Conv1d
        Fully convolutional 1D model.
    """
    config = dict(
        filters=filters + [num_classes],
        kernel_size=3,
        stride=1,
        pool=2,
        pool_stride=2,
        input_channels=input_channels,
        act="relu",
        bn=True,
        headless=True,
    )
    return Basic_Conv1d(**config)
[docs]
def fcn_wang(
    num_classes=2,
    input_channels=8,
    lin_ftrs_head=None,
    ps_head=0.5,
    bn_final_head=False,
    bn_head=True,
    act_head="relu",
    concat_pooling=True,
    **kwargs,
):
    """Build a Wang-style fully convolutional 1D network.

    The convolutional body is fixed to three stride-1 stages with
    128, 256 and 128 filters and kernel sizes 8, 5 and 3; only the head is
    configurable.

    Parameters
    ----------
    num_classes : int
        Number of output classes or output values.
    input_channels : int
        Number of input channels.
    lin_ftrs_head : list
        Hidden layer sizes used in the model head.
    ps_head : float or Iterable
        Dropout probability or probabilities used in the model head.
    bn_final_head : bool
        Indicates whether final batch normalization should be used in the head.
    bn_head : bool
        If True, use batch normalization in the model head.
    act_head : str
        Activation function used in the model head.
    concat_pooling : bool
        If True, use concatenated adaptive average and max pooling in the head.
    **kwargs
        Additional keyword arguments. They are accepted but not used.

    Returns
    -------
    Basic_Conv1d
        Wang-style fully convolutional 1D model.
    """
    # Fixed convolutional body.
    body_config = dict(
        filters=[128, 256, 128],
        kernel_size=[8, 5, 3],
        stride=1,
        pool=0,
        pool_stride=2,
        input_channels=input_channels,
        act="relu",
        bn=True,
    )
    # Caller-controlled head.
    head_config = dict(
        num_classes=num_classes,
        lin_ftrs_head=lin_ftrs_head,
        ps_head=ps_head,
        bn_final_head=bn_final_head,
        bn_head=bn_head,
        act_head=act_head,
        concat_pooling=concat_pooling,
    )
    return Basic_Conv1d(**body_config, **head_config)
[docs]
def schirrmeister(
    num_classes=2,
    input_channels=8,
    kernel_size=10,
    lin_ftrs_head=None,
    ps_head=0.5,
    bn_final_head=False,
    bn_head=True,
    act_head="relu",
    concat_pooling=True,
    **kwargs,
):
    """Build a Schirrmeister-style 1D convolutional network.

    The convolutional body uses four stages with 25, 50, 100 and 200
    filters, a split first layer, max pooling and dropout 0.5; the kernel
    size and the head are configurable.

    Parameters
    ----------
    num_classes : int
        Number of output classes or output values.
    input_channels : int
        Number of input channels.
    kernel_size : int
        Kernel size used in the convolutional layers.
    lin_ftrs_head : list
        Hidden layer sizes used in the model head.
    ps_head : float or Iterable
        Dropout probability or probabilities used in the model head.
    bn_final_head : bool
        Indicates whether final batch normalization should be used in the head.
    bn_head : bool
        If True, use batch normalization in the model head.
    act_head : str
        Activation function used in the model head.
    concat_pooling : bool
        If True, use concatenated adaptive average and max pooling in the head.
    **kwargs
        Additional keyword arguments. They are accepted but not used.

    Returns
    -------
    Basic_Conv1d
        Schirrmeister-style 1D convolutional model.
    """
    # Convolutional body.
    body_config = dict(
        filters=[25, 50, 100, 200],
        kernel_size=kernel_size,
        stride=3,
        pool=3,
        pool_stride=1,
        input_channels=input_channels,
        act="relu",
        bn=True,
        headless=False,
        split_first_layer=True,
        drop_p=0.5,
    )
    # Caller-controlled head.
    head_config = dict(
        num_classes=num_classes,
        lin_ftrs_head=lin_ftrs_head,
        ps_head=ps_head,
        bn_final_head=bn_final_head,
        bn_head=bn_head,
        act_head=act_head,
        concat_pooling=concat_pooling,
    )
    return Basic_Conv1d(**body_config, **head_config)
[docs]
def sen(
    filters=[128] * 5,
    num_classes=2,
    input_channels=8,
    kernel_size=3,
    squeeze_excite_reduction=16,
    drop_p=0.0,
    lin_ftrs_head=None,
    ps_head=0.5,
    bn_final_head=False,
    bn_head=True,
    act_head="relu",
    concat_pooling=True,
    **kwargs,
):
    """Build a squeeze-and-excitation 1D convolutional network.

    The convolutional stages use stride 2, ReLU, batch normalization and no
    max pooling.

    Parameters
    ----------
    filters : list
        List containing the number of filters for each convolutional layer.
    num_classes : int
        Number of output classes or output values.
    input_channels : int
        Number of input channels.
    kernel_size : int or list
        Kernel size used in the convolutional layers.
    squeeze_excite_reduction : int
        Reduction factor used in squeeze-and-excitation blocks.
    drop_p : float
        Dropout probability used in convolutional blocks.
    lin_ftrs_head : list
        Hidden layer sizes used in the model head.
    ps_head : float or Iterable
        Dropout probability or probabilities used in the model head.
    bn_final_head : bool
        Indicates whether final batch normalization should be used in the head.
    bn_head : bool
        If True, use batch normalization in the model head.
    act_head : str
        Activation function used in the model head.
    concat_pooling : bool
        If True, use concatenated adaptive average and max pooling in the head.
    **kwargs
        Additional keyword arguments. They are accepted but not used.

    Returns
    -------
    Basic_Conv1d
        Squeeze-and-excitation 1D convolutional model.
    """
    # Convolutional body.
    body_config = dict(
        filters=filters,
        kernel_size=kernel_size,
        stride=2,
        pool=0,
        pool_stride=0,
        input_channels=input_channels,
        act="relu",
        bn=True,
        squeeze_excite_reduction=squeeze_excite_reduction,
        drop_p=drop_p,
    )
    # Caller-controlled head.
    head_config = dict(
        num_classes=num_classes,
        lin_ftrs_head=lin_ftrs_head,
        ps_head=ps_head,
        bn_final_head=bn_final_head,
        bn_head=bn_head,
        act_head=act_head,
        concat_pooling=concat_pooling,
    )
    return Basic_Conv1d(**body_config, **head_config)
[docs]
def basic1d(
    filters=[128] * 5,
    kernel_size=3,
    stride=2,
    dilation=1,
    pool=0,
    pool_stride=1,
    squeeze_excite_reduction=0,
    num_classes=2,
    input_channels=8,
    act="relu",
    bn=True,
    headless=False,
    drop_p=0.0,
    lin_ftrs_head=None,
    ps_head=0.5,
    bn_final_head=False,
    bn_head=True,
    act_head="relu",
    concat_pooling=True,
    **kwargs,
):
    """Build a basic configurable 1D convolutional network.

    Every named argument is forwarded unchanged to ``Basic_Conv1d``.

    Parameters
    ----------
    filters : list
        List containing the number of filters for each convolutional layer.
    kernel_size : int or list
        Kernel size used for the convolutional layers.
    stride : int
        Stride used for the convolutional layers.
    dilation : int
        Dilation factor used for the convolutional layers.
    pool : int
        Max pooling kernel size. If 0, no max pooling is applied.
    pool_stride : int
        Stride used for max pooling.
    squeeze_excite_reduction : int
        Reduction factor for squeeze-and-excitation blocks. If 0, no
        squeeze-and-excitation block is applied.
    num_classes : int
        Number of output classes or output values.
    input_channels : int
        Number of input channels.
    act : str
        Activation function used in the convolutional blocks.
    bn : bool
        If True, use batch normalization in convolutional blocks.
    headless : bool
        If True, remove the final prediction head and return extracted features.
    drop_p : float
        Dropout probability used in convolutional blocks.
    lin_ftrs_head : list
        Hidden layer sizes used in the model head.
    ps_head : float or Iterable
        Dropout probability or probabilities used in the model head.
    bn_final_head : bool
        Indicates whether final batch normalization should be used in the head.
    bn_head : bool
        If True, use batch normalization in the model head.
    act_head : str
        Activation function used in the model head.
    concat_pooling : bool
        If True, use concatenated adaptive average and max pooling in the head.
    **kwargs
        Additional keyword arguments. They are accepted but not used.

    Returns
    -------
    Basic_Conv1d
        Basic configurable 1D convolutional model.
    """
    # Convolutional body.
    body_config = dict(
        filters=filters,
        kernel_size=kernel_size,
        stride=stride,
        dilation=dilation,
        pool=pool,
        pool_stride=pool_stride,
        squeeze_excite_reduction=squeeze_excite_reduction,
        input_channels=input_channels,
        act=act,
        bn=bn,
        headless=headless,
        drop_p=drop_p,
    )
    # Head.
    head_config = dict(
        num_classes=num_classes,
        lin_ftrs_head=lin_ftrs_head,
        ps_head=ps_head,
        bn_final_head=bn_final_head,
        bn_head=bn_head,
        act_head=act_head,
        concat_pooling=concat_pooling,
    )
    return Basic_Conv1d(**body_config, **head_config)