| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import numpy as np |
| import random |
| from torch.utils.data import DataLoader, Dataset |
| from collections import deque |
| from torchvision import datasets, transforms |
|
|
| |
| class PlasticLinear(nn.Module): |
| def __init__(self, in_features, out_features, plasticity_type="hebbian", learning_rate=0.01): |
| super().__init__() |
| self.in_features = in_features |
| self.out_features = out_features |
| self.weight = nn.Parameter(torch.randn(out_features, in_features) * 0.1) |
| self.bias = nn.Parameter(torch.zeros(out_features)) |
| self.plasticity_type = plasticity_type |
| self.eta = learning_rate |
| self.trace = torch.zeros(out_features, in_features) |
| self.register_buffer('prev_y', torch.zeros(out_features)) |
|
|
| def forward(self, x): |
| y = F.linear(x, self.weight, self.bias) |
| if self.training: |
| x_detached = x.detach() |
| y_detached = y.detach() |
| if self.plasticity_type == "hebbian": |
| hebb = torch.einsum('bi,bj->ij', y_detached, x_detached) / x.size(0) |
| self.trace = (1 - self.eta) * self.trace + self.eta * hebb |
| with torch.no_grad(): |
| self.weight += self.trace |
| elif self.plasticity_type == "stdp": |
| dy = y_detached - self.prev_y |
| stdp = torch.einsum('bi,bj->ij', dy, x_detached) / x.size(0) |
| self.trace = (1 - self.eta) * self.trace + self.eta * stdp |
| with torch.no_grad(): |
| self.weight += self.trace |
| self.prev_y = y_detached.clone() |
| return y |
|
|
| |
| class SpikeFunction(torch.autograd.Function): |
| @staticmethod |
| def forward(ctx, input): |
| ctx.save_for_backward(input) |
| return (input > 0).float() |
|
|
| @staticmethod |
| def backward(ctx, grad_output): |
| input, = ctx.saved_tensors |
| return grad_output * (abs(input) < 1).float() |
|
|
| spike_fn = SpikeFunction.apply |
|
|
| class LIFNeuron(nn.Module): |
| def __init__(self, tau=2.0): |
| super().__init__() |
| self.tau = tau |
| self.mem = 0 |
|
|
| def forward(self, x): |
| decay = torch.exp(torch.tensor(-1.0 / self.tau)) |
| self.mem = self.mem * decay + x |
| out = spike_fn(self.mem - 1.0) |
| self.mem = self.mem * (1.0 - out.detach()) |
| return out |
|
|
| |
| class AdaptiveLIF(nn.Module): |
| def __init__(self, size, tau=2.0, beta=0.2): |
| super().__init__() |
| self.size = size |
| self.tau = tau |
| self.beta = beta |
| self.mem = torch.zeros(size) |
| self.thresh = torch.ones(size) |
|
|
| def forward(self, x): |
| decay = torch.exp(torch.tensor(-1.0 / self.tau)) |
| self.mem = self.mem * decay + x |
| out = spike_fn(self.mem - self.thresh) |
| self.thresh = self.thresh + self.beta * out |
| self.mem = self.mem * (1.0 - out.detach()) |
| return out |
|
|
| |
| class RelayLayer(nn.Module): |
| def __init__(self, dim, heads=4): |
| super().__init__() |
| self.attn = nn.MultiheadAttention(embed_dim=dim, num_heads=heads, batch_first=True) |
| self.lif = LIFNeuron() |
|
|
| def forward(self, x): |
| attn_out, _ = self.attn(x, x, x) |
| return self.lif(attn_out) |
|
|
| |
| class WorkingMemory(nn.Module): |
| def __init__(self, input_dim, hidden_dim): |
| super().__init__() |
| self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True) |
|
|
| def forward(self, x): |
| out, _ = self.lstm(x) |
| return out[:, -1] |
|
|
| |
| class PlaceGrid(nn.Module): |
| def __init__(self, grid_size=10, embedding_dim=64): |
| super().__init__() |
| self.embedding = nn.Embedding(grid_size**2, embedding_dim) |
|
|
| def forward(self, index): |
| return self.embedding(index) |
|
|
| |
| class MirrorComparator(nn.Module): |
| def __init__(self, dim): |
| super().__init__() |
| self.cos = nn.CosineSimilarity(dim=1) |
|
|
| def forward(self, x1, x2): |
| return self.cos(x1, x2).unsqueeze(1) |
|
|
| |
| class NeuroendocrineModulator(nn.Module): |
| def __init__(self, input_dim, hidden_dim): |
| super().__init__() |
| self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True) |
|
|
| def forward(self, x): |
| out, _ = self.lstm(x) |
| return out[:, -1] |
|
|
| |
| class AutonomicFeedback(nn.Module): |
| def __init__(self, input_dim): |
| super().__init__() |
| self.feedback = nn.Linear(input_dim, input_dim) |
|
|
| def forward(self, x): |
| return torch.tanh(self.feedback(x)) |
|
|
| |
| class ReplayBuffer: |
| def __init__(self, capacity=1000): |
| self.buffer = deque(maxlen=capacity) |
|
|
| def add(self, inputs, labels, task): |
| self.buffer.append((inputs, labels, task)) |
|
|
| def sample(self, batch_size): |
| indices = random.sample(range(len(self.buffer)), batch_size) |
| batch = [self.buffer[i] for i in indices] |
| inputs, labels, tasks = zip(*batch) |
| return inputs, labels, tasks |
|
|
| |
| class ModularBrainAgent(nn.Module): |
| def __init__(self, input_dims, hidden_dim, output_dims): |
| super().__init__() |
| self.vision_encoder = nn.Linear(input_dims['vision'], hidden_dim) |
| self.language_encoder = nn.Linear(input_dims['language'], hidden_dim) |
| self.numeric_encoder = nn.Linear(input_dims['numeric'], hidden_dim) |
|
|
| |
| self.connect_sensory_to_relay = PlasticLinear(hidden_dim * 3, hidden_dim, plasticity_type='hebbian') |
| self.relay_layer = RelayLayer(hidden_dim) |
| self.connect_relay_to_inter = PlasticLinear(hidden_dim, hidden_dim, plasticity_type='stdp') |
|
|
| self.interneuron = AdaptiveLIF(hidden_dim) |
| self.memory = WorkingMemory(hidden_dim, hidden_dim) |
| self.place = PlaceGrid(grid_size=10, embedding_dim=hidden_dim) |
| self.comparator = MirrorComparator(hidden_dim) |
| self.emotion = NeuroendocrineModulator(hidden_dim, hidden_dim) |
| self.feedback = AutonomicFeedback(hidden_dim) |
|
|
| self.task_heads = nn.ModuleDict({ |
| task: nn.Linear(hidden_dim, out_dim) |
| for task, out_dim in output_dims.items() |
| }) |
|
|
| self.replay = ReplayBuffer() |
|
|
| def forward(self, inputs, task, position_idx=None): |
| v = self.vision_encoder(inputs['vision']) |
| l = self.language_encoder(inputs['language']) |
| n = self.numeric_encoder(inputs['numeric']) |
|
|
| sensory_cat = torch.cat([v, l, n], dim=-1) |
| z = self.connect_sensory_to_relay(sensory_cat) |
|
|
| z = self.relay_layer(z.unsqueeze(1)).squeeze(1) |
| z = self.connect_relay_to_inter(z) |
| z = self.interneuron(z) |
|
|
| m = self.memory(z.unsqueeze(1)) |
| p = self.place(position_idx if position_idx is not None else torch.tensor([0])) |
| e = self.emotion(z.unsqueeze(1)) |
| f = self.feedback(z) |
|
|
| combined = z + m + p + e + f |
| out = self.task_heads[task](combined) |
| return out |
|
|
| def remember(self, inputs, labels, task): |
| self.replay.add(inputs, labels, task) |
|
|
| |
| if __name__ == "__main__": |
| input_dims = {'vision': 32, 'language': 16, 'numeric': 8} |
| output_dims = {'classification': 5, 'regression': 1, 'binary': 1} |
| agent = ModularBrainAgent(input_dims, hidden_dim=64, output_dims=output_dims) |
|
|
| tasks = list(output_dims.keys()) |
|
|
| for step in range(250): |
| task = random.choice(tasks) |
| inputs = { |
| 'vision': torch.randn(1, 32), |
| 'language': torch.randn(1, 16), |
| 'numeric': torch.randn(1, 8) |
| } |
| labels = torch.randint(0, output_dims[task], (1,)) if task == 'classification' else torch.randn(1, output_dims[task]) |
| output = agent(inputs, task) |
| loss = F.cross_entropy(output, labels) if task == 'classification' else F.mse_loss(output, labels) |
| print(f"Step {step:02d} | Task: {task:13s} | Loss: {loss.item():.4f}") |