| from torch.utils.data import Dataset, DataLoader
|
| import torchvision.transforms as transforms
|
| import random
|
| import numpy as np
|
| from PIL import Image
|
| import json
|
| import torch
|
| import os
|
| import matplotlib
|
|
|
def unpickle(file):
    """Read the first record of a raw Animal-10N binary batch file.

    Each record is 1 label byte followed by a 64x64x3 uint8 image stored
    channel-first.  NOTE: the original looped over 50000 records but
    returned inside the loop, so only the first record was ever parsed;
    that behavior is preserved here.

    Args:
        file: path to the binary batch file.

    Returns:
        (img, lab): img is a (64, 64, 3) uint8 HWC array, lab is a
        length-10 one-hot float vector for the label byte.
    """
    record_size = 64 * 64 * 3 + 1
    # Context manager fixes the original's leaked file handle.
    with open(file, 'rb') as fo:
        raw = fo.read(record_size)
    # np.frombuffer replaces the deprecated/removed np.fromstring.
    arr = np.frombuffer(raw, dtype=np.uint8)
    lab = np.identity(10)[arr[0]]
    # CHW -> HWC for the image payload.
    img = arr[1:].reshape((3, 64, 64)).transpose((1, 2, 0))
    return img, lab
|
|
|
class animal_dataset(Dataset):
    """Animal-10N dataset with DivideMix-style modes.

    Labels are encoded as the first character of each image filename.
    Modes:
        'test'      - evaluation images from <root>/testing/.
        'all'       - every training image with its (noisy) label.
        'labeled'   - subset of training images selected by `pred` != 0,
                      paired with per-sample clean probabilities.
        'unlabeled' - complement subset selected by `pred` == 0.
    """

    def __init__(self, root, transform, mode, pred=[], path=[], probability=[], num_class=10):
        """Build the file/label lists for the requested mode.

        Args:
            root: dataset root containing 'training/' and 'testing/' dirs.
            transform: torchvision transform applied in __getitem__.
            mode: one of 'test', 'all', 'labeled', 'unlabeled'.
            pred: 0/1 array marking samples predicted clean (labeled/unlabeled modes).
            path: list of training image paths aligned with `pred`.
            probability: per-sample clean probabilities aligned with `pred`.
            num_class: number of classes (unused here; kept for interface compatibility).
        """
        self.root = root
        self.transform = transform
        self.mode = mode

        self.train_dir = root + '/training/'
        self.test_dir = root + '/testing/'
        train_imgs = os.listdir(self.train_dir)
        test_imgs = os.listdir(self.test_dir)
        self.test_data = []
        self.test_labels = []
        # Optional cached path/label listings (read if present).
        noise_file1 = './training_batch.json'
        noise_file2 = './testing_batch.json'
        if mode == 'test':
            if os.path.exists(noise_file2):
                with open(noise_file2, "r") as f:
                    cached = json.load(f)
                # BUG FIX: the original assigned these swapped
                # (labels <- 'data', data <- 'label'), which inverted
                # paths and labels relative to how the cache is built below.
                self.test_data = cached['data']
                self.test_labels = cached['label']
            else:
                for img in test_imgs:
                    self.test_data.append(self.test_dir + img)
                    # Filename's first character encodes the class index.
                    self.test_labels.append(int(img[0]))
                # NOTE(review): built but never written to disk in the
                # original code; kept for compatibility.
                dicts = {}
                dicts['data'] = self.test_data
                dicts['label'] = self.test_labels

        else:
            if os.path.exists(noise_file1):
                with open(noise_file1, "r") as f:
                    cached = json.load(f)
                train_data = cached['data']
                train_labels = cached['label']
            else:
                train_data = []
                # Maps image path -> label so subsets can look labels up by path.
                train_labels = {}
                for img in train_imgs:
                    img_path = self.train_dir + img
                    train_data.append(img_path)
                    train_labels[img_path] = (int(img[0]))
                # NOTE(review): built but never written to disk in the
                # original code; kept for compatibility.
                dicts = {}
                dicts['data'] = train_data
                dicts['label'] = train_labels

            if self.mode == "all":
                self.train_data = train_data
                self.train_labels = train_labels
            elif self.mode == "labeled":
                # Indices where pred != 0 -> samples treated as clean/labeled.
                pred_idx = pred.nonzero()[0]
                train_img = path
                self.train_data = [train_img[i] for i in pred_idx]
                self.probability = probability[pred_idx]

                print("%s data has a size of %d" % (self.mode, len(self.train_data)))
                self.train_labels = train_labels
            elif self.mode == "unlabeled":
                # Indices where pred == 0 -> samples treated as noisy/unlabeled.
                pred_idx = (1 - pred).nonzero()[0]
                train_img = path
                self.train_data = [train_img[i] for i in pred_idx]
                self.probability = probability[pred_idx]

                print("%s data has a size of %d" % (self.mode, len(self.train_data)))
                self.train_labels = train_labels

    def __getitem__(self, index):
        """Return a sample whose structure depends on the mode.

        'labeled'   -> (view1, view2, target, clean_probability)
        'unlabeled' -> (view1, view2)
        'all'       -> (image, target, image_path)
        'test'      -> (image, target)
        """
        if self.mode == 'labeled':
            img_path = self.train_data[index]
            target = self.train_labels[img_path]
            prob = self.probability[index]
            image = Image.open(img_path).convert('RGB')
            # Two independently augmented views for MixMatch-style training.
            img1 = self.transform(image)
            img2 = self.transform(image)
            return img1, img2, target, prob
        elif self.mode == 'unlabeled':
            img_path = self.train_data[index]
            image = Image.open(img_path).convert('RGB')
            img1 = self.transform(image)
            img2 = self.transform(image)
            return img1, img2
        elif self.mode == 'all':
            img_path = self.train_data[index]
            target = self.train_labels[img_path]
            image = Image.open(img_path).convert('RGB')
            img = self.transform(image)
            return img, target, img_path
        elif self.mode == 'test':
            img_path = self.test_data[index]
            target = self.test_labels[index]
            image = Image.open(img_path).convert('RGB')
            img = self.transform(image)
            return img, target

    def __len__(self):
        """Number of samples in the active split."""
        if self.mode == 'test':
            return len(self.test_data)
        else:
            return len(self.train_data)
