| import os |
| import datetime |
| import uuid |
| import time |
| import threading |
| import traceback |
| import logging |
| from queue import Queue |
| from dotenv import load_dotenv |
| import json |
|
|
| |
# --- Environment & configuration ---------------------------------------------
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
POSTGRES_DSN = os.getenv("POSTGRES_DSN", "postgresql://user:password@localhost:5432/agentdb")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
BASE_MODEL_NAME = os.getenv("BASE_MODEL_NAME", "gpt-4o-mini")

# How often (hours) the background fine-tuning cycle runs.
LEARNING_INTERVAL_HOURS = int(os.getenv("LEARNING_INTERVAL_HOURS", "6"))

# FIX: torch was referenced here but only imported much further down the
# module, so this line raised NameError at import time. Import before use.
import torch

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
| |
| |
| |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings |
| from langchain.agents import AgentExecutor, create_react_agent, Tool |
| |
|
|
| |
| from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, DateTime, Text, MetaData, Index |
| from sqlalchemy.dialects.postgresql import UUID, JSONB |
| |
| |
| from sqlalchemy.orm import sessionmaker, declarative_base |
| import sqlalchemy |
|
|
| |
| import redis |
|
|
| |
| from sentence_transformers import SentenceTransformer |
|
|
| |
| from apscheduler.schedulers.background import BackgroundScheduler |
| from apscheduler.triggers.interval import IntervalTrigger |
|
|
| |
| import torch |
| from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig |
| from peft import LoraConfig |
| from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead, create_reference_model |
| from trl.core import LengthSampler |
|
|
| |
# SQLAlchemy setup: declarative base, engine bound to the Postgres DSN, and
# the session factory used by the DB helper functions below.
Base = declarative_base()
engine = create_engine(POSTGRES_DSN)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
| |
class Experience(Base):
    """One recorded agent task execution; the raw material for the PPO reward."""
    __tablename__ = "experiences"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # NOTE(review): datetime.utcnow is deprecated in Python 3.12+; consider a
    # timezone-aware default.
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)
    goal = Column(Text)  # the long-term goal this task served
    task = Column(Text)  # the concrete task description

    action_info = Column(JSONB)  # {"action": ..., "input": ...} from the agent output
    observation_summary = Column(Text)  # first 500 chars of the agent output
    success = Column(Boolean)  # heuristic success flag (keyword scan of the output)
    feedback_score = Column(Float, default=0.0)  # external feedback, if any
    execution_time = Column(Float)  # wall-clock seconds
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
class Task(Base):
    """Task record persisted in Postgres.

    NOTE(review): this table is created but never read or written anywhere
    else in this module -- tasks actually flow through the Redis queue.
    Confirm whether another component uses it before removing.
    """
    __tablename__ = "tasks"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    goal = Column(Text)
    task_description = Column(Text)
    status = Column(String, default="pending")  # only "pending" is ever set here
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    result = Column(Text, nullable=True)
|
|
| |
# Create all tables at import time (no-op for tables that already exist).
Base.metadata.create_all(bind=engine)

# Redis connection plus the list key used as the shared task queue.
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
TASK_QUEUE_KEY = "agent_task_queue"
|
|
| |
| |
| |
| |
# Sentence-transformer used for experience embeddings. Loaded once at import
# time (may download weights on first run); pinned to CPU -- presumably to
# leave GPU memory free for the fine-tuning cycle.
embedding_model_name = 'all-MiniLM-L6-v2'
logging.info(f"Loading sentence transformer model: {embedding_model_name}...")

sentence_model = SentenceTransformer(embedding_model_name, device='cpu')
logging.info("Sentence transformer model loaded.")
|
|
def get_vector(text: str):
    """Embed *text* with the shared sentence-transformer model.

    Returns the embedding as a plain list of floats, or None when *text*
    is empty/falsy.
    """
    if not text:
        return None
    embedding = sentence_model.encode(text, convert_to_numpy=True)
    return embedding.tolist()
