| import os |
| import torch |
| import random |
| from bitstring import BitArray |
|
|
| |
|
|
|
|
|
|
def generate_file_with_bits(file_path, num_bits):
    """Write a file of random bytes big enough to hold ``num_bits`` bits.

    The file contains ``ceil(num_bits / 8)`` bytes drawn from the ``random``
    module.  When ``num_bits`` is not a multiple of 8, only the low
    ``num_bits % 8`` bits of the final byte are kept; its high bits are
    cleared.

    :param file_path: destination path; the file is created or overwritten
    :param num_bits: requested payload size in bits
    :return: None
    """
    whole_bytes, spare_bits = divmod(num_bits, 8)
    byte_count = whole_bytes + (1 if spare_bits else 0)
    print("Byte Num:", byte_count)

    # One getrandbits(8) call per byte, in order, so a seeded run is reproducible.
    payload = bytearray()
    for _ in range(byte_count):
        payload.append(random.getrandbits(8))

    # Clear the unused high bits of the trailing partial byte.
    if spare_bits:
        payload[-1] &= (1 << spare_bits) - 1

    with open(file_path, 'wb') as out_file:
        out_file.write(payload)

    print(f"File '{file_path}' generated with {num_bits} bits.")
|
|
|
|
|
|
|
|
class swin:
    """Read and rewrite the low-order bits of float32 tensors in a checkpoint.

    The checkpoint at ``path`` is expected to be a dict mapping layer names to
    tensors (for example a ``state_dict`` saved with ``torch.save``).  All
    methods load it onto the CPU.

    NOTE: ``torch.load`` unpickles the file; only use it on checkpoints you
    trust.
    """

    def __init__(self, path):
        """
        Remember the checkpoint path used by every method.

        :param path: path of the source ``.pth`` checkpoint
        """
        self.path = path

    @staticmethod
    def _load_cpu(checkpoint_path):
        """Load a checkpoint dict with all tensors mapped to the CPU."""
        return torch.load(checkpoint_path, map_location=torch.device("cpu"))

    @staticmethod
    def _check_bit_n(bit_n):
        """Reject bit widths that cannot address part of a 32-bit word."""
        if not 1 <= bit_n <= 32:
            raise ValueError(f"bit_n must be between 1 and 32, got {bit_n}")

    @staticmethod
    def _file_to_bits(file_path):
        """Return the file content as a '0'/'1' string, MSB first per byte."""
        with open(file_path, "rb") as payload_file:
            return "".join(f"{byte:08b}" for byte in payload_file.read())

    @staticmethod
    def _bits_to_bytes(bit_string):
        """Pack a '0'/'1' string into bytes, zero-padding the last byte."""
        padded = bit_string.ljust(-(-len(bit_string) // 8) * 8, "0")
        return bytes(int(padded[i:i + 8], 2) for i in range(0, len(padded), 8))

    def get_pth_keys(self):
        """
        Return the layer names (dict keys) of the checkpoint.

        :return: keys view of the loaded checkpoint dict
        """
        return self._load_cpu(self.path).keys()

    def get_pth_keys_float32(self):
        """
        Return the names of the layers whose tensor dtype is float32.

        :return: list of layer names, in checkpoint order
        """
        params = self._load_cpu(self.path)
        return [
            name for name, value in params.items()
            if value.data.dtype == torch.float32
        ]

    def get_file_bit_num(self):
        """
        Return the size of the checkpoint file in bits.

        :return: bit size
        """
        return os.path.getsize(self.path) * 8

    def layer_low_n_bit_fLip(self, flip_path, bit_n, *layers):
        """
        Flip the low ``bit_n`` bits of every element of the given layers.

        Layers that are 0-dimensional or not float32 are left untouched.

        :param flip_path: where the modified checkpoint is saved
        :param bit_n: how many low-order bits to flip
        :param layers: names of the layers to process
        :return: None
        """
        params = self._load_cpu(self.path)
        mask = (1 << bit_n) - 1
        for layer in layers:
            tensor = params[layer].data
            if tensor.dim() < 1 or tensor.dtype != torch.float32:
                continue
            # Reinterpret the float bits as int32 so XOR acts on the raw bits.
            int_view = tensor.view(torch.int32)
            int_view ^= mask
            params[layer].data = int_view.view(torch.float32)
        torch.save(params, flip_path)

    def all_layers_low_n_bit_fLip(self, flip_path, bit_n):
        """
        Flip the low ``bit_n`` bits of every float32 layer.

        :param flip_path: where the modified checkpoint is saved
        :param bit_n: how many low-order bits to flip
        :return: None
        """
        self.layer_low_n_bit_fLip(flip_path, bit_n, *self.get_pth_keys_float32())

    def get_layers_low_n_bit_size(self, layers, bit_n):
        """
        Compute the embedding capacity, in bits, of the given layers.

        usage:
        size, size_list = agent.get_layers_low_n_bit_size(agent.get_pth_keys(), 16)

        Each float32 layer contributes ``element_count * bit_n`` bits; layers
        of any other dtype are skipped and do not appear in the per-layer list.

        :param layers: iterable of layer names
        :param bit_n: number of low-order bits used per element
        :return: (total capacity, list of per-layer capacities)
        """
        params = self._load_cpu(self.path)
        sizes = [
            params[layer].data.numel() * bit_n
            for layer in layers
            if params[layer].data.dtype == torch.float32
        ]
        return sum(sizes), sizes

    def all_layers_low_n_bit_inject(self, inject_path, bit_n, malware, malware_len):
        """
        Embed the first ``malware_len`` bits of a file into the float32 layers.

        Layers are filled in checkpoint order, ``bit_n`` payload bits per
        element, replacing the element's low-order bits.  A final chunk shorter
        than ``bit_n`` replaces only that many low bits, so the upper bits of
        the last touched element are preserved.  If the payload is larger than
        the total capacity, only the part that fits is embedded.

        :param inject_path: where the modified checkpoint is saved
        :param bit_n: number of low-order bits used per element (1..32)
        :param malware: path of the payload file to embed
        :param malware_len: payload length in bits
        :return: None
        :raises ValueError: if ``bit_n`` is outside 1..32 or ``malware_len``
            exceeds the number of bits in the payload file
        """
        self._check_bit_n(bit_n)
        params = self._load_cpu(self.path)
        payload_bits = self._file_to_bits(malware)
        if malware_len > len(payload_bits):
            raise ValueError(
                f"malware_len ({malware_len}) exceeds the payload size "
                f"({len(payload_bits)} bits)"
            )
        # Never read past the requested length, even mid-chunk.
        payload_bits = payload_bits[:malware_len]

        offset = 0  # payload bits assigned to earlier layers
        for layer in list(params):
            tensor = params[layer].data
            if tensor.dtype != torch.float32:
                continue
            size = tensor.numel() * bit_n
            print(layer, size)
            flat = tensor.flatten()

            print(offset, offset + size)
            print(flat.size(), size // bit_n)
            chunk_starts = range(offset, min(offset + size, malware_len), bit_n)
            if len(chunk_starts) > 0:
                # Work on the raw 32-bit patterns; tolist() avoids per-element
                # tensor indexing.
                int_view = flat.view(torch.int32)
                words = int_view[:len(chunk_starts)].tolist()
                for i, start in enumerate(chunk_starts):
                    chunk = payload_bits[start:start + bit_n]
                    width = len(chunk)
                    word = words[i] & 0xFFFFFFFF
                    word = ((word >> width) << width) | int(chunk, 2)
                    # Back to the signed range expected by int32.
                    words[i] = word - (1 << 32) if word >= (1 << 31) else word
                int_view[:len(chunk_starts)] = torch.tensor(words, dtype=torch.int32)

            # Store the result before deciding whether to stop, so the last
            # layer is written back even when flatten() returned a copy.
            params[layer] = flat.reshape(tensor.shape)
            offset += size
            if offset >= malware_len:
                break
        torch.save(params, inject_path)

    def all_layers_low_n_bit_extract(self, inject_path, bit_n, extract_malware, malware_len):
        """
        Recover a payload embedded by ``all_layers_low_n_bit_inject``.

        The float32 layer names are taken from the checkpoint at ``self.path``
        and the values are read from ``inject_path``.  The low ``bit_n`` bits
        of each element are concatenated; when ``malware_len`` is not a
        multiple of ``bit_n`` the final bits are taken from the low end of the
        last element read.  If fewer than ``malware_len`` bits are available,
        the bits that were collected are written.  The output is zero-padded
        to a whole number of bytes.

        :param inject_path: path of the checkpoint that carries the payload
        :param bit_n: number of low-order bits used per element (1..32)
        :param extract_malware: path of the file to write the payload to
        :param malware_len: payload length in bits
        :return: None
        :raises ValueError: if ``bit_n`` is outside 1..32
        """
        self._check_bit_n(bit_n)
        params = self._load_cpu(inject_path)
        mask = (1 << bit_n) - 1
        chunk_format = f"0{bit_n}b"

        pieces = []
        collected = 0  # bits gathered so far
        for layer in self.get_pth_keys_float32():
            if collected >= malware_len:
                break
            tensor = params[layer].data
            if tensor.dtype != torch.float32:
                continue
            wanted = (malware_len - collected + bit_n - 1) // bit_n
            words = tensor.flatten()[:wanted].view(torch.int32).tolist()
            pieces.extend(format(word & mask, chunk_format) for word in words)
            collected += len(words) * bit_n

        bit_string = "".join(pieces)
        tail = malware_len % bit_n
        if tail and len(bit_string) > malware_len:
            # The last chunk was shorter than bit_n and sits in the lowest
            # bits of the last element read.
            bit_string = bit_string[:malware_len - tail] + bit_string[-tail:]
        bit_string = bit_string[:malware_len]

        with open(extract_malware, 'wb') as out_file:
            out_file.write(self._bits_to_bytes(bit_string))
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
if __name__ == "__main__":
    # Hard-coded input/output locations for this experiment run.
    path = "../parameters/classification/swin_face/swin_face.pth"
    inject_path = "../parametersProcess/swin_face/swin_evilfiles_16.pth"
    malware = "../malwares/generated_malware"
    extract_malware = "../malwares/generated_malware_extracted"
    agent = swin(path)

    # Capacity of all float32 layers when 16 low bits per element are used.
    # NOTE(review): capacity is computed for 16 bits while the inject/extract
    # calls below use 20 bits per element -- confirm this is intended.
    float32_layers = agent.get_pth_keys_float32()
    size, size_list = agent.get_layers_low_n_bit_size(float32_layers, 16)

    # Generate a random payload file of exactly that many bits
    # (simplified flow: one payload spread over the layers).
    '''随机生成一个恶意软件,全部嵌入模型的层(简化流程)'''
    generate_file_with_bits(malware, size)

    malware_bit_len = os.path.getsize(malware) * 8
    print("malware bit size: ", malware_bit_len)

    # Embed the payload into the checkpoint.
    '''嵌入'''
    agent.all_layers_low_n_bit_inject(inject_path, 20, malware, malware_bit_len)

    # Extract it again from the modified checkpoint.
    '''提取'''
    agent.all_layers_low_n_bit_extract(inject_path, 20, extract_malware, malware_bit_len)
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
|
|
|
|
|
|
|
|
|
|
| |