|
|
|
|
|
class animal_dataloader():
    """Factory producing Animal-10N DataLoaders for each training phase."""

    def __init__(self, root='E:/2_Dataset_All/Animal-10N', batch_size=32, num_workers=0):
        """Store loader settings and build the train/test transform pipelines.

        Args:
            root: dataset root directory.
            batch_size: base batch size (warmup uses twice this value).
            num_workers: DataLoader worker processes.
        """
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.root = root

        # Per-channel statistics shared by both pipelines.
        normalize = transforms.Normalize(
            (0.6959, 0.6537, 0.6371),
            (0.3113, 0.3192, 0.3214),
        )
        self.transform_train = transforms.Compose([
            transforms.Resize(64),
            transforms.RandomCrop(64),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])
        self.transform_test = transforms.Compose([
            transforms.ToTensor(),
            normalize,
        ])

    def _loader(self, dataset, batch_size, shuffle):
        """Wrap a dataset in a DataLoader with the shared worker/pin settings."""
        return DataLoader(
            dataset=dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            pin_memory=True)

    def run(self, mode, pred=[], prob=[], paths=[]):
        """Build the loader(s) for the requested phase.

        Args:
            mode: 'warmup', 'train', 'eval_train', or 'test'.
            pred: 0/1 clean-sample mask (used by 'train').
            prob: per-sample clean probabilities (used by 'train').
            paths: training image paths aligned with `pred` (used by 'train').

        Returns:
            A single DataLoader, or (labeled_loader, unlabeled_loader) for 'train'.
        """
        if mode == 'warmup':
            warmup_set = animal_dataset(self.root, transform=self.transform_train, mode='all')
            # Warmup trains on everything, so a doubled batch is used.
            return self._loader(warmup_set, self.batch_size * 2, True)
        if mode == 'train':
            labeled_set = animal_dataset(self.root, transform=self.transform_train, mode='labeled',
                                         pred=pred, path=paths, probability=prob)
            unlabeled_set = animal_dataset(self.root, transform=self.transform_train, mode='unlabeled',
                                           pred=pred, path=paths, probability=prob)
            return (self._loader(labeled_set, self.batch_size, True),
                    self._loader(unlabeled_set, int(self.batch_size), True))
        if mode == 'eval_train':
            # Deterministic pass over the training split with test-time transforms.
            eval_set = animal_dataset(self.root, transform=self.transform_test, mode='all')
            return self._loader(eval_set, self.batch_size, False)
        if mode == 'test':
            test_set = animal_dataset(self.root, transform=self.transform_test, mode='test')
            return self._loader(test_set, 1000, False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |