# coding=utf-8 # Copyright 2020 Microsoft and the Hugging Face Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """PyTorch DeBERTa model.""" from typing import Optional, Union import torch import torch.utils.checkpoint from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss from ...activations import ACT2FN from ...modeling_layers import GradientCheckpointingLayer from ...modeling_outputs import ( BaseModelOutput, MaskedLMOutput, QuestionAnsweringModelOutput, SequenceClassifierOutput, TokenClassifierOutput, ) from ...modeling_utils import PreTrainedModel from ...utils import auto_docstring, logging from .configuration_deberta import DebertaConfig logger = logging.get_logger(__name__) class DebertaLayerNorm(nn.Module): """LayerNorm module in the TF style (epsilon inside the square root).""" def __init__(self, size, eps=1e-12): super().__init__() self.weight = nn.Parameter(torch.ones(size)) self.bias = nn.Parameter(torch.zeros(size)) self.variance_epsilon = eps def forward(self, hidden_states): input_type = hidden_states.dtype hidden_states = hidden_states.float() mean = hidden_states.mean(-1, keepdim=True) variance = (hidden_states - mean).pow(2).mean(-1, keepdim=True) hidden_states = (hidden_states - mean) / torch.sqrt(variance + self.variance_epsilon) hidden_states = hidden_states.to(input_type) y = self.weight * hidden_states + self.bias return y class DebertaSelfOutput(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) self.LayerNorm = DebertaLayerNorm(config.hidden_size, config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob) def forward(self, hidden_states, input_tensor): hidden_states = self.dense(hidden_states) hidden_states = self.dropout(hidden_states) hidden_states = self.LayerNorm(hidden_states + input_tensor) return hidden_states @torch.jit.script def build_relative_position(query_layer, key_layer): """ Build relative position according to the query and key We assume the absolute position of query \\(P_q\\) is range from (0, query_size) and the absolute position of key \\(P_k\\) is range from (0, key_size), The relative positions from query to key is \\(R_{q \\rightarrow k} = P_q - P_k\\) Args: query_size (int): the length of query key_size (int): the length of key Return: `torch.LongTensor`: A tensor with shape [1, query_size, key_size] """ query_size = query_layer.size(-2) key_size = key_layer.size(-2) q_ids = torch.arange(query_size, dtype=torch.long, device=query_layer.device) k_ids = torch.arange(key_size, dtype=torch.long, device=key_layer.device) rel_pos_ids = q_ids[:, None] - k_ids.view(1, -1).repeat(query_size, 1) rel_pos_ids = rel_pos_ids[:query_size, :] rel_pos_ids = rel_pos_ids.unsqueeze(0) return rel_pos_ids @torch.jit.script def c2p_dynamic_expand(c2p_pos, query_layer, relative_pos): return c2p_pos.expand([query_layer.size(0), query_layer.size(1), query_layer.size(2), relative_pos.size(-1)]) @torch.jit.script def p2c_dynamic_expand(c2p_pos, query_layer, key_layer): return c2p_pos.expand([query_layer.size(0), query_layer.size(1), key_layer.size(-2), key_layer.size(-2)]) @torch.jit.script def pos_dynamic_expand(pos_index, p2c_att, key_layer): return pos_index.expand(p2c_att.size()[:2] + (pos_index.size(-2), key_layer.size(-2))) ###### To support a general trace, we have to define these operation as they use python objects (sizes) ################## # which are not supported by torch.jit.trace. # Full credits to @Szustarol @torch.jit.script def scaled_size_sqrt(query_layer: torch.Tensor, scale_factor: int): return torch.sqrt(torch.tensor(query_layer.size(-1), dtype=torch.float) * scale_factor) @torch.jit.script def build_rpos(query_layer: torch.Tensor, key_layer: torch.Tensor, relative_pos): if query_layer.size(-2) != key_layer.size(-2): return build_relative_position(query_layer, key_layer) else: return relative_pos @torch.jit.script def compute_attention_span(query_layer: torch.Tensor, key_layer: torch.Tensor, max_relative_positions: int): return torch.tensor(min(max(query_layer.size(-2), key_layer.size(-2)), max_relative_positions)) @torch.jit.script def uneven_size_corrected(p2c_att, query_layer: torch.Tensor, key_layer: torch.Tensor, relative_pos): if query_layer.size(-2) != key_layer.size(-2): pos_index = relative_pos[:, :, :, 0].unsqueeze(-1) return torch.gather(p2c_att, dim=2, index=pos_dynamic_expand(pos_index, p2c_att, key_layer)) else: return p2c_att ######################################################################################################################## class DisentangledSelfAttention(nn.Module): """ Disentangled self-attention module Parameters: config (`str`): A model config class instance with the configuration to build a new model. The schema is similar to *BertConfig*, for more details, please refer [`DebertaConfig`] """ def __init__(self, config): super().__init__() if config.hidden_size % config.num_attention_heads != 0: raise ValueError( f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention " f"heads ({config.num_attention_heads})" ) self.num_attention_heads = config.num_attention_heads self.attention_head_size = int(config.hidden_size / config.num_attention_heads) self.all_head_size = self.num_attention_heads * self.attention_head_size self.in_proj = nn.Linear(config.hidden_size, self.all_head_size * 3, bias=False) self.q_bias = nn.Parameter(torch.zeros((self.all_head_size), dtype=torch.float)) self.v_bias = nn.Parameter(torch.zeros((self.all_head_size), dtype=torch.float)) self.pos_att_type = config.pos_att_type if config.pos_att_type is not None else [] self.relative_attention = getattr(config, "relative_attention", False) self.talking_head = getattr(config, "talking_head", False) if self.talking_head: self.head_logits_proj = nn.Linear(config.num_attention_heads, config.num_attention_heads, bias=False) self.head_weights_proj = nn.Linear(config.num_attention_heads, config.num_attention_heads, bias=False) else: self.head_logits_proj = None self.head_weights_proj = None if self.relative_attention: self.max_relative_positions = getattr(config, "max_relative_positions", -1) if self.max_relative_positions < 1: self.max_relative_positions = config.max_position_embeddings self.pos_dropout = nn.Dropout(config.hidden_dropout_prob) if "c2p" in self.pos_att_type: self.pos_proj = nn.Linear(config.hidden_size, self.all_head_size, bias=False) if "p2c" in self.pos_att_type: self.pos_q_proj = nn.Linear(config.hidden_size, self.all_head_size) self.dropout = nn.Dropout(config.attention_probs_dropout_prob) def transpose_for_scores(self, x): new_x_shape = x.size()[:-1] + (self.num_attention_heads, -1) x = x.view(new_x_shape) return x.permute(0, 2, 1, 3) def forward( self, hidden_states: torch.Tensor, attention_mask: torch.Tensor, output_attentions: bool = False, query_states: Optional[torch.Tensor] = None, relative_pos: Optional[torch.Tensor] = None, rel_embeddings: Optional[torch.Tensor] = None, ) -> tuple[torch.Tensor, Optional[torch.Tensor]]: """ Call the module Args: hidden_states (`torch.FloatTensor`): Input states to the module usually the output from previous layer, it will be the Q,K and V in *Attention(Q,K,V)* attention_mask (`torch.BoolTensor`): An attention mask matrix of shape [*B*, *N*, *N*] where *B* is the batch size, *N* is the maximum sequence length in which element [i,j] = *1* means the *i* th token in the input can attend to the *j* th token. output_attentions (`bool`, *optional*): Whether return the attention matrix. query_states (`torch.FloatTensor`, *optional*): The *Q* state in *Attention(Q,K,V)*. relative_pos (`torch.LongTensor`): The relative position encoding between the tokens in the sequence. It's of shape [*B*, *N*, *N*] with values ranging in [*-max_relative_positions*, *max_relative_positions*]. rel_embeddings (`torch.FloatTensor`): The embedding of relative distances. It's a tensor of shape [\\(2 \\times \\text{max_relative_positions}\\), *hidden_size*]. """ if query_states is None: qp = self.in_proj(hidden_states) # .split(self.all_head_size, dim=-1) query_layer, key_layer, value_layer = self.transpose_for_scores(qp).chunk(3, dim=-1) else: ws = self.in_proj.weight.chunk(self.num_attention_heads * 3, dim=0) qkvw = [torch.cat([ws[i * 3 + k] for i in range(self.num_attention_heads)], dim=0) for k in range(3)] q = torch.matmul(qkvw[0], query_states.t().to(dtype=qkvw[0].dtype)) k = torch.matmul(qkvw[1], hidden_states.t().to(dtype=qkvw[1].dtype)) v = torch.matmul(qkvw[2], hidden_states.t().to(dtype=qkvw[2].dtype)) query_layer, key_layer, value_layer = [self.transpose_for_scores(x) for x in [q, k, v]] query_layer = query_layer + self.transpose_for_scores(self.q_bias[None, None, :]) value_layer = value_layer + self.transpose_for_scores(self.v_bias[None, None, :]) rel_att: int = 0 # Take the dot product between "query" and "key" to get the raw attention scores. scale_factor = 1 + len(self.pos_att_type) scale = scaled_size_sqrt(query_layer, scale_factor) query_layer = query_layer / scale.to(dtype=query_layer.dtype) attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) if self.relative_attention and rel_embeddings is not None and relative_pos is not None: rel_embeddings = self.pos_dropout(rel_embeddings) rel_att = self.disentangled_att_bias(query_layer, key_layer, relative_pos, rel_embeddings, scale_factor) if rel_att is not None: attention_scores = attention_scores + rel_att # bxhxlxd if self.head_logits_proj is not None: attention_scores = self.head_logits_proj(attention_scores.permute(0, 2, 3, 1)).permute(0, 3, 1, 2) attention_mask = attention_mask.bool() attention_scores = attention_scores.masked_fill(~(attention_mask), torch.finfo(query_layer.dtype).min) # bsz x height x length x dimension attention_probs = nn.functional.softmax(attention_scores, dim=-1) attention_probs = self.dropout(attention_probs) if self.head_weights_proj is not None: attention_probs = self.head_weights_proj(attention_probs.permute(0, 2, 3, 1)).permute(0, 3, 1, 2) context_layer = torch.matmul(attention_probs, value_layer) context_layer = context_layer.permute(0, 2, 1, 3).contiguous() new_context_layer_shape = context_layer.size()[:-2] + (-1,) context_layer = context_layer.view(new_context_layer_shape) if not output_attentions: return (context_layer, None) return (context_layer, attention_probs) def disentangled_att_bias( self, query_layer: torch.Tensor, key_layer: torch.Tensor, relative_pos: torch.Tensor, rel_embeddings: torch.Tensor, scale_factor: int, ): if relative_pos is None: relative_pos = build_relative_position(query_layer, key_layer, query_layer.device) if relative_pos.dim() == 2: relative_pos = relative_pos.unsqueeze(0).unsqueeze(0) elif relative_pos.dim() == 3: relative_pos = relative_pos.unsqueeze(1) # bxhxqxk elif relative_pos.dim() != 4: raise ValueError(f"Relative position ids must be of dim 2 or 3 or 4. {relative_pos.dim()}") att_span = compute_attention_span(query_layer, key_layer, self.max_relative_positions) relative_pos = relative_pos.long() rel_embeddings = rel_embeddings[ self.max_relative_positions - att_span : self.max_relative_positions + att_span, : ].unsqueeze(0) score = 0 # content->position if "c2p" in self.pos_att_type: pos_key_layer = self.pos_proj(rel_embeddings) pos_key_layer = self.transpose_for_scores(pos_key_layer) c2p_att = torch.matmul(query_layer, pos_key_layer.transpose(-1, -2)) c2p_pos = torch.clamp(relative_pos + att_span, 0, att_span * 2 - 1) c2p_att = torch.gather(c2p_att, dim=-1, index=c2p_dynamic_expand(c2p_pos, query_layer, relative_pos)) score += c2p_att # position->content if "p2c" in self.pos_att_type: pos_query_layer = self.pos_q_proj(rel_embeddings) pos_query_layer = self.transpose_for_scores(pos_query_layer) pos_query_layer /= scaled_size_sqrt(pos_query_layer, scale_factor) r_pos = build_rpos( query_layer, key_layer, relative_pos, ) p2c_pos = torch.clamp(-r_pos + att_span, 0, att_span * 2 - 1) p2c_att = torch.matmul(key_layer, pos_query_layer.transpose(-1, -2).to(dtype=key_layer.dtype)) p2c_att = torch.gather( p2c_att, dim=-1, index=p2c_dynamic_expand(p2c_pos, query_layer, key_layer) ).transpose(-1, -2) p2c_att = uneven_size_corrected(p2c_att, query_layer, key_layer, relative_pos) score += p2c_att return score class DebertaEmbeddings(nn.Module): """Construct the embeddings from word, position and token_type embeddings.""" def __init__(self, config): super().__init__() pad_token_id = getattr(config, "pad_token_id", 0) self.embedding_size = getattr(config, "embedding_size", config.hidden_size) self.word_embeddings = nn.Embedding(config.vocab_size, self.embedding_size, padding_idx=pad_token_id) self.position_biased_input = getattr(config, "position_biased_input", True) if not self.position_biased_input: self.position_embeddings = None else: self.position_embeddings = nn.Embedding(config.max_position_embeddings, self.embedding_size) if config.type_vocab_size > 0: self.token_type_embeddings = nn.Embedding(config.type_vocab_size, self.embedding_size) else: self.token_type_embeddings = None if self.embedding_size != config.hidden_size: self.embed_proj = nn.Linear(self.embedding_size, config.hidden_size, bias=False) else: self.embed_proj = None self.LayerNorm = DebertaLayerNorm(config.hidden_size, config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.config = config # position_ids (1, len position emb) is contiguous in memory and exported when serialized self.register_buffer( "position_ids", torch.arange(config.max_position_embeddings).expand((1, -1)), persistent=False ) def forward(self, input_ids=None, token_type_ids=None, position_ids=None, mask=None, inputs_embeds=None): if input_ids is not None: input_shape = input_ids.size() else: input_shape = inputs_embeds.size()[:-1] seq_length = input_shape[1] if position_ids is None: position_ids = self.position_ids[:, :seq_length] if token_type_ids is None: token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=self.position_ids.device) if inputs_embeds is None: inputs_embeds = self.word_embeddings(input_ids) if self.position_embeddings is not None: position_embeddings = self.position_embeddings(position_ids.long()) else: position_embeddings = torch.zeros_like(inputs_embeds) embeddings = inputs_embeds if self.position_biased_input: embeddings = embeddings + position_embeddings if self.token_type_embeddings is not None: token_type_embeddings = self.token_type_embeddings(token_type_ids) embeddings = embeddings + token_type_embeddings if self.embed_proj is not None: embeddings = self.embed_proj(embeddings) embeddings = self.LayerNorm(embeddings) if mask is not None: if mask.dim() != embeddings.dim(): if mask.dim() == 4: mask = mask.squeeze(1).squeeze(1) mask = mask.unsqueeze(2) mask = mask.to(embeddings.dtype) embeddings = embeddings * mask embeddings = self.dropout(embeddings) return embeddings class DebertaAttention(nn.Module): def __init__(self, config): super().__init__() self.self = DisentangledSelfAttention(config) self.output = DebertaSelfOutput(config) self.config = config def forward( self, hidden_states, attention_mask, output_attentions: bool = False, query_states=None, relative_pos=None, rel_embeddings=None, ) -> tuple[torch.Tensor, Optional[torch.Tensor]]: self_output, att_matrix = self.self( hidden_states, attention_mask, output_attentions, query_states=query_states, relative_pos=relative_pos, rel_embeddings=rel_embeddings, ) if query_states is None: query_states = hidden_states attention_output = self.output(self_output, query_states) if output_attentions: return (attention_output, att_matrix) else: return (attention_output, None) # Copied from transformers.models.bert.modeling_bert.BertIntermediate with Bert->Deberta class DebertaIntermediate(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.intermediate_size) if isinstance(config.hidden_act, str): self.intermediate_act_fn = ACT2FN[config.hidden_act] else: self.intermediate_act_fn = config.hidden_act def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: hidden_states = self.dense(hidden_states) hidden_states = self.intermediate_act_fn(hidden_states) return hidden_states class DebertaOutput(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.intermediate_size, config.hidden_size) self.LayerNorm = DebertaLayerNorm(config.hidden_size, config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.config = config def forward(self, hidden_states, input_tensor): hidden_states = self.dense(hidden_states) hidden_states = self.dropout(hidden_states) hidden_states = self.LayerNorm(hidden_states + input_tensor) return hidden_states class DebertaLayer(GradientCheckpointingLayer): def __init__(self, config): super().__init__() self.attention = DebertaAttention(config) self.intermediate = DebertaIntermediate(config) self.output = DebertaOutput(config) def forward( self, hidden_states, attention_mask, query_states=None, relative_pos=None, rel_embeddings=None, output_attentions: bool = False, ) -> tuple[torch.Tensor, Optional[torch.Tensor]]: attention_output, att_matrix = self.attention( hidden_states, attention_mask, output_attentions=output_attentions, query_states=query_states, relative_pos=relative_pos, rel_embeddings=rel_embeddings, ) intermediate_output = self.intermediate(attention_output) layer_output = self.output(intermediate_output, attention_output) if output_attentions: return (layer_output, att_matrix) else: return (layer_output, None) class DebertaEncoder(nn.Module): """Modified BertEncoder with relative position bias support""" def __init__(self, config): super().__init__() self.layer = nn.ModuleList([DebertaLayer(config) for _ in range(config.num_hidden_layers)]) self.relative_attention = getattr(config, "relative_attention", False) if self.relative_attention: self.max_relative_positions = getattr(config, "max_relative_positions", -1) if self.max_relative_positions < 1: self.max_relative_positions = config.max_position_embeddings self.rel_embeddings = nn.Embedding(self.max_relative_positions * 2, config.hidden_size) self.gradient_checkpointing = False def get_rel_embedding(self): rel_embeddings = self.rel_embeddings.weight if self.relative_attention else None return rel_embeddings def get_attention_mask(self, attention_mask): if attention_mask.dim() <= 2: extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) attention_mask = extended_attention_mask * extended_attention_mask.squeeze(-2).unsqueeze(-1) elif attention_mask.dim() == 3: attention_mask = attention_mask.unsqueeze(1) return attention_mask def get_rel_pos(self, hidden_states, query_states=None, relative_pos=None): if self.relative_attention and relative_pos is None: if query_states is not None: relative_pos = build_relative_position(query_states, hidden_states) else: relative_pos = build_relative_position(hidden_states, hidden_states) return relative_pos def forward( self, hidden_states: torch.Tensor, attention_mask: torch.Tensor, output_hidden_states: bool = True, output_attentions: bool = False, query_states=None, relative_pos=None, return_dict: bool = True, ): attention_mask = self.get_attention_mask(attention_mask) relative_pos = self.get_rel_pos(hidden_states, query_states, relative_pos) all_hidden_states: Optional[tuple[torch.Tensor]] = (hidden_states,) if output_hidden_states else None all_attentions = () if output_attentions else None next_kv = hidden_states rel_embeddings = self.get_rel_embedding() for i, layer_module in enumerate(self.layer): hidden_states, att_m = layer_module( next_kv, attention_mask, query_states=query_states, relative_pos=relative_pos, rel_embeddings=rel_embeddings, output_attentions=output_attentions, ) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if query_states is not None: query_states = hidden_states else: next_kv = hidden_states if output_attentions: all_attentions = all_attentions + (att_m,) if not return_dict: return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None) return BaseModelOutput( last_hidden_state=hidden_states, hidden_states=all_hidden_states, attentions=all_attentions ) @auto_docstring class DebertaPreTrainedModel(PreTrainedModel): config: DebertaConfig base_model_prefix = "deberta" _keys_to_ignore_on_load_unexpected = ["position_embeddings"] supports_gradient_checkpointing = True def _init_weights(self, module): """Initialize the weights.""" if isinstance(module, nn.Linear): # Slightly different from the TF version which uses truncated_normal for initialization # cf https://github.com/pytorch/pytorch/pull/5617 module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() elif isinstance(module, (nn.LayerNorm, DebertaLayerNorm)): module.weight.data.fill_(1.0) module.bias.data.zero_() elif isinstance(module, DisentangledSelfAttention): module.q_bias.data.zero_() module.v_bias.data.zero_() elif isinstance(module, (LegacyDebertaLMPredictionHead, DebertaLMPredictionHead)): module.bias.data.zero_() @auto_docstring class DebertaModel(DebertaPreTrainedModel): def __init__(self, config): super().__init__(config) self.embeddings = DebertaEmbeddings(config) self.encoder = DebertaEncoder(config) self.z_steps = 0 self.config = config # Initialize weights and apply final processing self.post_init() def get_input_embeddings(self): return self.embeddings.word_embeddings def set_input_embeddings(self, new_embeddings): self.embeddings.word_embeddings = new_embeddings def _prune_heads(self, heads_to_prune): """ Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base class PreTrainedModel """ raise NotImplementedError("The prune function is not implemented in DeBERTa model.") @auto_docstring def forward( self, input_ids: Optional[torch.Tensor] = None, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, inputs_embeds: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[tuple, BaseModelOutput]: output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict if input_ids is not None and inputs_embeds is not None: raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") elif input_ids is not None: self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask) input_shape = input_ids.size() elif inputs_embeds is not None: input_shape = inputs_embeds.size()[:-1] else: raise ValueError("You have to specify either input_ids or inputs_embeds") device = input_ids.device if input_ids is not None else inputs_embeds.device if attention_mask is None: attention_mask = torch.ones(input_shape, device=device) if token_type_ids is None: token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=device) embedding_output = self.embeddings( input_ids=input_ids, token_type_ids=token_type_ids, position_ids=position_ids, mask=attention_mask, inputs_embeds=inputs_embeds, ) encoder_outputs = self.encoder( embedding_output, attention_mask, output_hidden_states=True, output_attentions=output_attentions, return_dict=return_dict, ) encoded_layers = encoder_outputs[1] if self.z_steps > 1: hidden_states = encoded_layers[-2] layers = [self.encoder.layer[-1] for _ in range(self.z_steps)] query_states = encoded_layers[-1] rel_embeddings = self.encoder.get_rel_embedding() attention_mask = self.encoder.get_attention_mask(attention_mask) rel_pos = self.encoder.get_rel_pos(embedding_output) for layer in layers[1:]: query_states = layer( hidden_states, attention_mask, output_attentions=False, query_states=query_states, relative_pos=rel_pos, rel_embeddings=rel_embeddings, ) encoded_layers.append(query_states) sequence_output = encoded_layers[-1] if not return_dict: return (sequence_output,) + encoder_outputs[(1 if output_hidden_states else 2) :] return BaseModelOutput( last_hidden_state=sequence_output, hidden_states=encoder_outputs.hidden_states if output_hidden_states else None, attentions=encoder_outputs.attentions, ) class LegacyDebertaPredictionHeadTransform(nn.Module): def __init__(self, config): super().__init__() self.embedding_size = getattr(config, "embedding_size", config.hidden_size) self.dense = nn.Linear(config.hidden_size, self.embedding_size) if isinstance(config.hidden_act, str): self.transform_act_fn = ACT2FN[config.hidden_act] else: self.transform_act_fn = config.hidden_act self.LayerNorm = nn.LayerNorm(self.embedding_size, eps=config.layer_norm_eps) def forward(self, hidden_states): hidden_states = self.dense(hidden_states) hidden_states = self.transform_act_fn(hidden_states) hidden_states = self.LayerNorm(hidden_states) return hidden_states class LegacyDebertaLMPredictionHead(nn.Module): def __init__(self, config): super().__init__() self.transform = LegacyDebertaPredictionHeadTransform(config) self.embedding_size = getattr(config, "embedding_size", config.hidden_size) # The output weights are the same as the input embeddings, but there is # an output-only bias for each token. self.decoder = nn.Linear(self.embedding_size, config.vocab_size, bias=False) self.bias = nn.Parameter(torch.zeros(config.vocab_size)) # Need a link between the two variables so that the bias is correctly resized with `resize_token_embeddings` self.decoder.bias = self.bias def _tie_weights(self): self.decoder.bias = self.bias def forward(self, hidden_states): hidden_states = self.transform(hidden_states) hidden_states = self.decoder(hidden_states) return hidden_states # Copied from transformers.models.bert.modeling_bert.BertOnlyMLMHead with Bert->LegacyDeberta class LegacyDebertaOnlyMLMHead(nn.Module): def __init__(self, config): super().__init__() self.predictions = LegacyDebertaLMPredictionHead(config) def forward(self, sequence_output: torch.Tensor) -> torch.Tensor: prediction_scores = self.predictions(sequence_output) return prediction_scores class DebertaLMPredictionHead(nn.Module): """https://github.com/microsoft/DeBERTa/blob/master/DeBERTa/deberta/bert.py#L270""" def __init__(self, config): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) if isinstance(config.hidden_act, str): self.transform_act_fn = ACT2FN[config.hidden_act] else: self.transform_act_fn = config.hidden_act self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps, elementwise_affine=True) self.bias = nn.Parameter(torch.zeros(config.vocab_size)) # note that the input embeddings must be passed as an argument def forward(self, hidden_states, word_embeddings): hidden_states = self.dense(hidden_states) hidden_states = self.transform_act_fn(hidden_states) hidden_states = self.LayerNorm( hidden_states ) # original used MaskedLayerNorm, but passed no mask. This is equivalent. hidden_states = torch.matmul(hidden_states, word_embeddings.weight.t()) + self.bias return hidden_states class DebertaOnlyMLMHead(nn.Module): def __init__(self, config): super().__init__() self.lm_head = DebertaLMPredictionHead(config) # note that the input embeddings must be passed as an argument def forward(self, sequence_output, word_embeddings): prediction_scores = self.lm_head(sequence_output, word_embeddings) return prediction_scores @auto_docstring class DebertaForMaskedLM(DebertaPreTrainedModel): _tied_weights_keys = ["cls.predictions.decoder.weight", "cls.predictions.decoder.bias"] def __init__(self, config): super().__init__(config) self.legacy = config.legacy self.deberta = DebertaModel(config) if self.legacy: self.cls = LegacyDebertaOnlyMLMHead(config) else: self._tied_weights_keys = ["lm_predictions.lm_head.weight", "deberta.embeddings.word_embeddings.weight"] self.lm_predictions = DebertaOnlyMLMHead(config) # Initialize weights and apply final processing self.post_init() def get_output_embeddings(self): if self.legacy: return self.cls.predictions.decoder else: return self.lm_predictions.lm_head.dense def set_output_embeddings(self, new_embeddings): if self.legacy: self.cls.predictions.decoder = new_embeddings self.cls.predictions.bias = new_embeddings.bias else: self.lm_predictions.lm_head.dense = new_embeddings self.lm_predictions.lm_head.bias = new_embeddings.bias @auto_docstring def forward( self, input_ids: Optional[torch.Tensor] = None, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, inputs_embeds: Optional[torch.Tensor] = None, labels: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[tuple, MaskedLMOutput]: r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ..., config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]` """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.deberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) sequence_output = outputs[0] if self.legacy: prediction_scores = self.cls(sequence_output) else: prediction_scores = self.lm_predictions(sequence_output, self.deberta.embeddings.word_embeddings) masked_lm_loss = None if labels is not None: loss_fct = CrossEntropyLoss() # -100 index = padding token masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), labels.view(-1)) if not return_dict: output = (prediction_scores,) + outputs[1:] return ((masked_lm_loss,) + output) if masked_lm_loss is not None else output return MaskedLMOutput( loss=masked_lm_loss, logits=prediction_scores, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) class ContextPooler(nn.Module): def __init__(self, config): super().__init__() self.dense = nn.Linear(config.pooler_hidden_size, config.pooler_hidden_size) self.dropout = nn.Dropout(config.pooler_dropout) self.config = config def forward(self, hidden_states): # We "pool" the model by simply taking the hidden state corresponding # to the first token. context_token = hidden_states[:, 0] context_token = self.dropout(context_token) pooled_output = self.dense(context_token) pooled_output = ACT2FN[self.config.pooler_hidden_act](pooled_output) return pooled_output @property def output_dim(self): return self.config.hidden_size @auto_docstring( custom_intro=""" DeBERTa Model transformer with a sequence classification/regression head on top (a linear layer on top of the pooled output) e.g. for GLUE tasks. """ ) class DebertaForSequenceClassification(DebertaPreTrainedModel): def __init__(self, config): super().__init__(config) num_labels = getattr(config, "num_labels", 2) self.num_labels = num_labels self.deberta = DebertaModel(config) self.pooler = ContextPooler(config) output_dim = self.pooler.output_dim self.classifier = nn.Linear(output_dim, num_labels) drop_out = getattr(config, "cls_dropout", None) drop_out = self.config.hidden_dropout_prob if drop_out is None else drop_out self.dropout = nn.Dropout(drop_out) # Initialize weights and apply final processing self.post_init() def get_input_embeddings(self): return self.deberta.get_input_embeddings() def set_input_embeddings(self, new_embeddings): self.deberta.set_input_embeddings(new_embeddings) @auto_docstring def forward( self, input_ids: Optional[torch.Tensor] = None, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, inputs_embeds: Optional[torch.Tensor] = None, labels: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[tuple, SequenceClassifierOutput]: r""" labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): Labels for computing the sequence classification/regression loss. Indices should be in `[0, ..., config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If `config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.deberta( input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask, position_ids=position_ids, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) encoder_layer = outputs[0] pooled_output = self.pooler(encoder_layer) pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) loss = None if labels is not None: if self.config.problem_type is None: if self.num_labels == 1: # regression task loss_fn = nn.MSELoss() logits = logits.view(-1).to(labels.dtype) loss = loss_fn(logits, labels.view(-1)) elif labels.dim() == 1 or labels.size(-1) == 1: label_index = (labels >= 0).nonzero() labels = labels.long() if label_index.size(0) > 0: labeled_logits = torch.gather( logits, 0, label_index.expand(label_index.size(0), logits.size(1)) ) labels = torch.gather(labels, 0, label_index.view(-1)) loss_fct = CrossEntropyLoss() loss = loss_fct(labeled_logits.view(-1, self.num_labels).float(), labels.view(-1)) else: loss = torch.tensor(0).to(logits) else: log_softmax = nn.LogSoftmax(-1) loss = -((log_softmax(logits) * labels).sum(-1)).mean() elif self.config.problem_type == "regression": loss_fct = MSELoss() if self.num_labels == 1: loss = loss_fct(logits.squeeze(), labels.squeeze()) else: loss = loss_fct(logits, labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[1:] return ((loss,) + output) if loss is not None else output return SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions ) @auto_docstring class DebertaForTokenClassification(DebertaPreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.deberta = DebertaModel(config) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) # Initialize weights and apply final processing self.post_init() @auto_docstring def forward( self, input_ids: Optional[torch.Tensor] = None, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, inputs_embeds: Optional[torch.Tensor] = None, labels: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[tuple, TokenClassifierOutput]: r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`. """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.deberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) loss = None if labels is not None: loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) if not return_dict: output = (logits,) + outputs[1:] return ((loss,) + output) if loss is not None else output return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions ) @auto_docstring class DebertaForQuestionAnswering(DebertaPreTrainedModel): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.deberta = DebertaModel(config) self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels) # Initialize weights and apply final processing self.post_init() @auto_docstring def forward( self, input_ids: Optional[torch.Tensor] = None, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, inputs_embeds: Optional[torch.Tensor] = None, start_positions: Optional[torch.Tensor] = None, end_positions: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[tuple, QuestionAnsweringModelOutput]: return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.deberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) sequence_output = outputs[0] logits = self.qa_outputs(sequence_output) start_logits, end_logits = logits.split(1, dim=-1) start_logits = start_logits.squeeze(-1).contiguous() end_logits = end_logits.squeeze(-1).contiguous() total_loss = None if start_positions is not None and end_positions is not None: # If we are on multi-GPU, split add a dimension if len(start_positions.size()) > 1: start_positions = start_positions.squeeze(-1) if len(end_positions.size()) > 1: end_positions = end_positions.squeeze(-1) # sometimes the start/end positions are outside our model inputs, we ignore these terms ignored_index = start_logits.size(1) start_positions = start_positions.clamp(0, ignored_index) end_positions = end_positions.clamp(0, ignored_index) loss_fct = CrossEntropyLoss(ignore_index=ignored_index) start_loss = loss_fct(start_logits, start_positions) end_loss = loss_fct(end_logits, end_positions) total_loss = (start_loss + end_loss) / 2 if not return_dict: output = (start_logits, end_logits) + outputs[1:] return ((total_loss,) + output) if total_loss is not None else output return QuestionAnsweringModelOutput( loss=total_loss, start_logits=start_logits, end_logits=end_logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) __all__ = [ "DebertaForMaskedLM", "DebertaForQuestionAnswering", "DebertaForSequenceClassification", "DebertaForTokenClassification", "DebertaModel", "DebertaPreTrainedModel", ]