# Source code for qumphy.models.xresnet1d

"""
File: qumphy/models/xresnet1d.py
Project: 22HLT01 QUMPHY
Contact: oskar.pfeffer@ptb.de
Gitlab: https://gitlab.com/qumphy
Description: One-dimensional XResNet and XBotNet model architectures.
"""

import torch
import torch.nn as nn
import math
import qumphy
from .basic_conv1d import create_head1d

from enum import Enum

NormType = Enum("NormType", "Batch BatchZero")


def init_default(m, func=nn.init.kaiming_normal_):
    """Apply a weight initializer to a module and zero its bias.

    Parameters
    ----------
    m : nn.Module
        Module that is initialized in place.
    func : callable
        Initializer called on ``m.weight``. It is skipped when ``func`` is
        falsy or when ``m`` has no ``weight`` attribute.

    Returns
    -------
    nn.Module
        The same module object that was passed in.
    """
    should_init_weight = bool(func) and hasattr(m, "weight")
    if should_init_weight:
        func(m.weight)

    bias = getattr(m, "bias", None)
    if bias is not None:
        # Write the bias without recording the operation in autograd.
        with torch.no_grad():
            bias.fill_(0.0)
    return m
def _get_norm(prefix, nf, zero=False, **kwargs):
    """Build a 1D normalization layer from ``torch.nn`` and preset its affine terms.

    Parameters
    ----------
    prefix : str
        Class-name prefix; the layer class looked up is ``nn.<prefix>1d``.
    nf : int
        Number of features passed to the layer constructor.
    zero : bool
        If True, the affine weight is filled with 0.0, otherwise with 1.0.
    **kwargs
        Extra keyword arguments forwarded to the layer constructor.

    Returns
    -------
    nn.Module
        The constructed layer. When it is affine, its bias is filled with
        1e-3 and its weight with 0.0 or 1.0 depending on ``zero``.
    """
    norm_cls = getattr(nn, f"{prefix}1d")
    layer = norm_cls(nf, **kwargs)
    if not layer.affine:
        # No learnable scale/shift to preset.
        return layer
    layer.bias.data.fill_(1e-3)
    weight_value = 0.0 if zero else 1.0
    layer.weight.data.fill_(weight_value)
    return layer
def BatchNorm(nf, norm_type=NormType.Batch, **kwargs):
    """Build a ``BatchNorm1d`` layer via ``_get_norm``.

    Parameters
    ----------
    nf : int
        Number of input features.
    norm_type : NormType
        ``NormType.BatchZero`` requests a zero-filled weight; any other
        value requests a weight filled with ones.
    **kwargs
        Extra keyword arguments forwarded to the batch normalization layer.

    Returns
    -------
    nn.Module
        Initialized batch normalization layer.
    """
    zero_weight = norm_type == NormType.BatchZero
    return _get_norm("BatchNorm", nf, zero=zero_weight, **kwargs)
