| import json |
| import numpy as np |
| import tensorflow as tf |
| from tensorflow.keras import layers |
| import sentencepiece as spm |
| import requests |
| from flask import Flask, request, Response, session, jsonify |
| from bs4 import BeautifulSoup |
| from huggingface_hub import hf_hub_download |
| import uuid |
| import os |
| import time |
| from collections import Counter |
|
|
# Flask application; static assets (including index.html) live in ./static.
app = Flask(__name__, static_folder="static")
# Session cookies are signed with this key. A key generated at start-up is
# different in every process, which invalidates all sessions on restart and
# breaks them when several workers serve the same client. Prefer a stable key
# from the environment and keep the random key only as a fallback.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(32)
|
|
| |
| |
| |
# Hugging Face cache root and optional access token (None for public repos).
_HF_CACHE_HOME = "/tmp/hf_cache"
os.environ["HF_HOME"] = _HF_CACHE_HOME
hf_token = os.getenv("HF_TOKEN")


def _download_lamco_file(filename):
    """Download *filename* from the AlphaCNN/LamCo model repo and return its local path.

    ``HF_HOME`` is assigned only after ``huggingface_hub`` has been imported,
    which may be too late for the library's default cache location, so the
    cache directory is also passed explicitly.
    """
    return hf_hub_download(
        repo_id="AlphaCNN/LamCo",
        filename=filename,
        repo_type="model",
        token=hf_token,
        cache_dir=os.path.join(_HF_CACHE_HOME, "hub"),
    )


# Local paths of the model weights and the SentencePiece tokenizer model.
CHAT_MODEL_PATH = _download_lamco_file("LamCo.weights.h5")
CHAT_TOKENIZER_PATH = _download_lamco_file("ko_unigram.model")
|
|
print(CHAT_MODEL_PATH)

# Tokenizer shared by encoding, decoding and generation.
sp = spm.SentencePieceProcessor()
sp.load(CHAT_TOKENIZER_PATH)


def _piece_id_or(piece, fallback):
    """Return the id of *piece*, or *fallback* when the lookup yields a falsy id (0)."""
    found = sp.piece_to_id(piece)
    return found if found else fallback


# Special-token ids; each falls back to a fixed id when the lookup returns 0.
pad_id = _piece_id_or("<pad>", 0)
start_id = _piece_id_or("<start>", 1)
end_id = _piece_id_or("<end>", 2)
unk_id = _piece_id_or("<unk>", 3)
# No fallback is applied for the separator token.
sep_id = sp.piece_to_id("<sep>")

vocab_size = sp.get_piece_size()
# Fixed sequence length the model is built with and inputs are padded to.
max_len = 125
|
|
def text_to_ids(text):
    """Encode *text* into integer token ids with the module-level tokenizer."""
    token_ids = sp.encode(text, out_type=int)
    return token_ids
|
|
def ids_to_text(ids):
    """Decode a sequence of token ids back into text with the module-level tokenizer."""
    decoded = sp.decode(ids)
    return decoded
|
|
class SwiGLU(layers.Layer):
    """Gated feed-forward layer computing ``out(value * silu(gate))``.

    A single bias-free projection produces both the value and the gate halves;
    a second bias-free projection maps the gated result back to ``d_model``.
    """

    def __init__(self, d_model, f_d=8/3):
        """Create the projections; the hidden width is ``int(d_model * f_d)``."""
        super().__init__()
        inner_width = int(f_d * d_model)
        # Twice the hidden width: one half is the value, the other the gate.
        self.proj = layers.Dense(2 * inner_width, use_bias=False, dtype='float32')
        self.out = layers.Dense(d_model, use_bias=False, dtype='float32')

    def call(self, x):
        """Apply the gated transformation along the last axis of *x*."""
        projected = self.proj(x)
        value_half, gate_half = tf.split(projected, 2, axis=-1)
        gated = value_half * tf.nn.silu(gate_half)
        return self.out(gated)
