| import pickle |
| import shutil |
| import struct |
| from pathlib import Path |
|
|
| import samplelib.SampleLoader |
| from core.interact import interact as io |
| from samplelib import Sample |
| from core import pathex |
|
|
| packed_faceset_filename = 'faceset.pak' |
|
|
| class PackedFaceset(): |
| VERSION = 1 |
|
|
| @staticmethod |
| def pack(samples_path): |
| samples_dat_path = samples_path / packed_faceset_filename |
|
|
| if samples_dat_path.exists(): |
| io.log_info(f"{samples_dat_path} : file already exists !") |
| io.input("Press enter to continue and overwrite.") |
|
|
| as_person_faceset = False |
| dir_names = pathex.get_all_dir_names(samples_path) |
| if len(dir_names) != 0: |
| as_person_faceset = io.input_bool(f"{len(dir_names)} subdirectories found, process as person faceset?", True) |
|
|
| if as_person_faceset: |
| image_paths = [] |
|
|
| for dir_name in dir_names: |
| image_paths += pathex.get_image_paths(samples_path / dir_name) |
| else: |
| image_paths = pathex.get_image_paths(samples_path) |
|
|
| samples = samplelib.SampleLoader.load_face_samples(image_paths) |
| samples_len = len(samples) |
|
|
| samples_configs = [] |
| for sample in io.progress_bar_generator (samples, "Processing"): |
| sample_filepath = Path(sample.filename) |
| sample.filename = sample_filepath.name |
|
|
| if as_person_faceset: |
| sample.person_name = sample_filepath.parent.name |
| samples_configs.append ( sample.get_config() ) |
| samples_bytes = pickle.dumps(samples_configs, 4) |
|
|
| of = open(samples_dat_path, "wb") |
| of.write ( struct.pack ("Q", PackedFaceset.VERSION ) ) |
| of.write ( struct.pack ("Q", len(samples_bytes) ) ) |
| of.write ( samples_bytes ) |
|
|
| del samples_bytes |
| del samples_configs |
|
|
| sample_data_table_offset = of.tell() |
| of.write ( bytes( 8*(samples_len+1) ) ) |
|
|
| data_start_offset = of.tell() |
| offsets = [] |
|
|
| for sample in io.progress_bar_generator(samples, "Packing"): |
| try: |
| if sample.person_name is not None: |
| sample_path = samples_path / sample.person_name / sample.filename |
| else: |
| sample_path = samples_path / sample.filename |
|
|
|
|
| with open(sample_path, "rb") as f: |
| b = f.read() |
|
|
| offsets.append ( of.tell() - data_start_offset ) |
| of.write(b) |
| except: |
| raise Exception(f"error while processing sample {sample_path}") |
|
|
| offsets.append ( of.tell() ) |
|
|
| of.seek(sample_data_table_offset, 0) |
| for offset in offsets: |
| of.write ( struct.pack("Q", offset) ) |
| of.seek(0,2) |
| of.close() |
| |
| if io.input_bool(f"Delete original files?", True): |
| for filename in io.progress_bar_generator(image_paths, "Deleting files"): |
| Path(filename).unlink() |
|
|
| if as_person_faceset: |
| for dir_name in io.progress_bar_generator(dir_names, "Deleting dirs"): |
| dir_path = samples_path / dir_name |
| try: |
| shutil.rmtree(dir_path) |
| except: |
| io.log_info (f"unable to remove: {dir_path} ") |
|
|
| @staticmethod |
| def unpack(samples_path): |
| samples_dat_path = samples_path / packed_faceset_filename |
| if not samples_dat_path.exists(): |
| io.log_info(f"{samples_dat_path} : file not found.") |
| return |
|
|
| samples = PackedFaceset.load(samples_path) |
|
|
| for sample in io.progress_bar_generator(samples, "Unpacking"): |
| person_name = sample.person_name |
| if person_name is not None: |
| person_path = samples_path / person_name |
| person_path.mkdir(parents=True, exist_ok=True) |
|
|
| target_filepath = person_path / sample.filename |
| else: |
| target_filepath = samples_path / sample.filename |
|
|
| with open(target_filepath, "wb") as f: |
| f.write( sample.read_raw_file() ) |
|
|
| samples_dat_path.unlink() |
|
|
| @staticmethod |
| def path_contains(samples_path): |
| samples_dat_path = samples_path / packed_faceset_filename |
| return samples_dat_path.exists() |
| |
| @staticmethod |
| def load(samples_path): |
| samples_dat_path = samples_path / packed_faceset_filename |
| if not samples_dat_path.exists(): |
| return None |
|
|
| f = open(samples_dat_path, "rb") |
| version, = struct.unpack("Q", f.read(8) ) |
| if version != PackedFaceset.VERSION: |
| raise NotImplementedError |
|
|
| sizeof_samples_bytes, = struct.unpack("Q", f.read(8) ) |
|
|
| samples_configs = pickle.loads ( f.read(sizeof_samples_bytes) ) |
| samples = [] |
| for sample_config in samples_configs: |
| sample_config = pickle.loads(pickle.dumps (sample_config)) |
| samples.append ( Sample (**sample_config) ) |
|
|
| offsets = [ struct.unpack("Q", f.read(8) )[0] for _ in range(len(samples)+1) ] |
| data_start_offset = f.tell() |
| f.close() |
|
|
| for i, sample in enumerate(samples): |
| start_offset, end_offset = offsets[i], offsets[i+1] |
| sample.set_filename_offset_size( str(samples_dat_path), data_start_offset+start_offset, end_offset-start_offset ) |
|
|
| return samples |
|
|