class ConvLayer(nn.Sequential):
    """Conv1d followed by optional batch norm, activation and an extra layer."""

    def __init__(
        self,
        ni,
        nf,
        ks=3,
        stride=1,
        padding=None,
        bias=None,
        norm_type=NormType.Batch,
        bn_1st=True,
        act_cls=nn.ReLU,
        init=nn.init.kaiming_normal_,
        xtra=None,
        **kwargs,
    ):
        """Assemble the layer sequence.

        Parameters
        ----------
        ni : int
            Number of input channels.
        nf : int
            Number of output channels.
        ks : int
            Convolution kernel size.
        stride : int
            Convolution stride.
        padding : int
            Convolution padding; ``(ks - 1) // 2`` is used when None.
        bias : bool
            Whether the convolution has a bias. When None, a bias is used
            only if no batch normalization is added.
        norm_type : NormType
            ``NormType.Batch`` or ``NormType.BatchZero`` add a batch norm
            layer; any other value adds none.
        bn_1st : bool
            If True, batch norm is placed before the activation, otherwise
            after it.
        act_cls : type
            Activation class instantiated without arguments; None for no
            activation.
        init : callable
            Initializer applied to the convolution weights via ``init_default``.
        xtra : nn.Module
            Optional module appended last (only when truthy).
        **kwargs
            Extra keyword arguments forwarded to ``nn.Conv1d``.
        """
        use_bn = norm_type in (NormType.Batch, NormType.BatchZero)
        if padding is None:
            padding = (ks - 1) // 2
        if bias is None:
            # The batch norm shift makes a separate conv bias redundant.
            bias = not use_bn

        conv = nn.Conv1d(
            ni,
            nf,
            kernel_size=ks,
            bias=bias,
            stride=stride,
            padding=padding,
            **kwargs,
        )
        modules = [init_default(conv, init)]

        # Instantiate the activation before the norm layer, then order them.
        activation = [act_cls()] if act_cls is not None else []
        norm = [BatchNorm(nf, norm_type=norm_type)] if use_bn else []
        modules += (norm + activation) if bn_1st else (activation + norm)

        if xtra:
            modules.append(xtra)
        super().__init__(*modules)