|
|
class DilatedConvLayer(layers.Layer):
    """Causal dilated 1-D convolution with a residual connection, layer norm and dropout."""

    def __init__(self, d_model, dilation_rate, dropout_rate=0.1):
        """Build a kernel-size-3 causal convolution with *dilation_rate* and *d_model* filters."""
        super().__init__()
        conv_options = dict(
            filters=d_model,
            kernel_size=3,
            dilation_rate=dilation_rate,
            padding='causal',
            use_bias=True,
            kernel_initializer='he_normal',
            dtype='float32',
        )
        self.conv = layers.Conv1D(**conv_options)
        self.ln = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, x, training=False):
        """Return ``dropout(layer_norm(conv(x) + x))``."""
        # The convolution output is added to its own input before normalising.
        normalised = self.ln(self.conv(x) + x)
        return self.dropout(normalised, training=training)
|
|
class Lamko(tf.keras.Model):
    """Convolutional language model with tied input/output embeddings.

    The model sums token and position embeddings, runs them through a stack of
    ``DilatedConvLayer`` blocks (dilation ``2 ** i`` for block ``i``) with a
    residual ``SwiGLU`` plus layer norm after every third block, applies a
    final layer norm and projects onto the vocabulary with the transposed
    token-embedding matrix.
    """

    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        """Create the embeddings, the block stack and the final layer norm."""
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model, dtype='float32')
        self.pos_embedding = layers.Embedding(max_seq_len, d_model, dtype='float32')

        self.blocks = []
        for i in range(n_layers):
            self.blocks.append(DilatedConvLayer(d_model, 2 ** i, dropout_rate))
            if (i + 1) % 3 == 0:
                self.blocks.append(SwiGLU(d_model))
                # NOTE(review): the source lost its indentation; this layer
                # norm is assumed to follow each SwiGLU (inside the `if`).
                # The layer layout must match the saved weights - confirm.
                self.blocks.append(layers.LayerNormalization(epsilon=1e-5, dtype='float32'))

        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype='float32')

    def call(self, x, training=False):
        """Return next-token logits for the integer id tensor *x*.

        The first two axes of *x* are treated as (batch, sequence); the result
        has one vocabulary-sized logit vector per position.
        """
        seq_len = tf.shape(x)[1]
        positions = tf.range(seq_len)[tf.newaxis, :]
        # Keep position ids inside the position-embedding table.
        positions = tf.clip_by_value(positions, 0, self.pos_embedding.input_dim - 1)

        x = self.token_embedding(x) + self.pos_embedding(positions)

        for block in self.blocks:
            if isinstance(block, SwiGLU):
                # The gated feed-forward block is applied residually.
                x = x + block(x)
            elif isinstance(block, DilatedConvLayer):
                # Only this block type declares `training` in call(). The
                # previous `hasattr(block, 'training')` probe looked for an
                # instance attribute instead of checking the call signature.
                x = block(x, training=training)
            else:
                x = block(x)

        x = self.ln_f(x)
        # Weight tying: reuse the token-embedding matrix as the output projection.
        logits = tf.matmul(x, self.token_embedding.weights[0], transpose_b=True)
        return logits
