| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| import unittest |
|
|
| from transformers import is_torch_available |
| from transformers.testing_utils import require_torch, torch_device |
|
|
| from .test_modeling_common import ids_tensor |
|
|
|
|
| if is_torch_available(): |
| import torch |
| from torch import nn |
|
|
| from transformers.generation_logits_process import ( |
| EncoderNoRepeatNGramLogitsProcessor, |
| ForcedBOSTokenLogitsProcessor, |
| ForcedEOSTokenLogitsProcessor, |
| HammingDiversityLogitsProcessor, |
| InfNanRemoveLogitsProcessor, |
| LogitsProcessorList, |
| MinLengthLogitsProcessor, |
| NoBadWordsLogitsProcessor, |
| NoRepeatNGramLogitsProcessor, |
| PrefixConstrainedLogitsProcessor, |
| RepetitionPenaltyLogitsProcessor, |
| TemperatureLogitsWarper, |
| TopKLogitsWarper, |
| TopPLogitsWarper, |
| ) |
|
|
|
|
@require_torch
class LogitsProcessorTest(unittest.TestCase):
    """Unit tests for the logits processors and warpers imported from `transformers.generation_logits_process`."""

    def _get_uniform_logits(self, batch_size: int, length: int):
        """Return a `(batch_size, length)` float tensor on `torch_device` whose entries all equal `1 / length`."""
        shape = (batch_size, length)
        all_ones = torch.ones(shape, device=torch_device, dtype=torch.float)
        return all_ones / length

    def test_min_lenght_dist_processor(self):
        """Check the EOS score for inputs shorter and longer than `min_length`."""
        vocab_size = 20
        batch_size = 4
        eos_token_id = 0

        processor = MinLengthLogitsProcessor(min_length=10, eos_token_id=eos_token_id)

        # Inputs of length 5 (below min_length=10): the EOS column is expected to be -inf in every row.
        short_ids = ids_tensor((batch_size, 5), vocab_size=vocab_size)
        uniform = self._get_uniform_logits(batch_size, vocab_size)
        short_result = processor(short_ids, uniform)
        expected_eos_column = [-float("inf")] * batch_size
        self.assertListEqual(short_result[:, eos_token_id].tolist(), expected_eos_column)

        # Inputs of length 15 (above min_length=10): no score is expected to be infinite.
        long_ids = ids_tensor((batch_size, 15), vocab_size=vocab_size)
        uniform = self._get_uniform_logits(batch_size, vocab_size)
        long_result = processor(long_ids, uniform)
        self.assertFalse(torch.isinf(long_result).any())

    def test_temperature_dist_warper(self):
        """Compare softmax distributions before and after warping with temperatures 0.5 and 1.3."""
        input_ids = None
        length = 20

        logits = self._get_uniform_logits(batch_size=2, length=length)
        uniform_value = 1 / length

        # Row 0 stays uniform; row 1 gets one raised and one lowered entry.
        logits[1, 5] = uniform_value + 0.1
        logits[1, 10] = uniform_value - 0.4

        # Reference distribution computed from the unwarped logits.
        baseline = nn.functional.softmax(logits, dim=-1)

        sharpen = TemperatureLogitsWarper(temperature=0.5)
        smooth = TemperatureLogitsWarper(temperature=1.3)

        # Each warper receives its own clone of the logits.
        sharp_probs = nn.functional.softmax(sharpen(input_ids, logits.clone()), dim=-1)
        smooth_probs = nn.functional.softmax(smooth(input_ids, logits.clone()), dim=-1)

        # The uniform row is expected to be unaffected by either temperature.
        for warped in (sharp_probs, smooth_probs):
            self.assertTrue(torch.allclose(baseline[0, :], warped[0, :], atol=1e-3))

        # temperature=0.5: expected to raise the maximum and lower the minimum of the non-uniform row.
        self.assertLess(baseline[1, :].max(), sharp_probs[1, :].max())
        self.assertGreater(baseline[1, :].min(), sharp_probs[1, :].min())

        # temperature=1.3: expected to lower the maximum and raise the minimum of the non-uniform row.
        self.assertGreater(baseline[1, :].max(), smooth_probs[1, :].max())
        self.assertLess(baseline[1, :].min(), smooth_probs[1, :].min())

    def test_repetition_penalty_dist_process(self):
        """Check scores of tokens present in `input_ids` after applying a repetition penalty of 2.0."""
        input_ids = torch.tensor([[0, 1], [5, 0]], device=torch_device, dtype=torch.long)
        vocab_size = 10
        uniform_value = 1 / vocab_size

        logits = self._get_uniform_logits(batch_size=2, length=vocab_size)

        # Give one token in each row a distinctive score: a negative one in row 0, a larger one in row 1.
        logits[0, 0] = -uniform_value
        logits[1, 5] = 4 / vocab_size

        penalizer = RepetitionPenaltyLogitsProcessor(penalty=2.0)
        penalized = penalizer(input_ids, logits.clone())

        # Row 0 contains tokens 0 and 1: the negative score is expected doubled, the positive one halved.
        self.assertAlmostEqual(penalized[0, 0].item(), -uniform_value * 2)
        self.assertAlmostEqual(penalized[0, 1].item(), uniform_value / 2)

        # Row 1 contains tokens 5 and 0: both positive scores are expected halved.
        self.assertAlmostEqual(penalized[1, 0].item(), uniform_value / 2)
        self.assertAlmostEqual(penalized[1, 5].item(), (4 / vocab_size) / 2)

    def test_top_k_dist_warper(self):
        """Check which entries are filtered by top-k warping, including the `min_tokens_to_keep` case."""
        input_ids = None
        vocab_size = 10
        batch_size = 2
        half = vocab_size // 2

        # Row 0 is the ramp 0..9; in row 1 the first half is shifted up by vocab_size.
        ramp_row = torch.arange(vocab_size, device=torch_device, dtype=torch.float)
        ramp_logits = ramp_row.unsqueeze(0).repeat(batch_size, 1)
        ramp_logits[1:, :half] += vocab_size

        top_3 = TopKLogitsWarper(3)
        filtered = top_3(input_ids, ramp_logits)

        # Expected: only the last three entries of row 0 and entries 2..4 of row 1 remain finite.
        self.assertListEqual(torch.isinf(filtered[0]).tolist(), 7 * [True] + 3 * [False])
        self.assertListEqual(torch.isinf(filtered[1]).tolist(), 2 * [True] + 3 * [False] + 5 * [True])

        # Second scenario: top_k=1 combined with min_tokens_to_keep=3 and a filter value of 0.0.
        length = 5

        uniform = self._get_uniform_logits(batch_size=batch_size, length=length)
        guarded_top_k = TopKLogitsWarper(top_k=1, filter_value=0.0, min_tokens_to_keep=3)

        filtered = guarded_top_k(input_ids, uniform)
        # On uniform logits no entry is expected to equal the filter value.
        zero_counts = (filtered == 0.0).to(torch.long).sum(dim=-1)
        self.assertListEqual(zero_counts.tolist(), [0, 0])

        short_ramp = torch.arange(length, device=torch_device, dtype=torch.float).unsqueeze(0).repeat(batch_size, 1)
        filtered = guarded_top_k(input_ids, short_ramp)

        # On the ramp 0..4 exactly two entries per row are expected to equal 0.0.
        zero_counts = (filtered == 0.0).to(torch.long).sum(dim=-1)
        self.assertListEqual(zero_counts.tolist(), [2, 2])

    def test_top_p_dist_warper(self):
        """Check nucleus (top-p) filtering on a hand-written distribution and on ramp logits."""
        input_ids = None
        vocab_size = 10
        batch_size = 2

        # Two hand-written probability rows, converted to log space before warping.
        probabilities = torch.tensor(
            [[0.3, 0.1, 0.1, 0.5], [0.15, 0.3, 0.3, 0.25]], device=torch_device, dtype=torch.float
        )
        dist = torch.log(probabilities)

        nucleus = TopPLogitsWarper(0.7)
        filtered_dist = torch.exp(nucleus(input_ids, dist))

        # Expected after exponentiating the warped output: filtered entries show up as 0.0,
        # all other entries keep their original probability.
        EXPECTED_FILTERED_DIST = torch.tensor(
            [[0.3, 0.0, 0.0, 0.5], [0.0, 0.3, 0.3, 0.25]], device=torch_device, dtype=torch.float
        )
        self.assertTrue(torch.allclose(filtered_dist, EXPECTED_FILTERED_DIST, atol=1e-3))

        # Ramp logits centred around zero (values -5..4 in every row).
        ramp_row = torch.arange(vocab_size, device=torch_device, dtype=torch.float)
        ramp_logits = ramp_row.unsqueeze(0).repeat(batch_size, 1) - (vocab_size // 2)

        # Scale the second row by 100 to make its distribution far more peaked.
        ramp_logits[1] = ramp_logits[1] * 100.0

        # Same kind of warper, now with min_tokens_to_keep=2 and a filter value of 0.0.
        nucleus = TopPLogitsWarper(0.9, min_tokens_to_keep=2, filter_value=0.0)
        filtered_dist = nucleus(input_ids, ramp_logits)

        # Expected number of non-zero entries: three in the first row, two in the second.
        kept_counts = (filtered_dist != 0.0).to(torch.long).sum(dim=-1)
        self.assertListEqual(kept_counts.tolist(), [3, 2])

    def test_no_repeat_ngram_dist_processor(self):
        """Check which next tokens are masked by the 2-gram and 3-gram no-repeat processors."""
        vocab_size = 3
        batch_size = 2

        input_ids = torch.tensor([[1, 1, 2, 1], [0, 1, 0, 1]], device=torch_device, dtype=torch.long)
        logits = self._get_uniform_logits(batch_size, vocab_size)

        bigram_proc = NoRepeatNGramLogitsProcessor(2)
        trigram_proc = NoRepeatNGramLogitsProcessor(3)

        bigram_scores = bigram_proc(input_ids, logits.clone())
        trigram_scores = trigram_proc(input_ids, logits.clone())

        # 2-gram: tokens 1 and 2 are expected masked in row 0, token 0 in row 1.
        self.assertListEqual(torch.isinf(bigram_scores).tolist(), [[False, True, True], [True, False, False]])

        # 3-gram: nothing is expected masked in row 0, token 0 in row 1.
        self.assertListEqual(
            torch.isinf(trigram_scores).tolist(), [[False, False, False], [True, False, False]]
        )

    def test_encoder_no_repeat_ngram_dist_processor(self):
        """Check the encoder no-repeat n-gram processors for a single and for a batched encoder input."""
        vocab_size = 3
        num_beams = 2
        batch_size = 1

        # First scenario: one encoder sequence (1-D tensor) and two decoder rows.
        encoder_input_ids = torch.tensor([1, 2, 1, 1], device=torch_device, dtype=torch.long)

        input_ids = torch.tensor([[1, 2, 1], [8, 0, 2]], device=torch_device, dtype=torch.long)
        logits = self._get_uniform_logits(batch_size * num_beams, vocab_size)

        bigram_proc = EncoderNoRepeatNGramLogitsProcessor(2, encoder_input_ids=encoder_input_ids)
        trigram_proc = EncoderNoRepeatNGramLogitsProcessor(3, encoder_input_ids=encoder_input_ids)

        bigram_scores = bigram_proc(input_ids, logits.clone())
        trigram_scores = trigram_proc(input_ids, logits.clone())

        # 2-gram: tokens 1 and 2 are expected masked in row 0, token 1 in row 1.
        self.assertListEqual(torch.isinf(bigram_scores).tolist(), [[False, True, True], [False, True, False]])

        # 3-gram: token 1 is expected masked in row 0, nothing in row 1.
        self.assertListEqual(
            torch.isinf(trigram_scores).tolist(), [[False, True, False], [False, False, False]]
        )

        # Second scenario: two encoder sequences and batch_size * num_beams = 4 decoder rows.
        vocab_size = 3
        num_beams = 2
        batch_size = 2
        encoder_input_ids = torch.tensor([[1, 2, 1, 1], [0, 0, 2, 1]], device=torch_device, dtype=torch.long)

        input_ids = torch.tensor([[1, 2, 1], [1, 0, 2], [0, 0, 0], [0, 2, 2]], device=torch_device, dtype=torch.long)
        logits = self._get_uniform_logits(batch_size * num_beams, vocab_size)

        bigram_proc = EncoderNoRepeatNGramLogitsProcessor(2, encoder_input_ids=encoder_input_ids)
        trigram_proc = EncoderNoRepeatNGramLogitsProcessor(3, encoder_input_ids=encoder_input_ids)

        bigram_scores = bigram_proc(input_ids, logits.clone())
        trigram_scores = trigram_proc(input_ids, logits.clone())

        # Expected 2-gram mask, one list per decoder row.
        self.assertListEqual(
            torch.isinf(bigram_scores).tolist(),
            [[False, True, True], [False, True, False], [True, False, True], [False, True, False]],
        )

        # Expected 3-gram mask, one list per decoder row.
        self.assertListEqual(
            torch.isinf(trigram_scores).tolist(),
            [[False, True, False], [False, False, False], [False, False, True], [False, False, False]],
        )

    def test_no_bad_words_dist_processor(self):
        """Check which tokens are masked for a list of single- and multi-token bad words."""
        vocab_size = 5
        batch_size = 2
        eos_token_id = 4

        input_ids = torch.tensor([[0, 1, 3, 1], [0, 1, 0, 1]], device=torch_device, dtype=torch.long)
        bad_word_tokens = [[1], [4], [1, 0], [0, 1, 2], [1, 3, 1, 3]]
        logits = self._get_uniform_logits(batch_size, vocab_size)

        bad_words_proc = NoBadWordsLogitsProcessor(bad_words_ids=bad_word_tokens, eos_token_id=eos_token_id)
        filtered = bad_words_proc(input_ids, logits.clone())

        # Expected mask: tokens 0, 1 and 3 in row 0; tokens 0, 1 and 2 in row 1.
        # Token 4 (equal to eos_token_id) is expected to stay finite although [4] is in the bad-word list.
        self.assertListEqual(
            torch.isinf(filtered).tolist(), [[True, True, False, True, False], [True, True, True, False, False]]
        )

        # With the EOS token as the only bad word, the scores are expected to come back unchanged.
        bad_words_proc = NoBadWordsLogitsProcessor(bad_words_ids=[[4]], eos_token_id=eos_token_id)
        filtered = bad_words_proc(input_ids, logits.clone())
        self.assertTrue(torch.allclose(logits, filtered, atol=1e-3))

    def test_processor_list(self):
        """Check that a `LogitsProcessorList` gives the same result as calling its members one after another."""
        batch_size = 4
        sequence_length = 10
        vocab_size = 15
        eos_token_id = 0

        # Random input ids plus a copy used at the end to verify they were not modified.
        input_ids = ids_tensor((batch_size, sequence_length), vocab_size)
        input_ids_comp = input_ids.clone()

        scores = self._get_uniform_logits(batch_size, vocab_size)
        scores_comp = scores.clone()

        # The processors and warpers under test, in the order in which they are applied.
        pipeline = [
            MinLengthLogitsProcessor(min_length=10, eos_token_id=eos_token_id),
            TemperatureLogitsWarper(temperature=0.5),
            RepetitionPenaltyLogitsProcessor(penalty=2.0),
            TopKLogitsWarper(3),
            TopPLogitsWarper(0.8),
            NoRepeatNGramLogitsProcessor(2),
            NoBadWordsLogitsProcessor(bad_words_ids=[[1]], eos_token_id=eos_token_id),
        ]

        # Apply every step by hand, feeding each output into the next step.
        for step in pipeline:
            scores = step(input_ids, scores)

        # Apply the same steps through a single LogitsProcessorList call.
        processor = LogitsProcessorList(pipeline)
        scores_comp = processor(input_ids, scores_comp)

        # Both ways are expected to produce matching scores.
        self.assertTrue(torch.allclose(scores, scores_comp, atol=1e-3))

        # The input ids are expected to be untouched.
        self.assertListEqual(input_ids.tolist(), input_ids_comp.tolist())

    def test_prefix_constrained_logits_processor(self):
        """Check that only the tokens returned by the prefix function keep a finite score."""
        vocab_size = 5
        batch_size = 2

        input_ids = torch.tensor([[0, 1, 3, 1], [0, 1, 0, 1]], device=torch_device, dtype=torch.long)
        logits = self._get_uniform_logits(batch_size, vocab_size)

        def prefix_allowed_tokens_fn(batch_id, inputs_ids):
            # Allowed token ids depend on the batch index only: [0, 1] for row 0, [2, 3] for row 1.
            allowed_per_batch = [[0, 1], [2, 3]]
            return allowed_per_batch[batch_id]

        constrained_proc = PrefixConstrainedLogitsProcessor(prefix_allowed_tokens_fn, 1)
        filtered = constrained_proc(input_ids, logits.clone())

        # Expected: every token outside the allowed list of its row is -inf/inf.
        self.assertListEqual(
            torch.isinf(filtered).tolist(), [[False, False, True, True, True], [True, True, False, False, True]]
        )

    def test_hamming_diversity(self):
        """Check the scores returned by the Hamming diversity processor for fixed current tokens."""
        vocab_size = 4
        num_beams = 2
        num_beam_groups = 2

        # Uniform scores: every entry is 1 / vocab_size = 0.25.
        logits = self._get_uniform_logits(num_beams, vocab_size)
        current_tokens = torch.tensor([0, 3, 1, 2], device=torch_device, dtype=torch.long)

        diversity_logits_processor = HammingDiversityLogitsProcessor(
            diversity_penalty=1.0, num_beams=num_beams, num_beam_groups=num_beam_groups
        )

        processed_scores = diversity_logits_processor(None, logits, current_tokens, 1)

        # Expected: one entry per row is lowered by the penalty of 1.0 (0.25 -> -0.75).
        expected_rows = [
            [-0.7500, 0.2500, 0.2500, 0.2500],
            [0.2500, -0.7500, 0.2500, 0.2500],
        ]
        for row_index, expected_row in enumerate(expected_rows):
            self.assertTrue(
                torch.allclose(
                    processed_scores[row_index], torch.tensor(expected_row, device=torch_device), atol=1e-3
                )
            )

    def test_forced_bos_token_logits_processor(self):
        """Check the forced-BOS processor for inputs of length 1 and of length 4."""
        vocab_size = 20
        batch_size = 4
        bos_token_id = 0

        logits_processor = ForcedBOSTokenLogitsProcessor(bos_token_id=bos_token_id)

        # Inputs of length 1: all tokens after the BOS id are expected at -inf and the BOS score at 0.
        input_ids = ids_tensor((batch_size, 1), vocab_size=vocab_size)
        logits = self._get_uniform_logits(batch_size, vocab_size)
        forced = logits_processor(input_ids, logits)
        self.assertTrue(torch.isneginf(forced[:, bos_token_id + 1 :]).all())
        self.assertListEqual(forced[:, bos_token_id].tolist(), batch_size * [0])

        # Inputs of length 4: no score is expected to be infinite.
        input_ids = ids_tensor((batch_size, 4), vocab_size=vocab_size)
        logits = self._get_uniform_logits(batch_size, vocab_size)
        untouched = logits_processor(input_ids, logits)
        self.assertFalse(torch.isinf(untouched).any())

    def test_forced_eos_token_logits_processor(self):
        """Check the forced-EOS processor for inputs of length `max_length - 1` and shorter."""
        vocab_size = 20
        batch_size = 4
        eos_token_id = 0
        max_length = 5

        logits_processor = ForcedEOSTokenLogitsProcessor(max_length=max_length, eos_token_id=eos_token_id)

        # Inputs of length 4 (max_length - 1): all tokens after the EOS id are expected at -inf, EOS at 0.
        input_ids = ids_tensor((batch_size, 4), vocab_size=vocab_size)
        logits = self._get_uniform_logits(batch_size, vocab_size)
        forced = logits_processor(input_ids, logits)
        self.assertTrue(torch.isneginf(forced[:, eos_token_id + 1 :]).all())
        self.assertListEqual(forced[:, eos_token_id].tolist(), batch_size * [0])

        # Inputs of length 3: no score is expected to be infinite.
        input_ids = ids_tensor((batch_size, 3), vocab_size=vocab_size)
        logits = self._get_uniform_logits(batch_size, vocab_size)
        untouched = logits_processor(input_ids, logits)
        self.assertFalse(torch.isinf(untouched).any())

    def test_remove_nan_inf_logits_processor(self):
        """Check how `nan`, `inf` and `-inf` entries come out of `InfNanRemoveLogitsProcessor`."""
        raw_scores = torch.tensor(
            [[0.0, 0.7, 0.8, float("nan")], [0.1, float("inf"), 0.3, float("-inf")]], device=torch_device
        )
        input_ids = ids_tensor((2, 4), vocab_size=20)

        logits_processor = InfNanRemoveLogitsProcessor()
        cleaned = logits_processor(input_ids, raw_scores)

        # Expected: nan -> 0.0, +inf -> largest finite value of the dtype, -inf left as is.
        largest_finite = torch.finfo(cleaned.dtype).max
        expected = torch.tensor(
            [[0.0, 0.7, 0.8, 0.0], [0.1, largest_finite, 0.3, float("-inf")]],
            device=torch_device,
        )
        self.assertTrue(torch.allclose(cleaned, expected, atol=1e-6))
|
|