class MHSA1d(nn.Module):
    """Multi-head self-attention over 1D feature maps with a learned position term."""

    def __init__(self, n_dims, length=14, heads=4):
        """Create the projections and the positional parameter.

        Parameters
        ----------
        n_dims : int
            Number of input (and output) channels.
        length : int
            Sequence length that the positional parameter ``rel`` is sized for.
        heads : int
            Number of attention heads; channels are split evenly across heads.
        """
        super().__init__()
        self.heads = heads
        # 1x1 convolutions act as per-position linear projections.
        self.query = nn.Conv1d(n_dims, n_dims, kernel_size=1)
        self.key = nn.Conv1d(n_dims, n_dims, kernel_size=1)
        self.value = nn.Conv1d(n_dims, n_dims, kernel_size=1)
        rel_shape = [1, heads, n_dims // heads, length]
        self.rel = nn.Parameter(torch.randn(rel_shape), requires_grad=True)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Apply self-attention to ``x``.

        Parameters
        ----------
        x : torch.Tensor
            Input of shape (batch_size, channels, sequence_length).

        Returns
        -------
        torch.Tensor
            Tensor of shape (batch_size, channels, sequence_length).
        """
        n_batch, C, length = x.size()
        per_head = (n_batch, self.heads, C // self.heads, -1)
        q, k, v = (
            proj(x).view(*per_head) for proj in (self.query, self.key, self.value)
        )

        # Content-content scores: (batch, heads, length, length).
        scores = torch.matmul(q.transpose(-2, -1), k)
        # Content-position scores from the learned positional parameter.
        rel_t = self.rel.view(1, self.heads, C // self.heads, -1).transpose(-2, -1)
        scores = scores + torch.matmul(rel_t, q)

        weights = self.softmax(scores)
        mixed = torch.matmul(v, weights.transpose(-2, -1))
        return mixed.view(n_batch, C, length)
class ResBlock(nn.Module):
    """Residual block for one-dimensional XResNet models.

    The block sums a convolutional path (``convpath``) and an identity path
    (``idpath``) and applies an activation to the result.
    """

    def __init__(
        self,
        expansion,
        ni,
        nf,
        stride=1,
        kernel_size=3,
        groups=1,
        nh1=None,
        nh2=None,
        dw=False,
        g2=1,
        norm_type=NormType.Batch,
        act_cls=nn.ReLU,
        pool=nn.AvgPool1d,
        pool_first=True,
        heads=4,
        mhsa=False,
        input_size=None,
        **kwargs,
    ):
        """Initialize the residual block.

        Parameters
        ----------
        expansion : int
            Expansion factor; ``ni`` and ``nf`` are multiplied by it.
        ni : int
            Number of input channels before expansion.
        nf : int
            Number of output channels before expansion.
        stride : int
            Stride used in the main convolutional path.
        kernel_size : int
            Kernel size of the non-1x1 convolutions.
        groups : int
            Number of convolution groups (ignored when ``dw`` is True).
        nh1 : int
            Hidden channels after the first convolution; defaults to ``nh2``.
        nh2 : int
            Hidden channels after the second convolution; defaults to ``nf``.
        dw : bool
            If True, use a depthwise convolution (groups equal to its input
            channels).
        g2 : int
            Number of groups used in the final convolution.
        norm_type : NormType
            Normalization type. With ``NormType.Batch`` the last convolution
            of the block uses ``NormType.BatchZero``.
        act_cls : type
            Activation class used in the block.
        pool : type
            Pooling class used in the identity path when ``stride != 1``.
        pool_first : bool
            If True, pooling is placed before the identity convolution.
        heads : int
            Number of attention heads used when ``mhsa`` is True.
        mhsa : bool
            If True, replace the middle convolution with multi-head
            self-attention. Requires ``expansion > 1``.
        input_size : int
            Input sequence length for the attention block; required when
            ``mhsa`` is True.
        **kwargs
            Additional keyword arguments passed to every ConvLayer.
        """
        super().__init__()
        assert mhsa is False or expansion > 1
        norm2 = NormType.BatchZero if norm_type == NormType.Batch else norm_type
        if nh2 is None:
            nh2 = nf
        if nh1 is None:
            nh1 = nh2
        nf, ni = nf * expansion, ni * expansion
        k0 = dict(norm_type=norm_type, act_cls=act_cls, **kwargs)
        k1 = dict(norm_type=norm2, act_cls=None, **kwargs)

        if expansion == 1:
            # Basic block: two convolutions with the full kernel size.
            layers = [
                ConvLayer(
                    ni,
                    nh2,
                    kernel_size,
                    stride=stride,
                    groups=ni if dw else groups,
                    **k0,
                ),
                ConvLayer(nh2, nf, kernel_size, groups=g2, **k1),
            ]
        else:
            # Bottleneck block: 1x1 -> (conv | attention) -> 1x1.
            layers = [ConvLayer(ni, nh1, 1, **k0)]
            if not mhsa:
                layers.append(
                    ConvLayer(
                        nh1,
                        nh2,
                        kernel_size,
                        stride=stride,
                        groups=nh1 if dw else groups,
                        **k0,
                    )
                )
            else:
                assert nh1 == nh2
                layers.append(MHSA1d(nh1, length=int(input_size), heads=heads))
                if stride == 2:
                    # ceil_mode matches the identity-path pooling below, so
                    # both paths produce the same length for odd inputs.
                    layers.append(nn.AvgPool1d(2, 2, ceil_mode=True))
            layers.append(ConvLayer(nh2, nf, 1, groups=g2, **k1))

        self.convs = nn.Sequential(*layers)
        convpath = [self.convs]
        self.convpath = nn.Sequential(*convpath)

        idpath = []
        if ni != nf:
            idpath.append(ConvLayer(ni, nf, 1, act_cls=None, **kwargs))
        if stride != 1:
            # Index 0 puts pooling first; index 1 puts it after the 1x1 conv.
            idpath.insert((1, 0)[pool_first], pool(2, ceil_mode=True))
        self.idpath = nn.Sequential(*idpath)
        self.act = nn.ReLU(inplace=True) if act_cls is nn.ReLU else act_cls()

    def forward(self, x):
        """Run a forward pass through the residual block.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            Activation of the sum of the convolutional and identity paths.
        """
        return self.act(self.convpath(x) + self.idpath(x))
class ResBlock_dropout(nn.Module):
    """Residual block with Monte Carlo dropout for one-dimensional XResNet models.

    Dropout layers are inserted only in the bottleneck layout
    (``expansion > 1``): before the middle convolution (non-attention case)
    and before the final 1x1 convolution.
    """

    def __init__(
        self,
        expansion,
        ni,
        nf,
        stride=1,
        kernel_size=3,
        groups=1,
        nh1=None,
        nh2=None,
        dw=False,
        g2=1,
        norm_type=NormType.Batch,
        act_cls=nn.ReLU,
        pool=nn.AvgPool1d,
        pool_first=True,
        heads=4,
        mhsa=False,
        input_size=None,
        dropout_rate=0.5,
        **kwargs,
    ):
        """Initialize the dropout residual block.

        Parameters
        ----------
        expansion : int
            Expansion factor; ``ni`` and ``nf`` are multiplied by it.
        ni : int
            Number of input channels before expansion.
        nf : int
            Number of output channels before expansion.
        stride : int
            Stride used in the main convolutional path.
        kernel_size : int
            Kernel size of the non-1x1 convolutions.
        groups : int
            Number of convolution groups (ignored when ``dw`` is True).
        nh1 : int
            Hidden channels after the first convolution; defaults to ``nh2``.
        nh2 : int
            Hidden channels after the second convolution; defaults to ``nf``.
        dw : bool
            If True, use a depthwise convolution (groups equal to its input
            channels).
        g2 : int
            Number of groups used in the final convolution.
        norm_type : NormType
            Normalization type. With ``NormType.Batch`` the last convolution
            of the block uses ``NormType.BatchZero``.
        act_cls : type
            Activation class used in the block.
        pool : type
            Pooling class used in the identity path when ``stride != 1``.
        pool_first : bool
            If True, pooling is placed before the identity convolution.
        heads : int
            Number of attention heads used when ``mhsa`` is True.
        mhsa : bool
            If True, replace the middle convolution with multi-head
            self-attention. Requires ``expansion > 1``.
        input_size : int
            Input sequence length for the attention block; required when
            ``mhsa`` is True.
        dropout_rate : float
            Dropout probability passed to the MCDropout layers.
        **kwargs
            Additional keyword arguments passed to every ConvLayer.
        """
        super().__init__()
        assert mhsa is False or expansion > 1
        norm2 = NormType.BatchZero if norm_type == NormType.Batch else norm_type
        if nh2 is None:
            nh2 = nf
        if nh1 is None:
            nh1 = nh2
        nf, ni = nf * expansion, ni * expansion
        k0 = dict(norm_type=norm_type, act_cls=act_cls, **kwargs)
        k1 = dict(norm_type=norm2, act_cls=None, **kwargs)

        if expansion == 1:
            # Basic block: two convolutions, no dropout layers are inserted.
            layers = [
                ConvLayer(
                    ni,
                    nh2,
                    kernel_size,
                    stride=stride,
                    groups=ni if dw else groups,
                    **k0,
                ),
                ConvLayer(nh2, nf, kernel_size, groups=g2, **k1),
            ]
        else:
            # Bottleneck block: 1x1 -> (dropout + conv | attention) -> dropout -> 1x1.
            layers = [ConvLayer(ni, nh1, 1, **k0)]
            if mhsa is False:
                layers.append(
                    qumphy.models.utils.mcdropout.MCDropout(
                        dropout_rate, mcdropout=True
                    )
                )
                layers.append(
                    ConvLayer(
                        nh1,
                        nh2,
                        kernel_size,
                        stride=stride,
                        groups=nh1 if dw else groups,
                        **k0,
                    )
                )
            else:
                assert nh1 == nh2
                layers.append(MHSA1d(nh1, length=int(input_size), heads=heads))
                if stride == 2:
                    # ceil_mode matches the identity-path pooling below, so
                    # both paths produce the same length for odd inputs.
                    layers.append(nn.AvgPool1d(2, 2, ceil_mode=True))
            layers.append(
                qumphy.models.utils.mcdropout.MCDropout(dropout_rate, mcdropout=True)
            )
            layers.append(ConvLayer(nh2, nf, 1, groups=g2, **k1))

        self.convs = nn.Sequential(*layers)
        convpath = [self.convs]
        self.convpath = nn.Sequential(*convpath)

        idpath = []
        if ni != nf:
            idpath.append(ConvLayer(ni, nf, 1, act_cls=None, **kwargs))
        if stride != 1:
            # Index 0 puts pooling first; index 1 puts it after the 1x1 conv.
            idpath.insert((1, 0)[pool_first], pool(2, ceil_mode=True))
        self.idpath = nn.Sequential(*idpath)
        self.act = nn.ReLU(inplace=True) if act_cls is nn.ReLU else act_cls()
        # NOTE: this attribute is not applied in forward(); it is kept so
        # that code accessing ``dropout_layer`` keeps working.
        self.dropout_layer = qumphy.models.utils.mcdropout.MCDropout(
            dropout_rate, mcdropout=True
        )

    def forward(self, x):
        """Run a forward pass through the dropout residual block.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        torch.Tensor
            Activation of the sum of the convolutional and identity paths.
        """
        return self.act(self.convpath(x) + self.idpath(x))
def init_cnn(m):
    """Initialize a module tree in place.

    Every visited module with a non-None ``bias`` gets that bias set to 0,
    and every ``nn.Conv1d``, ``nn.Conv2d`` or ``nn.Linear`` gets Kaiming-normal
    weights. Modules are visited parent first, then children in order.

    Parameters
    ----------
    m : nn.Module
        Root module of the tree to initialize.

    Returns
    -------
    None
        The module weights are modified in place.
    """
    kaiming_types = (nn.Conv1d, nn.Conv2d, nn.Linear)
    pending = [m]
    while pending:
        module = pending.pop()
        if getattr(module, "bias", None) is not None:
            nn.init.constant_(module.bias, 0)
        if isinstance(module, kaiming_types):
            nn.init.kaiming_normal_(module.weight)
        # Push children reversed so they are popped in their original order.
        pending.extend(reversed(list(module.children())))
class XResNet1d(nn.Sequential):
    """Configurable one-dimensional XResNet model.

    The model is a sequence of three stem ConvLayers, a max-pooling layer,
    one residual stage per entry of ``layers`` and, unless ``num_classes`` is
    None, a head built by ``create_head1d``.
    """

    def __init__(
        self,
        block,
        expansion,
        layers,
        input_channels=3,
        num_classes=1000,
        stem_szs=(32, 32, 64),
        input_size=1000,
        heads=4,
        mhsa=False,
        kernel_size=5,
        kernel_size_stem=5,
        widen=1.0,
        act_cls=nn.ReLU,
        lin_ftrs_head=None,
        ps_head=0.5,
        bn_head=True,
        act_head="relu",
        concat_pooling=True,
        model_dim=None,
        **kwargs,
    ):
        """Initialize the 1D XResNet model.

        Parameters
        ----------
        block : type
            Residual block class used to build the network.
        expansion : int
            Expansion factor used in residual blocks.
        layers : list
            Number of residual blocks in each stage.
        input_channels : int
            Number of input channels.
        num_classes : int
            Number of outputs of the head. If None, no head is added.
        stem_szs : tuple
            Output channel sizes of the three stem convolutional layers.
        input_size : int
            Length of the input sequence. May be None, in which case no
            sequence length is tracked and blocks receive ``input_size=None``
            (attention blocks need a length, so ``mhsa`` requires a value).
        heads : int
            Number of attention heads used when ``mhsa`` is True.
        mhsa : bool
            If True, use multi-head self-attention in the final residual
            stage; that stage then uses stride 1.
        kernel_size : int
            Kernel size used in residual blocks.
        kernel_size_stem : int
            Kernel size used in the stem convolutional layers.
        widen : float
            Width multiplier applied to the residual block channels.
        act_cls : type
            Activation class used in convolutional and residual blocks.
        lin_ftrs_head : list
            Hidden layer sizes used in the model head.
        ps_head : float or Iterable
            Dropout probability or probabilities used in the model head.
        bn_head : bool
            If True, use batch normalization in the model head.
        act_head : str
            Activation function used in the model head.
        concat_pooling : bool
            Passed to ``create_head1d`` as ``concat_pooling``.
        model_dim : int
            If given, the block width is ``int(model_dim / widen / expansion)``
            instead of 64.
        **kwargs
            Additional keyword arguments passed to residual blocks.
        """
        # Plain attributes set before nn.Module.__init__; _make_layer reads them.
        self.block = block
        self.expansion = expansion
        self.act_cls = act_cls

        stem_szs = [input_channels, *stem_szs]
        stem = [
            ConvLayer(
                stem_szs[i],
                stem_szs[i + 1],
                ks=kernel_size_stem,
                stride=2 if i == 0 else 1,
                act_cls=act_cls,
            )
            for i in range(3)
        ]
        stem.append(nn.MaxPool1d(kernel_size=3, stride=2, padding=1))

        # Track the sequence length after the two stride-2 stem operations.
        # The attribute must exist even for input_size=None because
        # _make_layer reads it unconditionally.
        if input_size is None:
            self.input_size = None
        else:
            self.input_size = math.floor((input_size - 1) / 2 + 1)
            self.input_size = math.floor((self.input_size - 1) / 2 + 1)

        if model_dim is None:
            block_szs = [
                int(o * widen) for o in [64, 64, 64, 64] + [64] * (len(layers) - 4)
            ]
        else:
            model_dim = int(model_dim / widen / expansion)
            block_szs = [
                int(o * widen)
                for o in [model_dim, model_dim, model_dim, model_dim]
                + [model_dim] * (len(layers) - 4)
            ]
        block_szs = [64 // expansion] + block_szs

        last_stage = len(layers) - 1
        blocks = [
            self._make_layer(
                ni=block_szs[i],
                nf=block_szs[i + 1],
                blocks=layer,
                # First stage and an attention final stage keep the length.
                stride=1 if i == 0 else (1 if i == last_stage and mhsa else 2),
                kernel_size=kernel_size,
                heads=heads,
                mhsa=mhsa if i == last_stage else False,
                **kwargs,
            )
            for i, layer in enumerate(layers)
        ]

        if num_classes is None:
            super().__init__(*stem, *blocks)
        else:
            head = create_head1d(
                block_szs[-1] * expansion,
                nc=num_classes,
                lin_ftrs=lin_ftrs_head,
                ps=ps_head,
                bn=bn_head,
                act=act_head,
                concat_pooling=concat_pooling,
            )
            super().__init__(*stem, *blocks, head)
        init_cnn(self)

    def _make_layer(
        self, ni, nf, blocks, stride, kernel_size, heads=4, mhsa=False, **kwargs
    ):
        """Create one residual stage and update the tracked sequence length.

        Parameters
        ----------
        ni : int
            Number of input channels.
        nf : int
            Number of output channels.
        blocks : int
            Number of residual blocks in the stage.
        stride : int
            Stride of the first residual block; later blocks use stride 1.
        kernel_size : int
            Kernel size used in residual blocks.
        heads : int
            Number of attention heads used when ``mhsa`` is True.
        mhsa : bool
            If True, use multi-head self-attention in the stage.
        **kwargs
            Additional keyword arguments passed to the residual block.

        Returns
        -------
        nn.Sequential
            Sequential residual stage.
        """
        input_size0 = self.input_size
        input_size1 = (
            math.floor((self.input_size - 1) / stride + 1)
            if self.input_size is not None
            else None
        )
        self.input_size = input_size1
        return nn.Sequential(
            *[
                self.block(
                    self.expansion,
                    ni if i == 0 else nf,
                    nf,
                    stride=stride if i == 0 else 1,
                    kernel_size=kernel_size,
                    act_cls=self.act_cls,
                    heads=heads,
                    mhsa=mhsa,
                    # Only the first block sees the pre-stride length.
                    input_size=input_size0 if i == 0 else input_size1,
                    **kwargs,
                )
                for i in range(blocks)
            ]
        )

    def get_layer_groups(self):
        """Get layer groups of the model.

        Returns
        -------
        tuple
            ``(self[3], self[-1])``: the module at index 3 (the stem
            max-pooling layer) and the last module of the sequence.
        """
        return (self[3], self[-1])

    def get_output_layer(self):
        """Get the output layer of the model.

        Returns
        -------
        nn.Module
            Last submodule of the last module in the sequence
            (``self[-1][-1]``).
        """
        return self[-1][-1]

    def set_output_layer(self, x):
        """Set the output layer of the model.

        Parameters
        ----------
        x : nn.Module
            New module stored at ``self[-1][-1]``.

        Returns
        -------
        None
            The model is modified in place.
        """
        self[-1][-1] = x
def _xresnet1d(expansion, layers, **kwargs):
    """Build an ``XResNet1d`` that uses ``ResBlock`` as its residual block.

    Parameters
    ----------
    expansion : int
        Expansion factor used in residual blocks.
    layers : list
        Number of residual blocks in each stage.
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized 1D XResNet model.
    """
    model = XResNet1d(ResBlock, expansion, layers, **kwargs)
    return model
def xresnet1d18(**kwargs):
    """Build an XResNet1d-18 (expansion 1, stage depths ``[2, 2, 2, 2]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XResNet1d-18 model.
    """
    stage_depths = [2, 2, 2, 2]
    return _xresnet1d(1, stage_depths, **kwargs)
def xresnet1d34(**kwargs):
    """Build an XResNet1d-34 (expansion 1, stage depths ``[3, 4, 6, 3]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XResNet1d-34 model.
    """
    stage_depths = [3, 4, 6, 3]
    return _xresnet1d(1, stage_depths, **kwargs)
def xresnet1d50(**kwargs):
    """Build an XResNet1d-50 (expansion 4, stage depths ``[3, 4, 6, 3]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XResNet1d-50 model.
    """
    stage_depths = [3, 4, 6, 3]
    return _xresnet1d(4, stage_depths, **kwargs)
class XResNet1d50(XResNet1d):
    """XResNet1d-50 model: ``ResBlock`` with expansion 4 and stage depths ``[3, 4, 6, 3]``."""

    def __init__(self, **kwargs):
        """Initialize the XResNet1d-50 model.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments passed to XResNet1d.
        """
        stage_depths = [3, 4, 6, 3]
        super().__init__(ResBlock, 4, stage_depths, **kwargs)
class XResNet1d101(XResNet1d):
    """XResNet1d-101 model: ``ResBlock`` with expansion 4 and stage depths ``[3, 4, 23, 3]``."""

    def __init__(self, **kwargs):
        """Initialize the XResNet1d-101 model.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments passed to XResNet1d.
        """
        stage_depths = [3, 4, 23, 3]
        super().__init__(ResBlock, 4, stage_depths, **kwargs)
def xresnet1d101(**kwargs):
    """Build an XResNet1d-101 (expansion 4, stage depths ``[3, 4, 23, 3]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XResNet1d-101 model.
    """
    stage_depths = [3, 4, 23, 3]
    return _xresnet1d(4, stage_depths, **kwargs)
def xresnet1d152(**kwargs):
    """Build an XResNet1d-152 (expansion 4, stage depths ``[3, 8, 36, 3]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XResNet1d-152 model.
    """
    stage_depths = [3, 8, 36, 3]
    return _xresnet1d(4, stage_depths, **kwargs)
def xresnet1d18_deep(**kwargs):
    """Build a deep XResNet1d-18 (expansion 1, stage depths ``[2, 2, 2, 2, 1, 1]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized deep XResNet1d-18 model.
    """
    stage_depths = [2, 2, 2, 2, 1, 1]
    return _xresnet1d(1, stage_depths, **kwargs)
def xresnet1d34_deep(**kwargs):
    """Build a deep XResNet1d-34 (expansion 1, stage depths ``[3, 4, 6, 3, 1, 1]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized deep XResNet1d-34 model.
    """
    stage_depths = [3, 4, 6, 3, 1, 1]
    return _xresnet1d(1, stage_depths, **kwargs)
def xresnet1d50_deep(**kwargs):
    """Build a deep XResNet1d-50 (expansion 4, stage depths ``[3, 4, 6, 3, 1, 1]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized deep XResNet1d-50 model.
    """
    stage_depths = [3, 4, 6, 3, 1, 1]
    return _xresnet1d(4, stage_depths, **kwargs)
def xresnet1d18_deeper(**kwargs):
    """Build a deeper XResNet1d-18 (expansion 1, stage depths ``[2, 2, 1, 1, 1, 1, 1, 1]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized deeper XResNet1d-18 model.
    """
    stage_depths = [2, 2, 1, 1, 1, 1, 1, 1]
    return _xresnet1d(1, stage_depths, **kwargs)
def xresnet1d34_deeper(**kwargs):
    """Build a deeper XResNet1d-34 (expansion 1, stage depths ``[3, 4, 6, 3, 1, 1, 1, 1]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized deeper XResNet1d-34 model.
    """
    stage_depths = [3, 4, 6, 3, 1, 1, 1, 1]
    return _xresnet1d(1, stage_depths, **kwargs)
def xresnet1d50_deeper(**kwargs):
    """Build a deeper XResNet1d-50 (expansion 4, stage depths ``[3, 4, 6, 3, 1, 1, 1, 1]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized deeper XResNet1d-50 model.
    """
    stage_depths = [3, 4, 6, 3, 1, 1, 1, 1]
    return _xresnet1d(4, stage_depths, **kwargs)
def xbotnet1d50(**kwargs):
    """Build an XBotNet1d-50: the XResNet1d-50 layout with ``mhsa=True``.

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XBotNet1d-50 model.
    """
    stage_depths = [3, 4, 6, 3]
    return _xresnet1d(4, stage_depths, mhsa=True, **kwargs)
def xbotnet1d101(**kwargs):
    """Build an XBotNet1d-101: the XResNet1d-101 layout with ``mhsa=True``.

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XBotNet1d-101 model.
    """
    stage_depths = [3, 4, 23, 3]
    return _xresnet1d(4, stage_depths, mhsa=True, **kwargs)
def xbotnet1d152(**kwargs):
    """Build an XBotNet1d-152: the XResNet1d-152 layout with ``mhsa=True``.

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XBotNet1d-152 model.
    """
    stage_depths = [3, 8, 36, 3]
    return _xresnet1d(4, stage_depths, mhsa=True, **kwargs)
def _xresnet1d_MCD(expansion, layers, dropout_rate=0.5, **kwargs):
    """Build an ``XResNet1d`` that uses ``ResBlock_dropout`` as its residual block.

    Parameters
    ----------
    expansion : int
        Expansion factor used in residual blocks.
    layers : list
        Number of residual blocks in each stage.
    dropout_rate : float
        Dropout probability, passed on as the ``dropout_rate`` keyword.
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized 1D XResNet model with Monte Carlo dropout.
    """
    model = XResNet1d(
        ResBlock_dropout,
        expansion,
        layers,
        dropout_rate=dropout_rate,
        **kwargs,
    )
    return model
def xresnet1d50_MCD(**kwargs):
    """Build an XResNet1d-50 with Monte Carlo dropout (expansion 4, stage depths ``[3, 4, 6, 3]``).

    Parameters
    ----------
    **kwargs
        Additional keyword arguments passed to XResNet1d.

    Returns
    -------
    XResNet1d
        Initialized XResNet1d-50 model with Monte Carlo dropout.
    """
    stage_depths = [3, 4, 6, 3]
    return _xresnet1d_MCD(4, stage_depths, **kwargs)