| import torch |
| import torch.nn as nn |
| import math |
| from ultralytics import YOLO |
| from ultralytics.nn.modules import Conv, Concat |
| from lib.models.common import Focus, BottleneckCSP, Detect |
| from lib.utils import check_anchor_order |
| import logging |
|
|
class YOLOv11Backbone(nn.Module):
    """YOLOv11 backbone made of ultralytics layers 0-10.

    ``forward`` returns the outputs of the layers listed in ``out_indices``;
    ``out_channels`` holds the channel count of each of those outputs, in the
    same order.
    """

    def __init__(self, width_multiple=0.25, depth_multiple=0.50, yolo_model_path=None):
        """
        Build the backbone from a YOLOv11 checkpoint or from scratch.

        Args:
            width_multiple: channel scaling factor (n=0.25, s=0.50, m=1.00, l=1.00, x=1.50)
            depth_multiple: depth scaling factor (n=0.50, s=0.50, m=0.50, l=1.00, x=1.00)
            yolo_model_path: optional path to a YOLOv11 checkpoint. When given,
                layers 0-10 are taken from the loaded model and the two
                multipliers are not used.

        Warning:
            The YOLO scales (n, s, m, l, x) do not all share the same module
            layout. In the from-scratch layout below the tapped layers
            (4, 6, 10) output (c4, c4, c5) channels, i.e. 512, 512 and 1024
            times ``width_multiple`` rounded up to a multiple of 8, so
            consumers should read ``out_channels`` instead of assuming fixed
            values.
        """
        super().__init__()

        # Indices (into self.layers) of the feature maps returned by forward().
        self.out_indices = [4, 6, 10]

        if yolo_model_path:
            yolo_model = YOLO(yolo_model_path).model
            # Keep only the backbone part (layers 0-10) of the loaded model.
            self.layers = nn.ModuleList([yolo_model.model[i] for i in range(11)])
        else:
            from ultralytics.nn.modules import Conv, C3k2, SPPF, C2PSA

            def make_divisible(x, divisor=8):
                """Round x up to the nearest multiple of divisor."""
                return int(math.ceil(x / divisor) * divisor)

            c1 = make_divisible(64 * width_multiple)
            c2 = make_divisible(128 * width_multiple)
            c3 = make_divisible(256 * width_multiple)
            c4 = make_divisible(512 * width_multiple)
            c5 = make_divisible(1024 * width_multiple)

            # Repeat count of the stacked blocks, never below 1.
            n1 = max(round(2 * depth_multiple), 1)

            # NOTE(review): this layout applies no per-scale channel cap and
            # always uses the default C3k2 variant; the official m/l/x (and
            # possibly n/s) definitions appear to differ -- confirm before
            # loading pretrained weights into a from-scratch backbone.
            self.layers = nn.ModuleList([
                Conv(3, c1, k=3, s=2),
                Conv(c1, c2, k=3, s=2),
                C3k2(c2, c3, n=n1, shortcut=False, e=0.25),
                Conv(c3, c3, k=3, s=2),
                C3k2(c3, c4, n=n1, shortcut=False, e=0.25),
                Conv(c4, c4, k=3, s=2),
                C3k2(c4, c4, n=n1, shortcut=True),
                Conv(c4, c5, k=3, s=2),
                C3k2(c5, c5, n=n1, shortcut=True),
                SPPF(c5, c5, k=5),
                C2PSA(c5, c5, n=n1),
            ])

        # Same channel lookup for both construction paths. The tapped layers
        # are composite blocks without a direct `.conv`, so reading
        # `layer.conv` unconditionally fails on the pretrained path.
        self.out_channels = [
            self._infer_out_channels(self.layers[i], i) for i in self.out_indices
        ]

    @staticmethod
    def _infer_out_channels(layer, index):
        """Return the output channel count of a backbone layer.

        Uses ``layer.conv`` when present, otherwise ``layer.cv2.conv``.

        Raises:
            AttributeError: if the layer exposes neither ``conv`` nor ``cv2``.
        """
        if hasattr(layer, 'conv'):
            return layer.conv.out_channels
        if hasattr(layer, 'cv2'):
            return layer.cv2.conv.out_channels
        raise AttributeError(f"Layer {index} 没有 conv 或 cv2 属性,请检查模块结构")

    def forward(self, x):
        """Run all layers in order and return the outputs at ``out_indices`` as a list."""
        outputs = []
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i in self.out_indices:
                outputs.append(x)
        return outputs
