| import tensorflow as tf
|
| import numpy as np
|
| import faiss
|
|
|
class MultiModalTransformer(tf.keras.Model):
    """Multi-task model that routes several modalities through one transformer stack.

    Each task has its own input encoder and output head; the hash/quant
    bottleneck, the transformer blocks and the final layer norm are shared.

    Args:
        hparams: Object exposing ``n_vocab``, ``n_ctx``, ``n_embd``, ``n_head``
            and ``n_layer``.
        knowledge_base: Sequence of dicts with a ``'text'`` string and a
            ``'vector'`` embedding; handed to ``FAISSRetriever``.
        n_hash: Width of the first bottleneck layer.
        n_quant: Width of the second bottleneck layer.
        tokenizer: Optional object with ``encode(str) -> ids`` and
            ``decode(ids) -> str`` used by the conversation helpers. When
            omitted, text is encoded as raw UTF-8 bytes (ids 0-255).
    """

    # Tasks accepted by `call` and `pipe`.
    _TASKS = (
        'speech_recognition',
        'image_captioning',
        'music_generation',
        'text_generation',
        'anomaly_detection',
    )

    # Text prepended to a response when the matching trait value exceeds 0.5.
    _TRAIT_PREFIXES = {
        'kindness': "I understand your concern. ",
        'honesty': "To be honest, ",
        'resilience': "Let's keep trying. ",
        'open_mindedness': "That's an interesting perspective. ",
        'empathy': "I can see how you feel. ",
        'reliability': "You can count on me. ",
        'humility': "I'm still learning. ",
        'positivity': "Let's stay positive. ",
        'courage': "Let's face this together. ",
        'curiosity': "That's fascinating. ",
        'humor': "On a lighter note, ",
        'self_discipline': "Let's stay focused. ",
        'emotional_stability': "Let's stay calm. ",
        'assertiveness': "I firmly believe that ",
        'creativity': "Let's think outside the box. ",
    }

    # Number of ids used by the byte-level fallback tokenizer.
    _BYTE_VOCAB = 256

    def __init__(self, hparams, knowledge_base, n_hash=1024, n_quant=256,
                 tokenizer=None):
        super().__init__()
        self.hparams = hparams
        self.n_hash = n_hash
        self.n_quant = n_quant
        self.tokenizer = tokenizer

        # Shared embeddings, bottleneck and transformer stack.
        self.wte = tf.keras.layers.Embedding(hparams.n_vocab, hparams.n_embd)
        self.wpe = tf.keras.layers.Embedding(hparams.n_ctx, hparams.n_embd)
        self.hash_layer = tf.keras.layers.Dense(n_hash, activation='relu')
        self.quant_layer = tf.keras.layers.Dense(n_quant, activation='relu')
        # The transformer blocks are built for width n_embd, so the bottleneck
        # output (width n_quant) must be projected back before the residual adds.
        self.bottleneck_proj = tf.keras.layers.Dense(hparams.n_embd)
        self.h = [
            TransformerBlock(hparams.n_embd, hparams.n_head)
            for _ in range(hparams.n_layer)
        ]
        self.ln_f = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.fc = tf.keras.layers.Dense(hparams.n_vocab, use_bias=False)

        # Audio encoder: three strided convolutions, pooled over time.
        self.audio_encoder = tf.keras.Sequential([
            tf.keras.layers.Conv1D(256, kernel_size=11, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv1D(256, kernel_size=11, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv1D(256, kernel_size=11, strides=2, padding='same', activation='relu'),
            tf.keras.layers.GlobalAveragePooling1D(),
            tf.keras.layers.Dense(hparams.n_embd),
        ])

        # Image encoder: ResNet50 backbone, spatially pooled then projected.
        self.image_encoder = tf.keras.applications.ResNet50(include_top=False, weights='imagenet')
        self.image_pool = tf.keras.layers.GlobalAveragePooling2D()
        self.image_proj = tf.keras.layers.Dense(hparams.n_embd)

        # Music event embeddings (summed per time step in `call`).
        self.pitch_embedding = tf.keras.layers.Embedding(128, hparams.n_embd)
        self.duration_embedding = tf.keras.layers.Embedding(32, hparams.n_embd)
        self.velocity_embedding = tf.keras.layers.Embedding(128, hparams.n_embd)

        # Reconstruction-error cut-off used by the anomaly branch.
        self.anomaly_threshold = tf.Variable(0.5, trainable=False)

        # Retrieval: queries are encoded to n_embd, so the index must use the
        # same dimensionality instead of the retriever's default.
        self.knowledge_base = knowledge_base
        self.retriever = FAISSRetriever(knowledge_base, dim=hparams.n_embd)
        self.query_encoder = tf.keras.Sequential([
            tf.keras.layers.Dense(hparams.n_embd, activation='relu'),
            tf.keras.layers.Dense(hparams.n_embd),
        ])

        # Task-specific output heads.
        self.speech_output = tf.keras.layers.Dense(hparams.n_vocab)
        self.caption_output = tf.keras.layers.Dense(hparams.n_vocab)
        # 288 = 128 + 32 + 128; presumably concatenated pitch/duration/velocity
        # logits -- TODO confirm against the training targets.
        self.music_output = tf.keras.layers.Dense(288)
        # NOTE(review): `anomaly_output` is created but never used by `call`.
        self.anomaly_output = tf.keras.layers.Dense(1, activation='sigmoid')
        # Maps hidden states back to the input feature width so the anomaly
        # branch can compare the reconstruction with its input.
        self.reconstruction_output = tf.keras.layers.Dense(hparams.n_embd)

        self.conversation_history = []

        self.personality_traits = {
            'kindness': 0.9,
            'honesty': 0.9,
            'resilience': 0.8,
            'open_mindedness': 0.8,
            'empathy': 0.9,
            'reliability': 0.9,
            'humility': 0.8,
            'positivity': 0.9,
            'courage': 0.8,
            'curiosity': 0.9,
            'humor': 0.8,
            'self_discipline': 0.8,
            'emotional_stability': 0.8,
            'assertiveness': 0.8,
            'creativity': 0.9,
        }

    def call(self, inputs, task):
        """Run one forward pass for `task`.

        Args:
            inputs: Task-dependent input. Based on how this file calls the
                model (assumptions to confirm for other callers):
                ``speech_recognition`` takes a float tensor accepted by the
                Conv1D audio encoder; ``image_captioning`` an ``(image,
                token_ids)`` pair; ``music_generation`` a ``(pitch, duration,
                velocity)`` triple of integer tensors; ``text_generation``
                integer token ids; ``anomaly_detection`` float features whose
                last dimension is ``n_embd``.
            task: One of the task names above.

        Returns:
            The output of the task head. For ``anomaly_detection`` a
            ``(reconstruction, anomaly_scores)`` tuple.

        Raises:
            ValueError: If `task` is not a supported task name.
        """
        features = None
        if task == 'speech_recognition':
            # The audio encoder pools over time and yields (batch, n_embd);
            # restore a length-1 sequence axis for the sequence ops below.
            x = self.audio_encoder(inputs)[:, tf.newaxis, :]
        elif task == 'image_captioning':
            image, text = inputs
            image_features = self.image_proj(self.image_pool(self.image_encoder(image)))
            # The image acts as a single leading token in front of the text.
            x = tf.concat([image_features[:, tf.newaxis, :], self.wte(text)], axis=1)
        elif task == 'music_generation':
            pitch, duration, velocity = inputs
            x = (
                self.pitch_embedding(pitch)
                + self.duration_embedding(duration)
                + self.velocity_embedding(velocity)
            )
        elif task == 'text_generation':
            x = self.wte(inputs)
        elif task == 'anomaly_detection':
            # Inputs are continuous features, not token ids, so they bypass
            # the embedding table and are reconstructed at the output.
            features = tf.cast(inputs, tf.float32)
            x = features
        else:
            raise ValueError(f"Unknown task: {task}")

        if task in ('text_generation', 'image_captioning'):
            x = tf.concat([x, self._retrieve_context(x)], axis=1)

        # Dynamic length so variable-length batches work inside tf.function.
        # NOTE(review): sequences longer than hparams.n_ctx index past `wpe`.
        positions = tf.range(tf.shape(x)[1])[tf.newaxis, :]
        x = x + self.wpe(positions)

        x = self.bottleneck_proj(self.quant_layer(self.hash_layer(x)))
        for block in self.h:
            x, _ = block(x)
        x = self.ln_f(x)

        if task == 'speech_recognition':
            return self.speech_output(x)
        if task == 'image_captioning':
            return self.caption_output(x)
        if task == 'music_generation':
            return self.music_output(x)
        if task == 'anomaly_detection':
            reconstruction = self.reconstruction_output(x)
            reconstruction_loss = tf.reduce_mean(tf.square(features - reconstruction), axis=-1)
            anomaly_scores = tf.where(reconstruction_loss > self.anomaly_threshold, 1.0, 0.0)
            return reconstruction, anomaly_scores
        return self.fc(x)

    def _retrieve_context(self, x):
        """Return embeddings of retrieved documents, shaped (batch, k, n_embd).

        The first position of `x` is encoded into a query for the retriever.
        """
        encoded_query = self.query_encoder(x[:, 0, :])
        # The retriever runs NumPy/FAISS code, which needs eager tensors;
        # py_function keeps this usable from inside tf.function as well.
        docs = tf.py_function(self.retriever.retrieve, [encoded_query], tf.string)
        docs.set_shape([None])
        # Document texts are strings, which an Embedding cannot index. Each
        # document is mapped to one vocabulary slot by hashing its text.
        # NOTE(review): a real tokenizer would give a richer representation.
        doc_ids = tf.strings.to_hash_bucket_fast(docs, self.hparams.n_vocab)
        doc_embeddings = self.wte(doc_ids)
        # The same retrieved documents are shared by every batch element.
        return tf.tile(doc_embeddings[tf.newaxis, :, :], [tf.shape(x)[0], 1, 1])

    def pipe(self, inputs, task):
        """Validate `task` and run the model on `inputs`.

        Raises:
            ValueError: If `task` is not a supported task name.
        """
        if task not in self._TASKS:
            raise ValueError(f"Unknown task: {task}")
        return self(inputs, task=task)

    def conversation(self, user_input):
        """Generate a reply to `user_input` and record both turns in the history."""
        response = self.generate_response([*self.conversation_history, user_input])
        # Record the turn only after generation succeeded, so a failure does
        # not leave an unanswered message in the history.
        self.conversation_history.append(user_input)
        self.conversation_history.append(response)
        return response

    def generate_response(self, conversation_history):
        """Produce a text reply for the given list of conversation turns.

        The turns are joined, tokenized and passed through the
        ``text_generation`` task once; the arg-max token at every prompt
        position is decoded (a single forward pass, not autoregressive
        sampling) and the personality prefixes are applied to the text.

        Raises:
            ValueError: If the conversation encodes to no tokens.
        """
        prompt = "\n".join(str(turn) for turn in conversation_history)
        token_ids = self._encode_text(prompt)
        if not token_ids:
            raise ValueError("Cannot generate a response from an empty conversation.")

        # Keep the most recent tokens, leaving room for the retrieved
        # documents that `call` appends, so positions stay below n_ctx.
        budget = max(self.hparams.n_ctx - self.retriever.num_results, 1)
        token_ids = token_ids[-budget:]

        logits = self.pipe(tf.constant([token_ids], dtype=tf.int32), task='text_generation')
        # Ignore the trailing positions that belong to retrieved documents.
        predicted = tf.argmax(logits[0, :len(token_ids)], axis=-1).numpy().tolist()
        return self.apply_personality_traits(self._decode_ids(predicted))

    def _encode_text(self, text):
        """Convert `text` to a list of token ids."""
        if self.tokenizer is not None:
            return list(self.tokenizer.encode(text))
        if self.hparams.n_vocab < self._BYTE_VOCAB:
            raise ValueError("Byte-level fallback tokenization needs n_vocab >= 256.")
        return list(text.encode('utf-8'))

    def _decode_ids(self, token_ids):
        """Convert token ids back to text."""
        if self.tokenizer is not None:
            return self.tokenizer.decode(token_ids)
        # Ids outside the byte range have no meaning for the fallback codec.
        raw = bytes(i for i in token_ids if i < self._BYTE_VOCAB)
        return raw.decode('utf-8', errors='replace')

    def apply_personality_traits(self, response):
        """Apply every configured trait to `response`, in dictionary order.

        Traits without a matching ``add_<trait>`` method are skipped.
        """
        for trait, value in self.personality_traits.items():
            handler = getattr(self, f"add_{trait}", None)
            if handler is not None:
                response = handler(response, value)
        return response

    def _with_prefix(self, trait, response, value):
        """Prepend the phrase for `trait` to `response` when `value` > 0.5."""
        if value > 0.5:
            return f"{self._TRAIT_PREFIXES[trait]}{response}"
        return response

    def add_kindness(self, response, value):
        """Prefix `response` with the kindness phrase when `value` > 0.5."""
        return self._with_prefix('kindness', response, value)

    def add_honesty(self, response, value):
        """Prefix `response` with the honesty phrase when `value` > 0.5."""
        return self._with_prefix('honesty', response, value)

    def add_resilience(self, response, value):
        """Prefix `response` with the resilience phrase when `value` > 0.5."""
        return self._with_prefix('resilience', response, value)

    def add_open_mindedness(self, response, value):
        """Prefix `response` with the open-mindedness phrase when `value` > 0.5."""
        return self._with_prefix('open_mindedness', response, value)

    def add_empathy(self, response, value):
        """Prefix `response` with the empathy phrase when `value` > 0.5."""
        return self._with_prefix('empathy', response, value)

    def add_reliability(self, response, value):
        """Prefix `response` with the reliability phrase when `value` > 0.5."""
        return self._with_prefix('reliability', response, value)

    def add_humility(self, response, value):
        """Prefix `response` with the humility phrase when `value` > 0.5."""
        return self._with_prefix('humility', response, value)

    def add_positivity(self, response, value):
        """Prefix `response` with the positivity phrase when `value` > 0.5."""
        return self._with_prefix('positivity', response, value)

    def add_courage(self, response, value):
        """Prefix `response` with the courage phrase when `value` > 0.5."""
        return self._with_prefix('courage', response, value)

    def add_curiosity(self, response, value):
        """Prefix `response` with the curiosity phrase when `value` > 0.5."""
        return self._with_prefix('curiosity', response, value)

    def add_humor(self, response, value):
        """Prefix `response` with the humor phrase when `value` > 0.5."""
        return self._with_prefix('humor', response, value)

    def add_self_discipline(self, response, value):
        """Prefix `response` with the self-discipline phrase when `value` > 0.5."""
        return self._with_prefix('self_discipline', response, value)

    def add_emotional_stability(self, response, value):
        """Prefix `response` with the emotional-stability phrase when `value` > 0.5."""
        return self._with_prefix('emotional_stability', response, value)

    def add_assertiveness(self, response, value):
        """Prefix `response` with the assertiveness phrase when `value` > 0.5."""
        return self._with_prefix('assertiveness', response, value)

    def add_creativity(self, response, value):
        """Prefix `response` with the creativity phrase when `value` > 0.5."""
        return self._with_prefix('creativity', response, value)

    def fine_tune_personality(self, trait, value):
        """Set the strength of an existing personality trait.

        Raises:
            ValueError: If `trait` is not one of the configured traits.
        """
        if trait not in self.personality_traits:
            raise ValueError(f"Unknown trait: {trait}")
        self.personality_traits[trait] = value

    def safe_word_format(self, user_input):
        """Handle the "stop" and "reset" safe words (case-insensitive).

        Returns:
            A confirmation message after clearing the conversation history,
            or None when `user_input` is not a safe word.
        """
        confirmations = {
            "stop": "Conversation stopped. You can start a new conversation.",
            "reset": "Conversation reset. Let's start fresh.",
        }
        message = confirmations.get(user_input.lower())
        if message is not None:
            self.conversation_history = []
        return message
|
|
|
class TransformerBlock(tf.keras.layers.Layer):
    """Attention and feed-forward sub-layers, each normalized first and added residually.

    Args:
        n_embd: Feature width of the block's input and output.
        n_head: Number of attention heads passed to ``MultiHeadAttention``.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        self.attn = MultiHeadAttention(n_embd, n_head)
        self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        # Feed-forward network: expand to 4x the width, then project back.
        expand = tf.keras.layers.Dense(4 * n_embd, activation=gelu)
        contract = tf.keras.layers.Dense(n_embd)
        self.mlp = tf.keras.Sequential([expand, contract])
        self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=1e-5)

    def call(self, x, past=None):
        """Apply the block to `x`.

        Args:
            x: Input activations.
            past: Optional cached keys/values forwarded to the attention layer.

        Returns:
            A ``(output, present)`` pair, where `present` is the key/value
            tensor returned by the attention layer.
        """
        attended, present = self.attn(self.ln_1(x), past=past)
        after_attention = x + attended
        output = after_attention + self.mlp(self.ln_2(after_attention))
        return output, present
|
|
|
class MultiHeadAttention(tf.keras.layers.Layer):
    """Scaled dot-product self-attention over `n_head` heads.

    NOTE(review): no causal mask is applied, so every position attends to
    every other position.

    Args:
        n_embd: Feature width of the input and output.
        n_head: Number of heads; must divide `n_embd`.

    Raises:
        ValueError: If `n_embd` is not divisible by `n_head`.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        if n_embd % n_head != 0:
            raise ValueError(
                f"n_embd ({n_embd}) must be divisible by n_head ({n_head})"
            )
        self.n_embd = n_embd
        self.n_head = n_head
        self.c_attn = tf.keras.layers.Dense(3 * n_embd)
        self.c_proj = tf.keras.layers.Dense(n_embd)

    def split_heads(self, x):
        """Reshape (batch, seq, n_embd) into (batch, n_head, seq, head_dim)."""
        # tf.shape (not x.shape) so unknown batch/sequence sizes are supported.
        dims = tf.shape(x)
        head_dim = self.n_embd // self.n_head
        x = tf.reshape(x, (dims[0], dims[1], self.n_head, head_dim))
        return tf.transpose(x, [0, 2, 1, 3])

    def merge_heads(self, x):
        """Reshape (batch, n_head, seq, head_dim) back into (batch, seq, n_embd)."""
        x = tf.transpose(x, [0, 2, 1, 3])
        # Only the two trailing axes (heads, head_dim) are merged; the
        # sequence axis must survive so each position keeps its own features.
        dims = tf.shape(x)
        return tf.reshape(x, (dims[0], dims[1], self.n_embd))

    def call(self, x, past=None):
        """Attend over `x`, optionally extending cached keys and values.

        Args:
            x: Input of shape (batch, seq, n_embd).
            past: Optional cache, either a ``(keys, values)`` pair or the
                stacked `present` tensor returned by an earlier call.

        Returns:
            A ``(output, present)`` pair; `present` stacks the keys and
            values (including any cached ones) along axis 1.
        """
        q, k, v = tf.split(self.c_attn(x), 3, axis=-1)
        q, k, v = (self.split_heads(t) for t in (q, k, v))

        if past is not None:
            if isinstance(past, (tuple, list)):
                pk, pv = past
            else:
                # `present` is stacked on axis 1, so it must be unstacked on
                # the same axis; plain unpacking would iterate the batch.
                pk, pv = tf.unstack(past, axis=1)
            k = tf.concat([pk, k], axis=-2)
            v = tf.concat([pv, v], axis=-2)

        present = tf.stack([k, v], axis=1)
        # Cast the scale to the activations' dtype rather than assuming float32.
        scale = tf.math.sqrt(tf.cast(self.n_embd // self.n_head, q.dtype))
        weights = tf.nn.softmax(tf.matmul(q, k, transpose_b=True) / scale)
        a = self.merge_heads(tf.matmul(weights, v))
        a = self.c_proj(a)
        return a, present
|
|
|
class FAISSRetriever:
    """Nearest-neighbour lookup of knowledge-base texts using a flat L2 FAISS index.

    Args:
        knowledge_base: Sequence of dicts, each with a ``'text'`` entry and a
            ``'vector'`` entry of length `dim`.
        dim: Dimensionality of the indexed vectors and of the queries.
        num_results: Maximum number of documents returned per query.

    Raises:
        ValueError: If the document vectors do not form an ``(n, dim)`` array.
    """

    def __init__(self, knowledge_base, dim=768, num_results=5):
        self.index = faiss.IndexFlatL2(dim)
        self.knowledge_base = knowledge_base
        self.num_results = num_results

        # An empty knowledge base leaves the index empty instead of failing
        # on a zero-dimensional array.
        if len(knowledge_base) > 0:
            # FAISS stores float32; convert explicitly so float64 inputs
            # (e.g. from np.random.rand) work with every binding version.
            vectors = np.ascontiguousarray(
                [doc['vector'] for doc in knowledge_base], dtype=np.float32
            )
            if vectors.ndim != 2 or vectors.shape[1] != dim:
                raise ValueError(
                    f"Expected document vectors of shape (n, {dim}), got {vectors.shape}"
                )
            self.index.add(vectors)

    def retrieve(self, query_vector):
        """Return the texts of the documents closest to the first query.

        Args:
            query_vector: Tensor or array of shape ``(dim,)`` or
                ``(n_queries, dim)``. Only the first query's neighbours are
                returned.

        Returns:
            A 1-D string tensor with at most `num_results` document texts.
        """
        query = query_vector.numpy() if hasattr(query_vector, 'numpy') else query_vector
        query = np.ascontiguousarray(query, dtype=np.float32)
        if query.ndim == 1:
            query = query[np.newaxis, :]

        _, indices = self.index.search(query, self.num_results)
        # FAISS pads with -1 when fewer than `num_results` vectors exist;
        # indexing a list with -1 would silently return the last document.
        retrieved_docs = [self.knowledge_base[i]['text'] for i in indices[0] if i >= 0]
        # Explicit dtype so an empty result is still a string tensor.
        return tf.constant(retrieved_docs, dtype=tf.string)
|
|
|
def gelu(x):
    """Apply the tanh-based GELU approximation element-wise to `x`."""
    inner = x + 0.044715 * tf.pow(x, 3)
    gate = tf.tanh(np.sqrt(2 / np.pi) * inner)
    return 0.5 * x * (1 + gate)
|
|
|
|
|
def custom_loss(y_true, y_pred, model, task, l2_weight=0.01):
    """Compute the scalar training loss for `task`.

    Args:
        y_true: Targets. Reconstruction targets for ``anomaly_detection``,
            integer class ids for every other task.
        y_pred: Model output. For ``anomaly_detection`` this may be the
            ``(reconstruction, anomaly_scores)`` tuple returned by the model.
        model: Model whose trainable weights are L2-regularized.
        task: Task name; only ``'anomaly_detection'`` is treated specially.
        l2_weight: Multiplier applied to the L2 penalty.

    Returns:
        A scalar tensor.
    """
    if task == 'anomaly_detection':
        # The model returns (reconstruction, anomaly_scores); the scores come
        # from a thresholded tf.where and carry no gradient, so only the
        # reconstruction is scored.
        if isinstance(y_pred, (tuple, list)):
            y_pred = y_pred[0]
        mse = tf.keras.losses.MeanSquaredError()
        return mse(y_true, y_pred)

    # Reduce to a scalar: left unreduced, the per-token losses are summed by
    # the gradient tape and the L2 term is implicitly multiplied by the
    # number of tokens.
    ce_loss = tf.reduce_mean(
        tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
    )
    penalties = [tf.nn.l2_loss(w) for w in model.trainable_weights]
    reg_loss = tf.add_n(penalties) if penalties else 0.0
    return ce_loss + l2_weight * reg_loss
|
|
|
|
|
@tf.function
def train_step(model, optimizer, inputs, targets, task):
    """Run one optimization step for `task` and return the loss.

    Args:
        model: Model called as ``model(inputs, task)``.
        optimizer: Optimizer whose ``apply_gradients`` updates the model.
        inputs: Task inputs for the model.
        targets: Targets handed to ``custom_loss``.
        task: Task name as a Python string. It selects Python branches in
            the model, so it must not be a tensor.

    Returns:
        The loss tensor computed by ``custom_loss``.
    """
    with tf.GradientTape() as tape:
        predictions = model(inputs, task)
        loss = custom_loss(targets, predictions, model, task)

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    # Each task touches only some encoders and heads, so variables outside
    # the current task's path have no gradient; skip them explicitly.
    grads_and_vars = [
        (grad, var) for grad, var in zip(gradients, variables) if grad is not None
    ]
    optimizer.apply_gradients(grads_and_vars)
    return loss
|
|
|
|
|
class HParams:
    """Plain container for the model's size hyperparameters.

    Attributes are stored exactly as passed: ``n_vocab``, ``n_ctx``,
    ``n_embd``, ``n_head`` and ``n_layer``.
    """

    def __init__(self, n_vocab, n_ctx, n_embd, n_head, n_layer):
        (
            self.n_vocab,
            self.n_ctx,
            self.n_embd,
            self.n_head,
            self.n_layer,
        ) = (n_vocab, n_ctx, n_embd, n_head, n_layer)
|
|
|
# Model size configuration.
hparams = HParams(
    n_vocab=50000,
    n_ctx=1024,
    n_embd=768,
    n_head=12,
    n_layer=12,
)

# Example knowledge base. FAISS indexes hold float32 vectors, while
# np.random.rand returns float64, so the vectors are converted up front.
# Their length follows hparams.n_embd, the width of the model's encoded queries.
knowledge_base = [
    {'text': 'Example knowledge 1', 'vector': np.random.rand(hparams.n_embd).astype(np.float32)},
    {'text': 'Example knowledge 2', 'vector': np.random.rand(hparams.n_embd).astype(np.float32)},
]

model = MultiModalTransformer(hparams, knowledge_base)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
|
|
|
|
|
num_epochs = 10

# `dataset` is not defined anywhere in this file. Look it up defensively so
# the script reports the problem and continues instead of raising NameError.
# TODO: provide an iterable of (inputs, targets, task) batches named `dataset`.
training_batches = globals().get('dataset')
if training_batches is None:
    print("No `dataset` is defined; skipping training.")
    training_batches = ()

for epoch in range(num_epochs):
    loss = None
    for batch in training_batches:
        inputs, targets, task = batch
        # The model branches on `task` with Python string comparisons, so a
        # task name delivered as a tensor or bytes is converted to str first.
        if isinstance(task, tf.Tensor):
            task = task.numpy()
        if isinstance(task, bytes):
            task = task.decode('utf-8')
        loss = train_step(model, optimizer, inputs, targets, task)
    # Report once per epoch, and only when at least one batch was processed.
    if loss is not None:
        print(f"Epoch {epoch + 1}, Loss: {loss.numpy()}")
|
|
|
|
|
# --- Speech recognition: random float input of shape (1, 16000, 1). ---
speech_input = tf.random.normal(shape=(1, 16000, 1))
speech_output = model(speech_input, task='speech_recognition')

# --- Image captioning: one random image plus ten random token ids. ---
image_input = tf.random.normal(shape=(1, 224, 224, 3))
text_input = tf.random.uniform(shape=(1, 10), maxval=50000, dtype=tf.int32)
caption_output = model([image_input, text_input], task='image_captioning')

# --- Music generation: pitch, duration and velocity ids for 100 steps. ---
# The upper bounds match the sizes of the three music embedding tables.
music_input = [
    tf.random.uniform(shape=(1, 100), maxval=table_size, dtype=tf.int32)
    for table_size in (128, 32, 128)
]
music_output = model(music_input, task='music_generation')

# --- Text generation: fifty random token ids. ---
text_input = tf.random.uniform(shape=(1, 50), maxval=50000, dtype=tf.int32)
text_output = model(text_input, task='text_generation')

# --- Anomaly detection: random float features of shape (1, 100, 768). ---
anomaly_input = tf.random.normal(shape=(1, 100, 768))
reconstructed, anomalies = model(anomaly_input, task='anomaly_detection')

# --- Conversation: one user turn, then print the model's reply. ---
user_input = "Hello, how are you?"
response = model.conversation(user_input)
print(response)

# Raise the strength of one personality trait.
model.fine_tune_personality('kindness', 0.95)

# The safe word "stop" clears the history and returns a confirmation.
user_input = "stop"
response = model.safe_word_format(user_input)
print(response)