| from torch import Tensor |
| import torch.distributed as dist |
| import torch |
| import torch.nn.functional as F |
| import os |
|
|
class SimpleContrastiveLoss:
    """In-batch contrastive loss (softmax cross-entropy over similarity logits).

    Logits are ``x @ y.T / temperature``. When either input carries an extra
    "view" axis (``[B, K, D]``), one cross-entropy term is computed per view and
    the terms are combined with normalised, non-negative weights.
    """

    def __init__(self, temperature: float = 0.02, alpha: float = 0.05, weights=None):
        """
        Args:
            temperature: Divisor applied to the similarity logits.
            alpha: Weight of the first view when there are exactly two views
                and ``weights`` is None (the second view gets ``1 - alpha``).
            weights: list[float] or None.
                - If given, used to weight the per-view cross-entropy terms;
                  its length must equal the number of views. Negative entries
                  are clamped to 0 and the result is normalised to sum to 1.
                - If None and K == 2, falls back to ``[alpha, 1 - alpha]``.
                - If None and K > 2, uniform weights are used.
        """
        self.temperature = temperature
        self.alpha = alpha
        self.weights = weights

    def _get_weights(self, K: int, device):
        """Return a float32 tensor of ``K`` non-negative weights on ``device``.

        The weights sum to 1. If every weight is clamped to zero, uniform
        weights are returned instead.
        """
        # Build on CPU so the `.item()` below never forces a device sync.
        if self.weights is not None:
            assert len(self.weights) == K, f"weights length {len(self.weights)} != K={K}"
            w = torch.tensor(self.weights, dtype=torch.float32)
        elif K == 2:
            w = torch.tensor([self.alpha, 1.0 - self.alpha], dtype=torch.float32)
        else:
            w = torch.ones(K, dtype=torch.float32)

        # Clamp first, then normalise by the sum of the *clamped* weights, so
        # the result sums to 1 even when alpha (or a weight) is out of range.
        w = torch.clamp(w, min=0)
        total = w.sum().item()
        if total <= 0:
            w = torch.ones(K, dtype=torch.float32) / K
        else:
            w = w / total
        return w.to(device)

    def _pair_loss(self, x: Tensor, y: Tensor, target: Tensor, reduction: str) -> Tensor:
        """Cross-entropy over temperature-scaled logits for one 2-D (x, y) pair."""
        logits = torch.matmul(x, y.transpose(0, 1)) / self.temperature
        return F.cross_entropy(logits, target, reduction=reduction)

    def __call__(self, x: Tensor, y: Tensor, target: Tensor = None, reduction: str = 'mean') -> Tensor:
        """Compute the (optionally multi-view) contrastive loss.

        Supported shapes:
            - x=[B, D],    y=[N, D]    -> single view
            - x=[B, K, D], y=[N, D]    -> K query views against one candidate view
            - x=[B, D],    y=[N, K, D] -> one query view against K candidate views
            - x=[B, K, D], y=[N, K, D] -> views paired index-wise (k <-> k), weighted

        Args:
            x: Query embeddings.
            y: Candidate embeddings.
            target: Index into ``y`` of the positive for each query. When None,
                query ``i`` is matched to row ``i * (N // B)`` of ``y``.
            reduction: Passed through to ``F.cross_entropy``.

        Returns:
            The cross-entropy loss, or the weighted sum of per-view losses.

        Raises:
            ValueError: If the combination of input ranks is not supported.
        """
        B = x.size(0)
        if target is None:
            target_per_qry = y.size(0) // B
            target = torch.arange(
                0, B * target_per_qry, target_per_qry, device=x.device, dtype=torch.long
            )

        x_rank, y_rank = x.dim(), y.dim()
        if x_rank == 2 and y_rank == 2:
            return self._pair_loss(x, y, target, reduction)

        if x_rank not in (2, 3) or y_rank not in (2, 3):
            raise ValueError(f"Unsupported shapes: x {tuple(x.size())}, y {tuple(y.size())}")

        if x_rank == 3 and y_rank == 3:
            Kx, Ky = x.size(1), y.size(1)
            assert Kx == Ky, f"view mismatch: {Kx} vs {Ky}"

        # At least one input is 3-D here; the view count comes from that input.
        K = x.size(1) if x_rank == 3 else y.size(1)
        w = self._get_weights(K, x.device)
        loss = 0.0
        for k in range(K):
            x_k = x[:, k, :] if x_rank == 3 else x
            y_k = y[:, k, :] if y_rank == 3 else y
            loss = loss + w[k] * self._pair_loss(x_k, y_k, target, reduction)
        return loss