|
|
class ChannelAdapter(nn.Module):
    """Pointwise (1x1) convolution that maps a feature map to a new channel count."""

    def __init__(self, in_channels, out_channels):
        """
        Args:
            in_channels: number of channels of the incoming feature map
            out_channels: number of channels to produce
        """
        super().__init__()
        # Kernel size 1: only the channel dimension is mixed.
        self.conv = nn.Conv2d(in_channels, out_channels, 1)

    def forward(self, x):
        """Apply the 1x1 convolution to ``x`` and return the result."""
        adapted = self.conv(x)
        return adapted
|
|
class YOLOPWithYOLOv11(nn.Module):
    """YOLOP-style multi-task network on top of a YOLOv11 backbone.

    The forward pass produces a detection output plus two segmentation maps
    (drivable area and lane line), all computed from shared neck features.
    """

    def __init__(self, num_seg_class=2, yolo_scale='n', yolo_weights_path=None):
        """
        YOLOP with YOLOv11 Backbone

        Args:
            num_seg_class: number of output channels of the drivable-area head
            yolo_scale: YOLOv11 scale ('n', 's', 'm', 'l', 'x'); only used when
                the backbone is built from scratch
            yolo_weights_path: optional path to YOLOv11 pretrained weights; when
                given, the backbone layers are taken from that checkpoint

        Raises:
            ValueError: if ``yolo_scale`` is not one of the supported scales.
        """
        super().__init__()

        scale_configs = {
            'n': {'width': 0.25, 'depth': 0.50},
            's': {'width': 0.50, 'depth': 0.50},
            'm': {'width': 1.00, 'depth': 0.50},
            'l': {'width': 1.00, 'depth': 1.00},
            'x': {'width': 1.50, 'depth': 1.00},
        }

        if yolo_scale not in scale_configs:
            raise ValueError(f"Invalid yolo_scale: {yolo_scale}. Must be one of {list(scale_configs.keys())}")

        scale = scale_configs[yolo_scale]

        if yolo_weights_path:
            self.backbone = YOLOv11Backbone(yolo_model_path=yolo_weights_path)
        else:
            self.backbone = YOLOv11Backbone(width_multiple=scale['width'], depth_multiple=scale['depth'])

        # 1x1 adapters bring whatever the backbone emits to the fixed channel
        # counts the neck below is wired for.
        backbone_channels = self.backbone.out_channels
        neck_channels = [128, 256, 512]

        self.adapters = nn.ModuleList([
            ChannelAdapter(backbone_channels[0], neck_channels[0]),
            ChannelAdapter(backbone_channels[1], neck_channels[1]),
            ChannelAdapter(backbone_channels[2], neck_channels[2]),
        ])

        # Neck layers are addressed by index in forward(); keep the order.
        self.neck = nn.ModuleList([
            Conv(512, 256, k=1, s=1),                        # 0
            nn.Upsample(scale_factor=2, mode='nearest'),     # 1
            Concat(dimension=1),                             # 2
            BottleneckCSP(512, 256, n=1, shortcut=False),    # 3
            Conv(256, 128, k=1, s=1),                        # 4
            nn.Upsample(scale_factor=2, mode='nearest'),     # 5
            Concat(dimension=1),                             # 6
            BottleneckCSP(256, 128, n=1, shortcut=False),    # 7
            Conv(128, 128, k=3, s=2),                        # 8
            Concat(dimension=1),                             # 9
            BottleneckCSP(256, 256, n=1, shortcut=False),    # 10
            Conv(256, 256, k=3, s=2),                        # 11
            Concat(dimension=1),                             # 12
            BottleneckCSP(512, 512, n=1, shortcut=False),    # 13
        ])

        self.detect_head = Detect(1, [[3,9,5,11,4,20], [7,18,6,39,12,31], [19,50,38,81,68,157]], [128, 256, 512])

        self.drivable_seg_head = nn.ModuleList([
            Conv(256, 128, k=3, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            BottleneckCSP(128, 64, n=1, shortcut=False),
            Conv(64, 32, k=3, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Conv(32, 16, k=3, s=1),
            BottleneckCSP(16, 8, n=1, shortcut=False),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Conv(8, num_seg_class, k=3, s=1),
        ])
        # The lane head always emits 2 channels, independent of num_seg_class.
        self.lane_seg_head = nn.ModuleList([
            Conv(256, 128, k=3, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            BottleneckCSP(128, 64, n=1, shortcut=False),
            Conv(64, 32, k=3, s=1),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Conv(32, 16, k=3, s=1),
            BottleneckCSP(16, 8, n=1, shortcut=False),
            nn.Upsample(scale_factor=2, mode='nearest'),
            Conv(8, 2, k=3, s=1),
        ])

        # Derive the detection strides from a dummy forward pass, then express
        # the anchors in units of each level's stride.
        s = 128
        with torch.no_grad():
            dummy = torch.zeros(1, 3, s, s)
            detect_out, _, _ = self.forward(dummy)
            self.detect_head.stride = torch.tensor([s / x.shape[-2] for x in detect_out])
            self.detect_head.anchors /= self.detect_head.stride.view(-1, 1, 1)
            check_anchor_order(self.detect_head)
            self.stride = self.detect_head.stride

        print(f"Initialized Detect head with strides: {self.detect_head.stride.tolist()}")

        self.nc = 1
        self.names = ['vehicle']
        # Index-addressable view of the sub-modules; detector_index points at
        # the detection head inside this list.
        self.model = nn.ModuleList([
            self.backbone,
            self.adapters,
            self.neck,
            self.detect_head,
            self.drivable_seg_head,
            self.lane_seg_head
        ])
        self.detector_index = 3
        self.det_out_idx = 25

        self.gr = 1.0

        self._initialize_biases()

    def freeze_backbone(self):
        """Disable gradients for all backbone and adapter parameters."""
        logging.info("Freezing backbone parameters...")
        for param in self.backbone.parameters():
            param.requires_grad = False
        for param in self.adapters.parameters():
            param.requires_grad = False

        # Report how many parameter tensors ended up frozen.
        frozen_count = sum(1 for p in self.backbone.parameters() if not p.requires_grad)
        frozen_count += sum(1 for p in self.adapters.parameters() if not p.requires_grad)
        total_count = sum(1 for _ in self.backbone.parameters())
        total_count += sum(1 for _ in self.adapters.parameters())
        logging.info(f"Frozen {frozen_count}/{total_count} backbone+adapter parameters")

    def unfreeze_backbone(self):
        """Re-enable gradients for all backbone and adapter parameters."""
        logging.info("Unfreezing backbone parameters...")
        for param in self.backbone.parameters():
            param.requires_grad = True
        for param in self.adapters.parameters():
            param.requires_grad = True

    def _initialize_biases(self, cf=None):
        """Initialise the detection-head biases (following the original YOLOP implementation).

        Args:
            cf: optional class-frequency tensor; when None a fixed prior based
                on the head's class count is used for the class biases.
        """
        m = self.detect_head
        for mi, s in zip(m.m, m.stride):
            # View the flat bias as (anchors, outputs-per-anchor).
            b = mi.bias.view(m.na, -1)
            b.data[:, 4] += math.log(8 / (640 / s) ** 2)
            b.data[:, 5:] += math.log(0.6 / (m.nc - 0.99)) if cf is None else torch.log(cf / cf.sum())
            mi.bias = torch.nn.Parameter(b.view(-1), requires_grad=True)

    @staticmethod
    def _remap_backbone_state_dict(yolo_state_dict, target_state_dict, num_layers=11):
        """Translate YOLOv11 backbone keys to this model's key names.

        Keys of the form ``model.<idx>.<rest>`` with ``idx < num_layers`` become
        ``backbone.layers.<idx>.<rest>``. Only the leading prefix is rewritten.

        Returns:
            (matched, skipped): ``matched`` maps new key -> tensor for entries
            that exist in ``target_state_dict`` with an identical shape;
            ``skipped`` lists the new keys that were missing or had a
            different shape.
        """
        matched = {}
        skipped = []
        for key, tensor in yolo_state_dict.items():
            parts = key.split('.', 2)
            if len(parts) != 3 or parts[0] != 'model' or not parts[1].isdigit():
                continue
            if int(parts[1]) >= num_layers:
                continue  # not part of the backbone
            new_key = f"backbone.layers.{parts[1]}.{parts[2]}"
            target = target_state_dict.get(new_key)
            if target is not None and target.shape == tensor.shape:
                matched[new_key] = tensor
            else:
                skipped.append(new_key)
        return matched, skipped

    def load_yolov11_backbone_weights(self, weights_path, freeze_backbone=False):
        """
        Load backbone weights from a YOLOv11 pretrained model.

        This is best-effort: any failure is logged as a warning and the model
        is left to train from its current weights. Entries whose name or shape
        does not match this model's backbone are skipped and reported.

        Args:
            weights_path: path to the YOLOv11 weights (.pt file)
            freeze_backbone: whether to freeze the backbone after loading
        """
        try:
            from ultralytics import YOLO
            logging.info(f"Loading YOLOv11 weights from {weights_path}")

            yolo_state_dict = YOLO(weights_path).model.state_dict()
            matched, skipped = self._remap_backbone_state_dict(yolo_state_dict, self.state_dict())

            if skipped:
                logging.warning(
                    f"Skipped {len(skipped)} YOLOv11 backbone entries with no matching name/shape in this model"
                )

            # strict=False: only the backbone entries are supplied here.
            self.load_state_dict(matched, strict=False)

            # Count what was actually loaded, not what was merely found.
            logging.info(f"Successfully loaded {len(matched)} backbone parameters from YOLOv11")

            if freeze_backbone:
                self.freeze_backbone()
                logging.info("Backbone frozen successfully")

        except Exception as e:
            logging.warning(f"Failed to load YOLOv11 weights: {e}")
            logging.warning("Training will start from scratch")

    def forward(self, x):
        """Run backbone, adapters, neck and the three heads.

        Returns:
            ``[detect_out, drivable_out, lane_out]`` where the two segmentation
            outputs have been passed through a sigmoid.
        """
        features = self.backbone(x)
        features = [adapter(f) for adapter, f in zip(self.adapters, features)]

        # Top-down path. Keep the first lateral output so the bottom-up path
        # can reuse it instead of running neck[0] again on the same input.
        p5_lateral = self.neck[0](features[-1])
        x = self.neck[1](p5_lateral)
        x = self.neck[2]([x, features[1]])
        x = self.neck[3](x)
        x = self.neck[4](x)
        x = self.neck[5](x)
        p3_fpn = self.neck[6]([x, features[0]])
        p3 = self.neck[7](p3_fpn)

        # Bottom-up path.
        x = self.neck[8](p3)
        # NOTE(review): neck[4] is re-applied to the adapted backbone feature
        # here rather than reusing its top-down output; the reference YOLOP
        # layout appears to concatenate the latter. Left as is so existing
        # checkpoints keep their behaviour -- confirm intent.
        x = self.neck[9]([x, self.neck[4](features[1])])
        p4 = self.neck[10](x)
        x = self.neck[11](p4)
        x = self.neck[12]([x, p5_lateral])
        p5 = self.neck[13](x)

        detect_out = self.detect_head([p3, p4, p5])

        # Both segmentation heads start from the same fused feature map.
        drivable_out = p3_fpn
        for layer in self.drivable_seg_head:
            drivable_out = layer(drivable_out)

        lane_out = p3_fpn
        for layer in self.lane_seg_head:
            lane_out = layer(lane_out)

        drivable_out = torch.sigmoid(drivable_out)
        lane_out = torch.sigmoid(lane_out)

        return [detect_out, drivable_out, lane_out]
|
|
|
|
def get_net_yolov11(cfg, **kwargs):
    """
    Build a YOLOP model with a YOLOv11 backbone.

    If the weights file exists, the backbone is created from it (and frozen
    when requested); otherwise the model is created from scratch.

    Args:
        cfg: config object; ``cfg.num_seg_class`` is used when present,
            otherwise 2
        **kwargs: optional settings:
            - yolov11_weights: path to YOLOv11 pretrained weights
              (default ``weights/yolo11<scale>.pt``); ``None`` or an empty
              value means "no pretrained weights"
            - freeze_backbone: whether to freeze the backbone (only applied
              when pretrained weights are found)
            - yolo_scale: YOLOv11 scale ('n', 's', 'm', 'l', 'x')

    Returns:
        The constructed ``YOLOPWithYOLOv11`` model.
    """
    import os

    num_seg_class = getattr(cfg, 'num_seg_class', 2)
    yolo_scale = kwargs.get('yolo_scale', 'n')

    yolov11_weights = kwargs.get('yolov11_weights', f'weights/yolo11{yolo_scale}.pt')
    freeze_backbone = kwargs.get('freeze_backbone', False)

    # Guard against an explicit None/empty path: os.path.exists(None) raises
    # TypeError instead of returning False.
    if yolov11_weights and os.path.exists(yolov11_weights):
        logging.info(f"Creating model with YOLOv11{yolo_scale} pretrained weights from {yolov11_weights}")
        model = YOLOPWithYOLOv11(num_seg_class=num_seg_class, yolo_scale=yolo_scale, yolo_weights_path=yolov11_weights)
        if freeze_backbone:
            model.freeze_backbone()
    else:
        logging.warning(f"YOLOv11 weights not found at {yolov11_weights}, creating model from scratch")
        if freeze_backbone:
            # The backbone is not frozen here; make that visible to the caller.
            logging.warning("freeze_backbone was requested but is ignored because no pretrained weights were loaded")
        model = YOLOPWithYOLOv11(num_seg_class=num_seg_class, yolo_scale=yolo_scale, yolo_weights_path=None)

    return model