Source code for towhee.models.mdmmt.bert_mmt

# Built on top of the original implementation at https://github.com/papermsucode/mdmmt
#
# Modifications by Copyright 2022 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Logic for the Transformer architecture used for MMT.

"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import logging
import math
import torch

from torch import nn
from towhee.models.layers.activations import swish, gelu

logger = logging.getLogger(__name__)

ACT2FN = {"gelu": gelu, "relu": torch.nn.functional.relu, "swish": swish}

BertLayerNorm = torch.nn.LayerNorm


[docs]class BertEmbeddings(nn.Module): """Construct the embeddings from word, position and token_type embeddings."""
[docs] def __init__(self, config): super().__init__() self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size) self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size) self.layer_norm = BertLayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob)
[docs] def forward(self, input_ids, token_type_ids=None, position_ids=None, features=None): if token_type_ids is None: token_type_ids = torch.zeros_like(input_ids) token_type_embeddings = self.token_type_embeddings(token_type_ids) if position_ids is not None: position_embeddings = self.position_embeddings(position_ids) embeddings = position_embeddings + token_type_embeddings + features else: embeddings = token_type_embeddings + features embeddings = self.layer_norm(embeddings) embeddings = self.dropout(embeddings) return embeddings
[docs]class BertSelfAttention(nn.Module): """Self-attention mechanism."""
[docs] def __init__(self, config): super().__init__() if config.hidden_size % config.num_attention_heads != 0: raise ValueError( f"The hidden size {config.hidden_size} is not a multiple of the number of attention " f"heads {config.num_attention_heads}") self.output_attentions = False self.num_attention_heads = config.num_attention_heads self.attention_head_size = int(config.hidden_size / config.num_attention_heads) self.all_head_size = self.num_attention_heads * self.attention_head_size self.query = nn.Linear(config.hidden_size, self.all_head_size) self.key = nn.Linear(config.hidden_size, self.all_head_size) self.value = nn.Linear(config.hidden_size, self.all_head_size) self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
def transpose_for_scores(self, x): new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size) x = x.view(*new_x_shape) return x.permute(0, 2, 1, 3)
[docs] def forward(self, hidden_states, attention_mask, head_mask=None): mixed_query_layer = self.query(hidden_states) mixed_key_layer = self.key(hidden_states) mixed_value_layer = self.value(hidden_states) query_layer = self.transpose_for_scores(mixed_query_layer) key_layer = self.transpose_for_scores(mixed_key_layer) value_layer = self.transpose_for_scores(mixed_value_layer) # Take the dot product between "query" and "key" to get the raw attention # scores. attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) attention_scores = attention_scores / math.sqrt(self.attention_head_size) # Apply the attention mask is (precomputed for all layers in BertModel # forward() function) attention_scores = attention_scores + attention_mask # Normalize the attention scores to probabilities. attention_probs = nn.Softmax(dim=-1)(attention_scores) # This is actually dropping out entire tokens to attend to, which might # seem a bit unusual, but is taken from the original Transformer paper. attention_probs = self.dropout(attention_probs) # Mask heads if we want to if head_mask is not None: attention_probs = attention_probs * head_mask context_layer = torch.matmul(attention_probs, value_layer) context_layer = context_layer.permute(0, 2, 1, 3).contiguous() new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,) context_layer = context_layer.view(*new_context_layer_shape) outputs = (context_layer, attention_probs) if self.output_attentions else (context_layer,) return outputs
[docs]class BertSelfOutput(nn.Module): """Self-attention output."""
[docs] def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.layer_norm = BertLayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob)
[docs] def forward(self, hidden_states, input_tensor): hidden_states = self.dense(hidden_states) hidden_states = self.dropout(hidden_states) hidden_states = self.layer_norm(hidden_states + input_tensor) return hidden_states
[docs]class BertAttention(nn.Module): """Self-attention layer."""
[docs] def __init__(self, config): super().__init__() self.self = BertSelfAttention(config) self.output = BertSelfOutput(config)
[docs] def forward(self, input_tensor, attention_mask, head_mask=None): self_outputs = self.self(input_tensor, attention_mask, head_mask) attention_output = self.output(self_outputs[0], input_tensor) outputs = (attention_output, ) + self_outputs[1:] # add attentions if we output them return outputs
[docs]class BertIntermediate(nn.Module): """Fully-connected layer, part 1."""
[docs] def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.intermediate_size) self.intermediate_act_fn = ACT2FN[config.hidden_act]
# self.intermediate_act_fn = config.hidden_act
[docs] def forward(self, hidden_states): hidden_states = self.dense(hidden_states) hidden_states = self.intermediate_act_fn(hidden_states) return hidden_states
[docs]class BertOutput(nn.Module): """Fully-connected layer, part 2."""
[docs] def __init__(self, config): super().__init__() self.dense = nn.Linear(config.intermediate_size, config.hidden_size) self.layer_norm = BertLayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob)
[docs] def forward(self, hidden_states, input_tensor): hidden_states = self.dense(hidden_states) hidden_states = self.dropout(hidden_states) hidden_states = self.layer_norm(hidden_states + input_tensor) return hidden_states
[docs]class BertLayer(nn.Module): """Complete Bert layer."""
[docs] def __init__(self, config): super().__init__() self.attention = BertAttention(config) self.intermediate = BertIntermediate(config) self.output = BertOutput(config)
[docs] def forward(self, hidden_states, attention_mask, head_mask=None): attention_outputs = self.attention(hidden_states, attention_mask, head_mask) attention_output = attention_outputs[0] intermediate_output = self.intermediate(attention_output) layer_output = self.output(intermediate_output, attention_output) outputs = (layer_output, ) + attention_outputs[1:] # add attentions if we output them return outputs
[docs]class BertEncoder(nn.Module): """Complete Bert Model (Transformer encoder)."""
[docs] def __init__(self, config): super().__init__() self.output_attentions = False self.output_hidden_states = False self.layer = nn.ModuleList( [BertLayer(config) for _ in range(config.num_hidden_layers)])
[docs] def forward(self, hidden_states, attention_mask, head_mask=None): all_hidden_states = () all_attentions = () for i, layer_module in enumerate(self.layer): if self.output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) layer_outputs = layer_module(hidden_states, attention_mask, head_mask[i]) hidden_states = layer_outputs[0] if self.output_attentions: all_attentions = all_attentions + (layer_outputs[1],) # Add last layer if self.output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) outputs = (hidden_states,) if self.output_hidden_states: outputs = outputs + (all_hidden_states,) if self.output_attentions: outputs = outputs + (all_attentions,) # last-layer hidden state, (all hidden states), (all attentions) return outputs
[docs]class BertPooler(nn.Module): """Extraction of a single output embedding."""
[docs] def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.activation = nn.Tanh()
[docs] def forward(self, hidden_states): # We "pool" the model by simply taking the hidden state corresponding # to the first token. first_token_tensor = hidden_states[:, 0] pooled_output = self.dense(first_token_tensor) pooled_output = self.activation(pooled_output) return pooled_output
[docs]class BertMMT(nn.Module): r"""Bert Model. Outputs: `Tuple` comprising various elements depending on the configuration (config) and inputs: **last_hidden_state**: ``torch.FloatTensor`` of shape ``(batch_size, sequence_length, hidden_size)`` Sequence of hidden-states at the output of the last layer of the model. **pooler_output**: ``torch.FloatTensor`` of shape ``(batch_size, hidden_size)`` Last layer hidden-state of the first token of the sequence (classification token) further processed by a Linear layer and a Tanh activation function. The Linear layer weights are trained from the next sentence prediction (classification) objective during Bert pretraining. This output is usually *not* a good summary of the semantic content of the input, you're often better with averaging or pooling the sequence of hidden-states for the whole input sequence. **hidden_states**: (`optional`, returned when ``config.output_hidden_states=True``) list of ``torch.FloatTensor`` (one for the output of each layer + the output of the embeddings) of shape ``(batch_size, sequence_length, hidden_size)``: Hidden-states of the model at the output of each layer plus the initial embedding outputs. **attentions**: (`optional`, returned when ``config.output_attentions=True``) list of ``torch.FloatTensor`` (one for each layer) of shape ``(batch_size, num_heads, sequence_length, sequence_length)``: Attentions weights after the attention softmax, used to compute the weighted average in the self-attention heads. """
[docs] def __init__(self, config): super().__init__() self.config = config self.embeddings = BertEmbeddings(config) self.encoder = BertEncoder(config) self.pooler = BertPooler(config) # Weights initialization self.apply(self._init_weights)
def _init_weights(self, module): """Initialize the weights.""" if isinstance(module, (nn.Linear, nn.Embedding)): module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) elif isinstance(module, BertLayerNorm): module.bias.data.zero_() module.weight.data.fill_(1.0) if isinstance(module, nn.Linear) and module.bias is not None: module.bias.data.zero_()
[docs] def forward(self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, features=None): if attention_mask is None: attention_mask = torch.ones_like(input_ids) if token_type_ids is None: token_type_ids = torch.zeros_like(input_ids) # We create a 3D attention mask from a 2D tensor mask. # Sizes are [batch_size, 1, 1, to_seq_length] # So we can broadcast to # [batch_size, num_heads, from_seq_length, to_seq_length] extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) # Since attention_mask is 1.0 for positions we want to attend and 0.0 for # masked positions, this operation will create a tensor which is 0.0 for # positions we want to attend and -10000.0 for masked positions. # Since we are adding it to the raw scores before the softmax, this is # effectively the same as removing these entirely. extended_attention_mask = extended_attention_mask.to( dtype=next(self.parameters()).dtype) # fp16 compatibility extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 head_mask = [None] * self.config.num_hidden_layers embedding_output = self.embeddings(input_ids, position_ids=position_ids, token_type_ids=token_type_ids, features=features) encoder_outputs = self.encoder(embedding_output, extended_attention_mask, head_mask=head_mask) sequence_output = encoder_outputs[0] pooled_output = self.pooler(sequence_output) outputs = ( sequence_output, pooled_output, ) + encoder_outputs[1:] # add hidden_states and attentions if they are here # sequence_output, pooled_output, (hidden_states), (attentions) return outputs