|
|
|
|
class DistributedContrastiveLoss(SimpleContrastiveLoss):
    """Contrastive loss computed over embeddings gathered from every rank.

    Each rank all-gathers ``x`` and ``y``, concatenates them along the batch
    axis and evaluates the parent loss on the global batch.
    """

    def __init__(self, n_target: int = 0, scale_loss: bool = True, temperature: float = 0.02, alpha: float = 0.05, weights=None):
        """
        Args:
            n_target: Accepted for interface compatibility; not used by this class.
            scale_loss: If True, multiply the loss by the world size.
            temperature: Forwarded to ``SimpleContrastiveLoss``.
            alpha: Forwarded to ``SimpleContrastiveLoss``.
            weights: Forwarded to ``SimpleContrastiveLoss``.

        Raises:
            AssertionError: If the default process group is not initialised.
        """
        assert dist.is_initialized(), "Distributed training has not been properly initialized."
        super().__init__(temperature=temperature, alpha=alpha, weights=weights)
        # NOTE: attribute is spelled `word_size` (sic); the name is kept so any
        # external code reading it keeps working.
        self.word_size = dist.get_world_size()
        self.rank = dist.get_rank()
        self.scale_loss = scale_loss

    def __call__(self, x: Tensor, y: Tensor, **kwargs):
        """Gather ``x`` and ``y`` across ranks and compute the parent loss.

        Args:
            x: Local query embeddings.
            y: Local candidate embeddings.
            **kwargs: Forwarded to ``SimpleContrastiveLoss.__call__``
                (e.g. ``target``, ``reduction``).

        Returns:
            The loss on the global batch, multiplied by the world size when
            ``scale_loss`` is set.
        """
        dist_x = self.gather_tensor(x)
        dist_y = self.gather_tensor(y)
        loss = super().__call__(dist_x, dist_y, **kwargs)
        if self.scale_loss:
            loss = loss * self.word_size
        return loss

    def gather_tensor(self, t):
        """All-gather ``t`` from every rank and concatenate along dim 0.

        The local rank's slot is replaced by the local tensor itself, because
        the buffers filled by ``all_gather`` are not connected to the autograd
        graph.
        """
        # Collectives expect dense, contiguous buffers; a strided input (e.g. a
        # slice) would also make `empty_like` produce strided receive buffers.
        t = t.contiguous()
        gathered = [torch.empty_like(t) for _ in range(self.word_size)]
        dist.all_gather(gathered, t)
        gathered[self.rank] = t
        return torch.cat(gathered, dim=0)
|
|
class InExampleContrastiveLoss:
    """Contrastive loss where each query is scored only against its own candidates.

    Expected shapes: ``x`` is ``[bsz, hdim]`` and ``y`` is
    ``[bsz, num_label, hdim]`` (a flattened ``[bsz * num_label, hdim]`` is also
    accepted). The first candidate of every query is treated as the positive.
    """

    def __init__(self, n_hard_negatives: int = 0, temperature: float = 1.0, ndim: int = None, *args, **kwargs):
        """
        Args:
            n_hard_negatives: Stored as ``target_per_qry = n_hard_negatives + 1``.
            temperature: Factor the logits are *multiplied* by (note: the other
                losses in this file divide by their temperature).
            ndim: If set (truthy), only the first ``ndim`` embedding features
                are used.
            *args, **kwargs: Accepted and ignored.
        """
        self.target_per_qry = n_hard_negatives + 1
        self.temperature = temperature
        self.ndim = ndim

    @staticmethod
    def _gather_across_ranks(t: Tensor) -> Tensor:
        """All-gather ``t`` from every rank and concatenate along dim 0.

        The local rank's slot is replaced by the local tensor, since buffers
        filled by ``all_gather`` are not connected to the autograd graph.
        """
        t = t.contiguous()
        parts = [torch.empty_like(t) for _ in range(dist.get_world_size())]
        dist.all_gather(parts, t)
        parts[dist.get_rank()] = t
        return torch.cat(parts, dim=0)

    def __call__(self, x: Tensor, y: Tensor, reduction: str = 'mean'):
        """Compute the per-example contrastive loss.

        Args:
            x: Query embeddings, ``[bsz, hdim]``.
            y: Candidate embeddings, ``[bsz, num_label, hdim]`` or
                ``[bsz * num_label, hdim]``.
            reduction: Passed through to ``F.cross_entropy``.

        Returns:
            ``(loss, loss_detail)`` where ``loss_detail`` has the keys
            ``"logits"``, ``"labels"`` and ``"preds"``.
        """
        # The original called an undefined `dist_utils.dist_gather` here; the
        # gather is done locally instead.
        # NOTE(review): confirm this matches the intended helper's semantics.
        if dist.is_available() and dist.is_initialized():
            x = self._gather_across_ranks(x)
            y = self._gather_across_ranks(y)
        bsz, ndim = x.size(0), x.size(1)
        target = torch.zeros(bsz, dtype=torch.long, device=x.device)
        if self.ndim:
            ndim = self.ndim
            x = x[:, :ndim]
            # Slice the LAST axis: `y[:, :ndim]` would cut the num_label axis
            # when y is [bsz, num_label, hdim].
            y = y[..., :ndim]
        # reshape (not view) because the sliced tensors may be non-contiguous.
        queries = x.reshape(bsz, 1, ndim)
        candidates = y.reshape(bsz, -1, ndim)
        logits = torch.einsum('bod,bsd->bs', queries, candidates) * self.temperature
        preds = torch.argmax(logits, dim=-1)
        loss = F.cross_entropy(logits, target, reduction=reduction)
        loss_detail = {"logits": logits, "labels": target, "preds": preds}
        return loss, loss_detail