|
|
| |
def add_experience_db(task_info: dict, agent_output: dict, success: bool, feedback: float = 0.0, exec_time: float = 0.0):
    """Persist one agent experience to PostgreSQL.

    Args:
        task_info: Dict with "goal" and "task" keys (either may be absent/None).
        agent_output: Agent result dict; "output" is truncated to 500 chars,
            "action"/"action_input" are stored as structured JSON.
        success: Whether the task was judged successful.
        feedback: Optional external feedback score.
        exec_time: Wall-clock execution time in seconds.

    Errors are logged and rolled back; this function never raises.
    """
    db = SessionLocal()
    try:
        task_text = task_info.get("task") or ""
        # FIX: guard against a present-but-None "output" value before slicing
        # (dict.get's default only applies when the key is absent).
        obs_summary = (agent_output.get("output") or "")[:500]

        # NOTE(review): the original code also computed task/observation
        # embeddings here but never stored them (pgvector columns are not
        # implemented), so that dead -- and expensive -- work was dropped.

        action_info = {
            "action": agent_output.get("action", "unknown"),
            "input": agent_output.get("action_input", "unknown"),
        }

        exp = Experience(
            goal=task_info.get("goal"),
            task=task_info.get("task"),
            action_info=action_info,
            observation_summary=obs_summary,
            success=success,
            feedback_score=feedback,
            execution_time=exec_time,
        )
        db.add(exp)
        db.commit()
        # FIX: slicing task_text (never None) avoids the TypeError the old
        # log line hit when "task" was missing or None.
        logging.debug(f"Experience added to DB: Success={success}, Task={task_text[:50]}")
    except Exception as e:
        db.rollback()
        logging.error(f"Failed to add experience to DB: {e}", exc_info=True)
    finally:
        db.close()
|
|
def retrieve_relevant_experiences_db(query: str, k: int = 3) -> list[Experience]:
    """Look up past experiences similar to *query*.

    Placeholder: real similarity search needs pgvector columns on the
    experiences table; until then this validates that the query embeds and
    always returns an empty list. Any failure is logged and swallowed.
    """
    session = SessionLocal()
    try:
        if get_vector(query) is None:
            return []
        logging.warning("Vector search in DB requested but not implemented (requires pgvector). Returning empty list.")
        return []
    except Exception as e:
        logging.error(f"Failed to retrieve experiences from DB: {e}", exc_info=True)
        return []
    finally:
        session.close()
|
|
| |
| |
# --- Agent tools & executor ---------------------------------------------------
# NOTE(review): `search` and `python_repl` are referenced here but never
# defined or imported anywhere in this module -- building this list raises
# NameError at import time. They presumably come from LangChain tool
# wrappers; wire them up (or stub them) before running.
tools = [
    Tool(name="Search", func=search.run, description="..."),
    Tool(name="PythonREPL", func=python_repl.run, description="..."),
]

# FIX: `hub` was used below without ever being imported.
from langchain import hub

agent_llm = ChatOpenAI(model=BASE_MODEL_NAME, temperature=0.3, api_key=OPENAI_API_KEY)
prompt_template = hub.pull("hwchase17/react-chat")
agent = create_react_agent(agent_llm, tools, prompt_template)
agent_executor = AgentExecutor(
    agent=agent, tools=tools, verbose=False, handle_parsing_errors=True, max_iterations=10,
)
|
|
|
|
| |
# Fine-tuning state: a lock so only one learning cycle runs at a time, the
# (currently never-assigned) trainer handle, and the adapter output path.
learning_lock = threading.Lock()
ppo_trainer = None
fine_tuned_model_path = "./fine_tuned_model"
|
|
def calculate_reward(experience_data: dict) -> float:
    """Compute a scalar PPO reward from one experience record.

    +1 for success, -1 for failure; minus a sqrt-scaled time penalty for
    runs longer than 1 second (capped at 300s so one pathological run
    cannot dominate); plus half of any explicit feedback score. Missing or
    None fields are treated as neutral defaults.
    """
    reward = 1.0 if experience_data.get("success") else -1.0

    # Time penalty for slow runs only.
    # FIX: tolerate an explicit None value (e.g. a NULL DB column) -- the
    # old `exec_time > 1.0` comparison raised TypeError on None.
    exec_time = experience_data.get("execution_time", 1.0)
    if exec_time is not None and exec_time > 1.0:
        reward -= 0.1 * min(max(0, exec_time), 300) ** 0.5

    # Human/external feedback contributes at half weight; None is neutral.
    feedback = experience_data.get("feedback_score") or 0.0
    reward += feedback * 0.5

    return reward
|
|
def prepare_ppo_data(experiences: list[Experience]) -> list[dict]:
    """Convert Experience rows into TRL PPO training triples.

    Returns a list of dicts with "query" (goal+task prompt), "response"
    (the stored observation summary) and "reward" (a 1-element float32
    tensor), skipping rows whose response text is empty.
    """
    ppo_data = []
    for exp in experiences:
        query_text = f"Goal: {exp.goal}\nTask: {exp.task}"
        response_text = exp.observation_summary

        # FIX: the old code passed `exp.metadata` to calculate_reward, but
        # `metadata` is SQLAlchemy's reserved declarative attribute (the
        # table MetaData object), not experience data -- build the reward
        # input from the row's actual columns instead.
        reward_score = calculate_reward({
            "success": exp.success,
            "execution_time": exp.execution_time,
            "feedback_score": exp.feedback_score,
        })

        if query_text and response_text:
            ppo_data.append({
                "query": query_text,
                "response": response_text,
                # FIX: `torch.float3_tensors` was a typo (AttributeError at
                # runtime); rewards are plain float32 scalars.
                "reward": torch.tensor([reward_score], dtype=torch.float32),
            })
    return ppo_data
