"""Clean caption texts and tokenize them into a prompt-controlled GPT-2 dataset.

Each text comes with a similarity score (faiss_sim), an aesthetics score, and an
image-quality (IQA) score; the scores are bucketed into low/medium/high tertiles,
prepended to the text as a control prompt, tokenized, split into train/test, and
saved to disk.
"""

import os
import re
import shutil

import torch
import numpy as np
from datasets import Dataset, DatasetDict
from transformers import GPT2Tokenizer, GPT2LMHeadModel


BASE_DIR = os.path.dirname(os.path.abspath(__file__))


def resolve_path(*parts):
    """Resolve a path relative to this script's directory."""
    return os.path.abspath(os.path.join(BASE_DIR, *parts))

def contains_special_characters(text):
    """Return True if the text contains any non-ASCII character."""
    return bool(re.search(r'[^\x00-\x7F]', text))


def check_texts_for_special_characters(texts):
    """List which texts contain non-ASCII characters."""
    results = []
    for i, text in enumerate(texts):
        if contains_special_characters(text):
            results.append(f"Text {i}: Contains special characters")
    return results


def clean_text(text):
    """Strip non-ASCII characters, collapse whitespace, and drop a trailing period."""
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()
    if text.endswith("."):
        text = text[:-1]
    return text


def clean_texts(texts):
    return [clean_text(text) for text in texts]


def tokenizing_data_percentile3(tokenizer, data_dict):
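    """Tokenize prompt/query pairs whose prompts encode per-score tertiles.

    Each of the similarity, aesthetics, and IQA scores is bucketed into
    low/medium/high tertiles and written into the prompt; labels mask the prompt,
    padding, and <|startoftext|> tokens so only the query text contributes to the loss.
    """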
    dataset = Dataset.from_dict(data_dict)

    # Tertile boundaries (0th/33rd/66th/100th percentiles) for each control signal.
    sim_percentiles = np.percentile(np.array(dataset["similarity"]), [0, 33, 66, 100])
    aes_percentiles = np.percentile(np.array(dataset["aesthetics_score"]), [0, 33, 66, 100])
    iqa_percentiles = np.percentile(np.array(dataset["IQAs"]), [0, 33, 66, 100])

    def categorize_percentiles(score, percentiles):
        if score <= percentiles[1]:
            return "low"
        elif score <= percentiles[2]:
            return "medium"
        else:
            return "high"

    # Plain template string; the placeholders are filled per sample via .format() below.
    prompt_template = (
        "<|startoftext|>Similarity: {sim}, Aesthetic: {aes}, DeQA Quality: {iqa}, Query: "
    )

    def apply_prompt_template(sample, sim_percentiles, aes_percentiles, iqa_percentiles):
        sim = categorize_percentiles(sample["similarity"], sim_percentiles)
        aes = categorize_percentiles(sample["aesthetics_score"], aes_percentiles)
        iqa = categorize_percentiles(sample["IQAs"], iqa_percentiles)
        return {
            "prompt": prompt_template.format(sim=sim, aes=aes, iqa=iqa),
            "query": sample["text"],
        }

    dataset = dataset.map(apply_prompt_template,
                          fn_kwargs={"sim_percentiles": sim_percentiles,
                                     "aes_percentiles": aes_percentiles,
                                     "iqa_percentiles": iqa_percentiles})

    def tokenize_add_label(sample):
        # Encode prompt and query separately so the prompt length is known for label masking.
        prompt_ids = tokenizer.encode(sample["prompt"], add_special_tokens=False)
        query_ids = tokenizer.encode(sample["query"], add_special_tokens=False)
        text = prompt_ids + query_ids

        tokenized_inputs = tokenizer.pad({"input_ids": text}, padding="max_length",
                                         max_length=65, return_tensors="pt")

        # tokenizer.pad never truncates, so clip over-long sequences and re-terminate with EOS.
        if tokenized_inputs["input_ids"].shape[0] > 65:
            tokenized_inputs["input_ids"] = tokenized_inputs["input_ids"][:65]
            if tokenizer.eos_token_id is not None:
                tokenized_inputs["input_ids"][-1] = tokenizer.eos_token_id
            if "attention_mask" in tokenized_inputs:
                tokenized_inputs["attention_mask"] = tokenized_inputs["attention_mask"][:65]

        # Labels: ignore the prompt, padding, and <|startoftext|> tokens in the loss.
        tokenized_inputs["labels"] = tokenized_inputs["input_ids"].clone()
        num_tokens_in_prompt = len(prompt_ids)
        tokenized_inputs["labels"][:num_tokens_in_prompt] = -100
        tokenized_inputs["labels"][tokenized_inputs["labels"] == tokenizer.pad_token_id] = -100
        tokenized_inputs["labels"][tokenized_inputs["labels"] == tokenizer.cls_token_id] = -100

        # Keep the raw scores alongside the tokenized fields.
        tokenized_inputs["similarity"] = sample["similarity"]
        tokenized_inputs["aesthetics_score"] = sample["aesthetics_score"]
        tokenized_inputs["IQAs"] = sample["IQAs"]

        return tokenized_inputs

    tokenized_datasets = dataset.map(tokenize_add_label, remove_columns=["text"])

    tokenized_datasets.set_format("torch",
                                  columns=["input_ids", "attention_mask", "labels",
                                           "aesthetics_score", "similarity", "IQAs",
                                           "prompt", "query"])

    return tokenized_datasets


def tokenize_split_save(text_dir, tokenized_data_path, tokenizer):
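    """Load texts and scores from data.pt, clean and tokenize them, split train/test, and save to disk."""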
    data_path = os.path.join(text_dir, "data.pt")
    loaded_data = torch.load(data_path, weights_only=False)

    texts = loaded_data["texts"]
    faiss_sim = loaded_data["faiss_sim"]
    aesthetics_score = torch.tensor(loaded_data["aesthetics"])
    IQAs = torch.tensor(loaded_data["IQAs"])

    print(f"Data loaded successfully from {data_path}!")

    cleaned_texts = [clean_text(text) for text in texts]

    print("Adding EOS token at the end of each text...")
    texts_with_eos = [f"{text}<|endoftext|>" for text in cleaned_texts]
    for text in texts_with_eos[:10]:
        print(text)

    # Sanity check: inspect the longest text (by character count) and its token length.
    lengths = [len(text) for text in texts_with_eos]
    max_index = lengths.index(max(lengths))
    longest_text = texts_with_eos[max_index]
    longest_text_token = tokenizer.encode(longest_text, return_tensors="pt")
    print("Longest text:", longest_text)
    print("Longest text token:", longest_text_token, longest_text_token.shape)

    data_dict = {
        "text": texts_with_eos,
        "similarity": faiss_sim,
        "aesthetics_score": aesthetics_score,
        "IQAs": IQAs,
    }

    tokenized_datasets = tokenizing_data_percentile3(tokenizer, data_dict)

    # 80/20 train/test split with a fixed seed for reproducibility.
    tokenized_datasets = tokenized_datasets.train_test_split(test_size=0.2, shuffle=True, seed=42)
    tokenized_datasets = DatasetDict({
        "train": tokenized_datasets["train"],
        "test": tokenized_datasets["test"],
    })

    # Overwrite any previously saved version of the tokenized dataset.
    if os.path.exists(tokenized_data_path):
        shutil.rmtree(tokenized_data_path)
    tokenized_datasets.save_to_disk(tokenized_data_path)
    print(f"Tokenized data saved to {tokenized_data_path}!")

    return tokenized_datasets


if __name__ == '__main__':
    text_dir = resolve_path('../', 'processed_data', 'coco')
    model_name = "gpt2"
    data_save_path = os.path.join(text_dir, model_name)

    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    model = GPT2LMHeadModel.from_pretrained(model_name)

    # Register the <|startoftext|> control token and a dedicated pad token
    # (<|endoftext|> is already GPT-2's default EOS token).
    tokenizer.add_special_tokens({'cls_token': '<|startoftext|>',
                                  'eos_token': '<|endoftext|>',
                                  'pad_token': '<pad>'})

    model.config.cls_token_id = tokenizer.cls_token_id
    model.config.eos_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.pad_token_id
    model.resize_token_embeddings(len(tokenizer))

    tokenize_split_save(text_dir, data_save_path, tokenizer)
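    # A minimal sketch of reloading the saved dataset later (assumes the script above
    # has already run and written data_save_path):
    #   from datasets import load_from_disk
    #   tokenized = load_from_disk(data_save_path)
    #   print(tokenized["train"][0]["prompt"])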