|
|
model = Lamko(vocab_size=vocab_size, max_seq_len=max_len, d_model=384, n_layers=9)
# Call the model once on a dummy batch so its variables exist before the
# weights are loaded.
dummy_input = tf.zeros((1, max_len), dtype=tf.int32)
_ = model(dummy_input)
model.load_weights(CHAT_MODEL_PATH)
# The log message was mis-encoded in the source; restored to the Korean text
# ("model weights loaded").
print("모델 가중치 로드 완료!")
|
|
@tf.function(input_signature=[
    tf.TensorSpec(shape=(1, None), dtype=tf.int32),
    tf.TensorSpec(shape=(vocab_size,), dtype=tf.int32),
    tf.TensorSpec(shape=(), dtype=tf.int32),
    tf.TensorSpec(shape=(), dtype=tf.float32),
    tf.TensorSpec(shape=(), dtype=tf.float32),
    tf.TensorSpec(shape=(), dtype=tf.float32),
    tf.TensorSpec(shape=(), dtype=tf.int32),
    tf.TensorSpec(shape=(), dtype=tf.int32),
    tf.TensorSpec(shape=(), dtype=tf.int32),
])
def generate_step(input_ids, token_counts, current_length, temperature, repetition_penalty, top_p, top_k, min_len, step):
    """Sample one next token for a single sequence.

    Args:
        input_ids: int32 tensor of shape (1, length) with the tokens so far.
        token_counts: int32 tensor of shape (vocab_size,) counting how often
            each token id already occurs in the sequence.
        current_length: number of tokens in ``input_ids``; the logits at
            position ``current_length - 1`` are used.
        temperature: divisor applied to the logits before the softmax.
        repetition_penalty: base of the per-token penalty
            ``repetition_penalty ** count``.
        top_p: cumulative-probability threshold for nucleus filtering.
        top_k: keep only the ``top_k`` largest logits when greater than 0.
        min_len: while ``current_length < min_len`` the end token is masked.
        step: accepted for signature compatibility; not used.

    Returns:
        ``(sampled_id, token_counts)``: the chosen token id (int32 scalar) and
        the counts with that id incremented.
    """
    # Right-pad to the fixed model length. The caller must keep the sequence
    # at or below max_len, otherwise the pad length becomes negative.
    pad_len = max_len - tf.shape(input_ids)[1]
    input_padded = tf.pad(input_ids, [[0, 0], [0, pad_len]], constant_values=pad_id)
    logits = model(input_padded, training=False)
    next_logits = logits[0, current_length - 1]

    # Repetition penalty. Dividing a negative logit by a penalty > 1 moves it
    # towards zero and makes the repeated token MORE likely, so negative
    # logits are multiplied instead.
    penalty = tf.pow(repetition_penalty, tf.cast(token_counts, tf.float32))
    next_logits = tf.where(next_logits > 0, next_logits / penalty, next_logits * penalty)

    # Do not allow the sequence to end before the minimum length.
    if current_length < min_len:
        next_logits = tf.tensor_scatter_nd_update(next_logits, [[end_id]], [-1e9])
    # NOTE(review): the source lost its indentation, so it is unclear whether
    # the pad mask was inside the `if` above. It is applied unconditionally
    # here because a pad token is never a useful output - confirm.
    next_logits = tf.tensor_scatter_nd_update(next_logits, [[pad_id]], [-1e9])

    # Top-k filtering: mask everything below the k-th largest logit.
    if top_k > 0:
        k = tf.minimum(top_k, vocab_size)  # top_k may not exceed the vocabulary
        kth_val = tf.math.top_k(next_logits, k=k).values[-1]
        next_logits = tf.where(next_logits < kth_val, -1e9, next_logits)

    # Temperature and top-p (nucleus) filtering: keep the tokens whose
    # cumulative probability is within top_p, plus one more, then renormalise.
    next_logits = next_logits / temperature
    probs = tf.nn.softmax(next_logits)
    sorted_probs, sorted_idx = tf.math.top_k(probs, k=vocab_size)
    cum_probs = tf.cumsum(sorted_probs)
    cutoff_idx = tf.reduce_sum(tf.cast(cum_probs <= top_p, tf.int32)) + 1
    cutoff_idx = tf.minimum(cutoff_idx, vocab_size)
    filtered_idx = sorted_idx[:cutoff_idx]
    filtered_probs = sorted_probs[:cutoff_idx]
    filtered_probs = filtered_probs / tf.reduce_sum(filtered_probs)

    # With probability 0.5536 take the most likely token, otherwise sample
    # from the filtered distribution.
    rand_val = tf.random.uniform([], 0, 1)

    def _sample():
        drawn = tf.random.categorical(tf.math.log(filtered_probs[tf.newaxis, :]), 1)[0, 0]
        return filtered_idx[drawn]

    def _greedy():
        return filtered_idx[tf.argmax(filtered_probs)]

    sampled_id = tf.cond(rand_val < 0.5536, _greedy, _sample)
    sampled_id = tf.cast(sampled_id, tf.int32)

    # Record the new token for the next step's repetition penalty.
    token_counts = tf.tensor_scatter_nd_add(token_counts, [[sampled_id]], [1])
    return sampled_id, token_counts
