import torch
from torch import nn
from torch.nn import CrossEntropyLoss

from transformers import (
    DistilBertForSequenceClassification as SeqClassification,
    DistilBertPreTrainedModel,
    DistilBertModel,
    DistilBertConfig,
)

from .modeling_outputs import QuestionAnsweringNaModelOutput


# Thin subclass of the stock sequence-classification model that records the
# model type; behavior is otherwise unchanged.
class DistilBertForSequenceClassification(SeqClassification):
    model_type = "distilbert"


class DistilBertForQuestionAnsweringAVPool(DistilBertPreTrainedModel):
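    """DistilBERT for extractive QA with an auxiliary answerability head.

    `qa_outputs` maps each token's hidden state to start/end span logits,
    while `has_ans` classifies the first ([CLS]) token as answerable vs.
    unanswerable, in the style of SQuAD 2.0 answer verification.
    """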

    config_class = DistilBertConfig
    base_model_prefix = "distilbert"
    model_type = "distilbert"

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.distilbert = DistilBertModel(config)
        # Span head: maps each token's hidden state to (start, end) logits;
        # num_labels is expected to be 2.
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
        # Answerability head: dropout + binary classifier over the first
        # ([CLS]) token's hidden state.
        self.has_ans = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(config.hidden_size, 2),
        )

        self.post_init()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        start_positions=None,
        end_positions=None,
        is_impossibles=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
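        """Compute span logits, answerability logits, and optionally the joint loss.

        `token_type_ids` and `position_ids` are accepted for API compatibility
        but are not used by DistilBERT. `is_impossibles` is expected to hold 1
        for unanswerable examples and 0 otherwise (SQuAD 2.0 convention).
        """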
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        discriminator_hidden_states = self.distilbert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Last-layer hidden states, shape (batch_size, seq_len, hidden_size).
        sequence_output = discriminator_hidden_states[0]

        logits = self.qa_outputs(sequence_output)
        # Split the per-token logits into start and end scores, each of
        # shape (batch_size, seq_len).
        start_logits = logits[:, :, 0].contiguous()
        end_logits = logits[:, :, 1].contiguous()

        # Answerability logits from the first ([CLS]) token.
        first_word = sequence_output[:, 0, :]
        has_logits = self.has_ans(first_word)

        total_loss = None
        if (
            start_positions is not None
            and end_positions is not None
            and is_impossibles is not None
        ):
            # Collapse a possible extra trailing dimension (e.g. added on
            # multi-GPU gather).
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            if len(is_impossibles.size()) > 1:
                is_impossibles = is_impossibles.squeeze(-1)

            # Positions outside the model input are clamped to ignored_index,
            # which the loss below masks out; targets must be int64.
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index).long()
            end_positions = end_positions.clamp(0, ignored_index).long()
            is_impossibles = is_impossibles.clamp(0, ignored_index).long()

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            span_loss = start_loss + end_loss

            # Joint objective: full-weight span loss plus half-weight
            # answerability (choice) loss.
            alpha1 = 1.0
            alpha2 = 0.5
            choice_loss = loss_fct(has_logits, is_impossibles)
            total_loss = alpha1 * span_loss + alpha2 * choice_loss

        if not return_dict:
            # DistilBERT has no pooled output, so extra encoder outputs
            # (hidden states, attentions) start at index 1 of the tuple.
            output = (
                start_logits,
                end_logits,
                has_logits,
            ) + discriminator_hidden_states[1:]
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringNaModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            has_logits=has_logits,
            hidden_states=discriminator_hidden_states.hidden_states,
            attentions=discriminator_hidden_states.attentions,
        )
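

# A minimal usage sketch. Assumptions: the checkpoint name below is a
# placeholder for a real fine-tuned checkpoint (loading a bare backbone such
# as "distilbert-base-uncased" leaves both heads randomly initialized), and
# the label convention where has_logits index 1 means "unanswerable".
if __name__ == "__main__":
    from transformers import AutoTokenizer

    checkpoint = "distilbert-base-uncased"  # placeholder; use a fine-tuned checkpoint
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = DistilBertForQuestionAnsweringAVPool.from_pretrained(checkpoint)
    model.eval()

    inputs = tokenizer(
        "Who wrote the play?",
        "The play was written by William Shakespeare.",
        return_tensors="pt",
    )
    with torch.no_grad():
        outputs = model(**inputs, return_dict=True)

    # Greedy span: argmax over start and end logits independently.
    start = int(outputs.start_logits.argmax(dim=-1))
    end = int(outputs.end_logits.argmax(dim=-1))
    answer = tokenizer.decode(inputs["input_ids"][0, start : end + 1])

    # Probability the question is unanswerable (assumed label convention).
    no_answer_prob = outputs.has_logits.softmax(dim=-1)[0, 1].item()
    print(f"answer: {answer!r}, P(no answer) = {no_answer_prob:.3f}")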