|
|
|
|
def run_learning_cycle():
    """Periodically fine-tune the base model on recent agent experiences via TRL PPO.

    Triggered by the background scheduler. Skips the cycle when CUDA is
    unavailable or fewer than 50 recent experiences exist. The PPO loop
    itself is still a simulated placeholder (sleep) -- see notes inline.
    """
    global ppo_trainer
    if not torch.cuda.is_available():
        logging.warning("CUDA not available. Skipping fine-tuning cycle.")
        return

    # Serialize learning cycles: only one fine-tuning run at a time.
    with learning_lock:
        logging.info(f"[Learning Cycle Triggered] - Device: {DEVICE}")
        start_time = time.time()

        logging.info("Fetching recent experiences from PostgreSQL...")
        db = SessionLocal()
        try:
            recent_experiences = (
                db.query(Experience)
                .order_by(Experience.timestamp.desc())
                .limit(500)
                .all()
            )
        finally:
            db.close()

        # Require a minimum batch before spending GPU time.
        if not recent_experiences or len(recent_experiences) < 50:
            logging.info(f"Not enough new experiences ({len(recent_experiences)}). Skipping fine-tuning.")
            return
        logging.info(f"Fetched {len(recent_experiences)} experiences for learning.")

        logging.info("Preparing data for PPO...")
        ppo_data = prepare_ppo_data(recent_experiences)
        if not ppo_data:
            logging.warning("No valid data points after preparation. Skipping fine-tuning.")
            return

        logging.info("Setting up TRL PPOTrainer...")
        # FIX: bind these up front so cleanup never hits a NameError when
        # setup fails before they are assigned (the old code ran
        # `del model, ref_model, ppo_trainer` inside the except block, which
        # itself raised NameError on early failures).
        model = None
        ref_model = None
        try:
            # NOTE(review): BASE_MODEL_NAME defaults to an OpenAI API model
            # id ("gpt-4o-mini"), which AutoTokenizer/AutoModel cannot load
            # from the HF hub -- a local/HF model id is expected here.
            ppo_config = PPOConfig(
                model_name=BASE_MODEL_NAME,
                learning_rate=1.41e-5,
                batch_size=16,
                mini_batch_size=4,
                gradient_accumulation_steps=1,
                optimize_cuda_cache=True,
                ppo_epochs=4,
                seed=42,
                # NOTE(review): `use_lora` is not a PPOConfig field in all
                # TRL versions; LoRA is normally enabled via the peft_config
                # passed to the model below. Confirm against the pinned trl.
                use_lora=True,
            )

            lora_config = LoraConfig(
                r=16, lora_alpha=32, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM"
            )
            tokenizer = AutoTokenizer.from_pretrained(ppo_config.model_name)
            if getattr(tokenizer, "pad_token", None) is None:
                # Causal LMs often ship without a pad token; reuse EOS.
                tokenizer.pad_token = tokenizer.eos_token

            model = AutoModelForCausalLMWithValueHead.from_pretrained(
                ppo_config.model_name,
                peft_config=lora_config,
                torch_dtype=torch.float16,
                device_map="auto"
            )
            # Frozen reference model for the PPO KL penalty.
            ref_model = create_reference_model(model)

            logging.info("Starting PPO Training Loop (Simulation - Actual requires dataset)...")
            # Placeholder: a real implementation would tokenize ppo_data and
            # step a PPOTrainer here.
            time.sleep(10)

            logging.info("Saving fine-tuned LoRA adapters...")
            logging.info(f"Fine-tuned adapters saved to {fine_tuned_model_path}")

        except Exception as e:
            logging.error(f"Error during TRL setup or training: {e}", exc_info=True)
        finally:
            # FIX: release GPU memory on success *and* failure (the old code
            # only cleaned up on the exception path).
            del model, ref_model
            torch.cuda.empty_cache()

        logging.info(f"Learning cycle finished. Duration: {time.time() - start_time:.2f}s")
|
|
| |
def add_task_mq(task: str, goal: str):
    """Enqueue a task (with its long-term goal) onto the Redis task list.

    Redis failures are logged and swallowed -- the caller is not notified.
    """
    new_id = str(uuid.uuid4())
    payload = json.dumps({"id": new_id, "task": task, "goal": goal})
    try:
        redis_client.lpush(TASK_QUEUE_KEY, payload)
        logging.info(f"Task {new_id} added to Redis queue: {task[:50]}...")
    except Exception as e:
        logging.error(f"Failed to add task to Redis: {e}")