|
|
|
|
| |
| |
| |
def generate_text_streaming(model, prompt, max_len=115, max_gen=100,
                            temperature=0.75, min_len=20,
                            repetition_penalty=1.2, top_p=0.9, top_k=50):
    """Generate a reply to *prompt*, yielding newly decoded text piece by piece.

    Args:
        model: accepted for interface compatibility; sampling goes through
            the module-level ``generate_step``.
        prompt: user text, wrapped as ``"<start> {prompt} <sep>"``.
        max_len: the encoded prompt is truncated to this many tokens, and
            generation stops once the sequence has grown past it. It must not
            exceed the length ``generate_step`` pads to.
        max_gen: maximum number of sampling steps.
        temperature, repetition_penalty, top_p, top_k: forwarded to
            ``generate_step``.
        min_len: minimum total length (prompt included) before the loop may
            stop on the end token or sentence-final punctuation.

    Yields:
        The text appended to the cleaned-up decoded output by each step.
    """
    model_input = text_to_ids(f"<start> {prompt} <sep>")
    model_input = model_input[:max_len]
    generated = list(model_input)
    start_output_idx = len(model_input)

    # Seed the repetition counts with the prompt tokens.
    token_counts_np = np.zeros(vocab_size, dtype=np.int32)
    for t in generated:
        token_counts_np[t] += 1
    # generate_step returns a fresh tensor every step, so a constant suffices.
    token_counts = tf.constant(token_counts_np, dtype=tf.int32)

    prev_decoded = ""

    for step in range(max_gen):
        # generate_step pads the input up to a fixed length; a longer sequence
        # would produce a negative pad length and fail, so stop here.
        if len(generated) > max_len:
            break

        input_tensor = tf.expand_dims(generated, axis=0)

        sampled_id, token_counts = generate_step(
            input_tensor,
            token_counts,
            tf.constant(len(generated), dtype=tf.int32),
            tf.constant(temperature, dtype=tf.float32),
            tf.constant(repetition_penalty, dtype=tf.float32),
            tf.constant(top_p, dtype=tf.float32),
            tf.constant(top_k, dtype=tf.int32),
            tf.constant(min_len, dtype=tf.int32),
            tf.constant(step, dtype=tf.int32)
        )

        sampled_id = int(sampled_id.numpy())
        generated.append(sampled_id)

        # Decode everything generated so far and strip markers. The literal
        # below was mis-encoded in the source; it is the SentencePiece
        # whitespace marker U+2581.
        decoded_full = sp.decode(generated[start_output_idx:])
        decoded_full = decoded_full.replace("▁", " ").strip()
        for t in ["<start>", "<sep>", "<end>"]:
            decoded_full = decoded_full.replace(t, "")
        # NOTE(review): the two Hangul characters in this set were
        # reconstructed from a lossy mis-encoding (presumably the particles
        # "는" and "은") - confirm against the original file.
        decoded_full = decoded_full.lstrip(",!?.는은 ")

        # Emit only the part that was not sent before.
        new_output = decoded_full[len(prev_decoded):]
        if new_output:
            yield new_output
            prev_decoded = decoded_full

        # Stop on the end token or sentence-final punctuation once the
        # sequence is long enough.
        if len(generated) >= min_len and (sampled_id == end_id or decoded_full.endswith(('.', '!', '?'))):
            break
|
|
# Casual greetings/farewells mapped to the phrasing sent to the model.
# NOTE(review): these literals were mis-encoded and split across lines in the
# source; they were reconstructed from the surviving bytes. "ㅎㅇ" in
# particular is a best guess - confirm against the original file.
token_map = {
    "하이": "안녕하세요!",
    "ㅎㅇ": "안녕하세요!",
    "하이~": "안녕하세요!",
    "안녕": "안녕하세요!",
    "안녕!": "안녕하세요!",
    "잘가": "잘가. 나중에 보자",
    "잘 가": "잘 가. 나중에 보자",
}


def preprocess_text(text):
    """Replace every ``token_map`` key found in *text* with its mapped phrase.

    The substitution is done in a single pass with longer keys tried first.
    Chained ``str.replace`` calls re-scanned their own output, so "하이" became
    "안녕하세요!" and was then expanded again through the "안녕" key, while the
    longer keys ("하이~", "안녕!") could never match.

    Keys still match as substrings, so a longer word that contains a key is
    rewritten as well.
    """
    import re  # local import: the module does not import `re` at the top

    keys = sorted((k for k in token_map if k), key=len, reverse=True)
    if not text or not keys:
        return text
    pattern = re.compile("|".join(re.escape(k) for k in keys))
    return pattern.sub(lambda match: token_map[match.group(0)], text)
|
|
| |
@app.route('/')
def index():
    """Serve the front-end entry page from the app's static folder."""
    entry_page = 'index.html'
    return app.send_static_file(entry_page)
