| import torch |
| import transformers |
| from torch.utils.data import Dataset, DataLoader |
| from transformers import RobertaModel, RobertaTokenizer, BertModel, BertTokenizer |
| import pandas as pd |
|
|
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Length every tokenized sequence is padded or truncated to.
MAX_LEN = 128
# Number of rows sent through the model per forward pass.
BATCH_SIZE = 20
# DataFrame column that holds the text to score.
text_col_name = "sentence"
|
|
def scoring_data_prep(dataset, max_len=None, target_device=None):
    """Stack every record of *dataset* into one ids tensor and one mask tensor.

    Args:
        dataset: Sized, indexable collection whose items are mappings with
            ``"ids"`` and ``"mask"`` tensors (the item format ``Triage``
            returns).
        max_len: Number of columns each record is reshaped to. Defaults to
            the module-level ``MAX_LEN``.
        target_device: Device the stacked tensors are moved to. Defaults to
            the module-level ``device``.

    Returns:
        Tuple ``(ids, mask)`` of ``torch.long`` tensors on ``target_device``,
        each with ``max_len`` columns. Both have zero rows when *dataset* is
        empty.
    """
    if max_len is None:
        max_len = MAX_LEN
    if target_device is None:
        target_device = device

    # Index explicitly instead of iterating: a map-style dataset only has to
    # provide __getitem__/__len__ and may raise something other than
    # IndexError past the end, which would break plain iteration.
    records = [dataset[i] for i in range(len(dataset))]

    if not records:
        # torch.cat rejects an empty list, so build the empty result directly.
        empty_ids = torch.empty((0, max_len), dtype=torch.long, device=target_device)
        return empty_ids, empty_ids.clone()

    ids_stack = torch.cat([rec["ids"].reshape(-1, max_len) for rec in records], dim=0)
    mask_stack = torch.cat([rec["mask"].reshape(-1, max_len) for rec in records], dim=0)

    ids_stack = ids_stack.to(target_device, dtype=torch.long)
    mask_stack = mask_stack.to(target_device, dtype=torch.long)
    return ids_stack, mask_stack
|
|
class Triage(Dataset):
    """Map-style dataset that tokenizes one text column of a DataFrame.

    Each item is a dict holding the token ids and the attention mask of one
    row, padded or truncated to ``max_len``. No targets are produced.
    """

    def __init__(self, dataframe, tokenizer, max_len, text_col_name):
        """Store the data and tokenization settings.

        Args:
            dataframe: pandas DataFrame containing the text column.
            tokenizer: Object exposing ``encode_plus`` (e.g. a Hugging Face
                tokenizer).
            max_len: Length every encoded sequence is padded/truncated to.
            text_col_name: Name of the column in *dataframe* to tokenize.
        """
        self.len = len(dataframe)
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.text_col_name = text_col_name

    def __getitem__(self, index):
        """Return ``{"ids": LongTensor, "mask": LongTensor}`` for row *index*.

        *index* is a position (0 .. len-1), not a DataFrame index label.
        """
        # Positional lookup: callers pass positions, which only coincide with
        # index labels when the frame has a default RangeIndex.
        title = str(self.data[self.text_col_name].iloc[index])
        # Collapse runs of whitespace (including newlines) to single spaces.
        title = " ".join(title.split())
        inputs = self.tokenizer.encode_plus(
            title,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            # Replaces the deprecated ``pad_to_max_length=True``.
            padding="max_length",
            return_token_type_ids=True,
            truncation=True,
        )
        ids = inputs["input_ids"]
        mask = inputs["attention_mask"]

        return {
            "ids": torch.tensor(ids, dtype=torch.long),
            "mask": torch.tensor(mask, dtype=torch.long),
        }

    def __len__(self):
        """Return the number of rows captured at construction time."""
        return self.len
|
|
class BERTClass(torch.nn.Module):
    """Pretrained transformer encoder followed by a small classification head.

    The encoder is ``roberta-base`` when ``task`` equals the literal string
    ``"sustanability"`` (spelled exactly like that) and ``ProsusAI/finbert``
    for every other value. The head is Linear(768, 768) -> ReLU ->
    Dropout(0.3) -> Linear(768, num_class).
    """

    def __init__(self, num_class, task):
        """Load the encoder selected by *task* and build the head."""
        super().__init__()
        self.num_class = num_class

        # NOTE(review): the task key is matched with its existing spelling;
        # callers must pass exactly this string to get the RoBERTa encoder.
        encoder_cls, checkpoint = (
            (RobertaModel, "roberta-base")
            if task == "sustanability"
            else (BertModel, "ProsusAI/finbert")
        )
        self.l1 = encoder_cls.from_pretrained(checkpoint)

        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.3)
        self.classifier = torch.nn.Linear(768, self.num_class)
        self.history = {}

    def forward(self, input_ids, attention_mask):
        """Return the head's output for the given token ids and mask.

        The head consumes the vector at sequence position 0 of the encoder's
        first output (presumably the leading special token — confirm against
        the tokenizer in use).
        """
        encoded = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        first_token = encoded[0][:, 0]
        activated = torch.nn.functional.relu(self.pre_classifier(first_token))
        return self.classifier(self.dropout(activated))
|
|
def do_predict(model, tokenizer, test_df):
    """Score every row of *test_df* with *model*.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns a 2-D tensor of per-class outputs; ``eval()`` is called
            on it before scoring.
        tokenizer: Tokenizer handed to ``Triage`` to encode the text.
        test_df: DataFrame holding the text in the column named by the
            module-level ``text_col_name``.

    Returns:
        Tuple ``(actual_predictions, prob_predictions)`` of lists with one
        entry per row: the index of the highest-scoring class, and the model
        output at class index 1. Both lists are empty when *test_df* has no
        rows.

        NOTE(review): ``prob_predictions`` is the raw model output — no
        softmax is applied here. Confirm whether callers expect a
        probability.
    """
    test_set = Triage(test_df, tokenizer, MAX_LEN, text_col_name)
    test_loader = DataLoader(
        test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0
    )

    model.eval()
    batch_outputs = []
    with torch.no_grad():
        # Move one batch at a time to the device instead of materializing the
        # whole tokenized dataset there up front.
        for batch in test_loader:
            ids = batch["ids"].to(device, dtype=torch.long)
            mask = batch["mask"].to(device, dtype=torch.long)
            batch_outputs.append(model(ids, mask))

    if not batch_outputs:
        # torch.cat rejects an empty list; nothing was scored.
        return ([], [])

    combined_output = torch.cat(batch_outputs, dim=0).to("cpu")
    actual_predictions = torch.argmax(combined_output, dim=1).tolist()
    prob_predictions = combined_output[:, 1].tolist()
    return (actual_predictions, prob_predictions)
| |