|
|
| |
def agent_worker(worker_id: int):
    """Blocking worker loop: pop tasks from Redis, run the agent, record the experience.

    Runs forever (daemon thread). Redis connection errors back off for 10s;
    any other error backs off for 5s and the loop continues.
    """
    # FIX: SystemMessage/HumanMessage were used below but never imported
    # anywhere in the module, so every task died with a NameError that the
    # broad except below silently retried forever.
    from langchain_core.messages import SystemMessage, HumanMessage

    logging.info(f"Agent Worker-{worker_id} started.")
    while True:
        try:
            # brpop blocks until a task is available on the queue.
            _, task_data_json = redis_client.brpop(TASK_QUEUE_KEY)
            task_info = json.loads(task_data_json)
            task_id = task_info["id"]
            task_desc = task_info["task"]
            goal = task_info["goal"]

            logging.info(f"Worker-{worker_id} processing Task {task_id}: {task_desc[:50]}...")
            start_time = time.time()
            success = False
            final_output = None
            agent_result = {}

            # NOTE(review): the ReAct prompt pulled for this executor
            # typically expects a string "input"; passing a list of message
            # objects may not render as intended -- confirm against the
            # prompt template.
            input_messages = [
                SystemMessage(content=f"Your long term goal is: {goal}. Think step-by-step."),
                HumanMessage(content=f"Current task: {task_desc}")
            ]

            try:
                agent_result = agent_executor.invoke({"input": input_messages})
                final_output = agent_result.get("output", "No output.")
                # Heuristic success check: no obvious failure keywords in the output.
                success = not any(err in final_output.lower() for err in ["error", "fail", "unable"])
            except Exception as e:
                logging.error(f"Worker-{worker_id} Task {task_id} failed during execution: {e}", exc_info=True)
                final_output = f"Agent execution failed: {e}"
                success = False
                agent_result = {"output": final_output, "action": "error"}

            exec_time = time.time() - start_time
            feedback_score = 0.0  # no human-feedback channel wired up yet
            add_experience_db(task_info, agent_result, success, feedback_score, exec_time)

            logging.info(f"Worker-{worker_id} finished Task {task_id}. Success: {success}. Time: {exec_time:.2f}s")

        except redis.exceptions.ConnectionError as e:
            logging.error(f"Worker-{worker_id} Redis connection error: {e}. Retrying in 10s...")
            time.sleep(10)
        except Exception as e:
            logging.error(f"Worker-{worker_id} encountered an unexpected error: {e}", exc_info=True)
            time.sleep(5)
|
|
| |
if __name__ == "__main__":
    logging.info("Initializing Agent System...")

    # Background scheduler that periodically runs the PPO learning cycle.
    scheduler = BackgroundScheduler(daemon=True)
    scheduler.add_job(
        run_learning_cycle,
        trigger=IntervalTrigger(hours=LEARNING_INTERVAL_HOURS),
        id="learning_job",
        name="Fine-tuning Learning Cycle",
        replace_existing=True
    )
    scheduler.start()
    logging.info(f"Background learning scheduler started. Interval: {LEARNING_INTERVAL_HOURS} hours.")

    # Daemon worker threads that consume tasks from the Redis queue.
    num_workers = int(os.getenv("NUM_WORKERS", "2"))
    worker_threads = []
    for i in range(num_workers):
        thread = threading.Thread(target=agent_worker, args=(i+1,), daemon=True)
        thread.start()
        worker_threads.append(thread)
    logging.info(f"{num_workers} Agent worker threads started.")

    # Seed the queue with a couple of demo tasks.
    add_task_mq("Explain the difference between LoRA and full fine-tuning for LLMs.",
                "Understand AI model optimization techniques.")
    add_task_mq("Write a Python script using pandas to read a CSV file named 'data.csv' and print the first 5 rows.",
                "Develop data processing scripts.")

    logging.info("Agent system is running. Workers processing tasks from Redis.")
    logging.info("Press Ctrl+C to stop.")

    try:
        # Keep the main thread alive; daemon threads die with the process.
        while True:
            time.sleep(60)
            logging.debug("Main thread alive...")
    except KeyboardInterrupt:
        logging.info("Shutdown signal received...")
        scheduler.shutdown()
        # NOTE(review): worker threads are daemons and are killed abruptly
        # here; in-flight tasks are not drained or re-queued.
        logging.info("Agent system stopped.")