|
|
@app.route('/api/search')
def search_api():
    """Search Korean Wikipedia for the ``query`` parameter and return JSON results.

    Returns ``{"results": [...]}`` where each entry has ``title``, ``link``
    and ``snippet``. The list holds up to five search hits, or a single entry
    when the response is an article page rather than a result list. An empty
    query gives an empty list; a failed upstream request gives an empty list
    with HTTP status 502.
    """
    query = request.args.get("query", "").strip()
    if not query:
        return jsonify({"results": []})

    try:
        # Let requests encode the query: interpolating it into the URL broke
        # on characters such as "&" or "#". The timeout keeps a stalled
        # upstream from blocking the worker indefinitely.
        resp = requests.get(
            "https://ko.wikipedia.org/w/index.php",
            params={"search": query},
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=10,
        )
        resp.raise_for_status()
    except requests.RequestException:
        app.logger.exception("Wikipedia search request failed")
        return jsonify({"results": []}), 502

    soup = BeautifulSoup(resp.text, "html.parser")
    results = []

    # Case 1: a search-result listing.
    search_items = soup.select(".mw-search-result-heading a")
    if search_items:
        for item in search_items[:5]:
            title = item.text
            link = "https://ko.wikipedia.org" + (item.get("href") or "")
            # Look for the snippet in the enclosing <li> of the hit. The
            # anchor's direct parent is the heading element itself, so
            # searching only there would miss a snippet placed next to it.
            container = item.find_parent("li") or item.find_parent()
            snippet_tag = container.find("div", class_="searchresult") if container else None
            snippet = snippet_tag.text.strip() if snippet_tag else ""
            results.append({"title": title, "link": link, "snippet": snippet})
    else:
        # Case 2: the response is an article page.
        heading = soup.select_one("#firstHeading")
        if heading:
            content_paragraph = soup.select_one(".mw-parser-output > p")
            snippet = content_paragraph.text.strip() if content_paragraph else ""
            results.append({
                "title": heading.text.strip(),
                "link": resp.url,
                "snippet": snippet,
            })

    return jsonify({"results": results})
|
|
@app.before_request
def ensure_user_id():
    """Assign a random UUID to the session before each request if it has none."""
    if 'user_id' in session:
        return
    session['user_id'] = str(uuid.uuid4())
|
|
@app.route('/api/chat', methods=['GET','POST'])
def chat_api():
    """Stream a generated reply to the ``message`` parameter as server-sent events.

    The message is read from the JSON body (POST) or the query string (GET).
    Events are ``{"char": <text>}`` for each generated piece, a final
    ``{"done": true}``, or ``{"error": <message>}`` on failure or when the
    message is empty.
    """
    if request.method == "POST":
        # silent=True: a missing or malformed JSON body is handled as an
        # empty message instead of raising.
        payload = request.get_json(silent=True)
        raw_msg = payload.get("message") if isinstance(payload, dict) else None
    else:
        raw_msg = request.args.get("message")
    # A missing "message" used to reach .strip() as None and raise.
    user_msg = str(raw_msg or "").strip()

    if not user_msg:
        # The message text was mis-encoded in the source; restored to the
        # Korean text ("please enter a message").
        return Response('data: {"error":"메시지를 입력해주세요."}\n\n',
                        mimetype='text/event-stream')

    user_msg = preprocess_text(user_msg)

    def gen():
        try:
            for token in generate_text_streaming(
                model, user_msg,
                max_len=max_len,
                max_gen=115,
                temperature=0.8,
                min_len=10,
                repetition_penalty=1.1,
                top_p=0.9,
                top_k=5
            ):
                safe_token = json.dumps(token)
                yield f'data: {{"char":{safe_token}}}\n\n'

            yield 'data: {"done":true}\n\n'
        except Exception as e:
            # Top-level boundary of the stream: log the failure and report it
            # to the client as a final event.
            app.logger.exception("chat generation failed")
            yield f'data: {{"error":{json.dumps(str(e))}}}\n\n'

    return Response(gen(), mimetype='text/event-stream')
|
|
if __name__ == "__main__":
    # Start Flask's built-in server on all interfaces, port 7860, when the
    # file is run as a script.
    app.run(port=7860, host="0.0.0.0")