Luciano665's picture
Add more to intructions
21fa12d verified
import os
import gradio as gr
import logging
from git import Repo
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from langchain_pinecone import PineconeVectorStore
from langchain.schema import Document
from tree_sitter_languages import get_parser
from pinecone import Pinecone
import openai
import numpy as np
# Load environment variables
load_dotenv()
# Logging Configuration
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Environment Variables
CLONE_DIR = "./cloned_repos"
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_KEY = "codebase-app"
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# Initialize GROQ API
client = openai.OpenAI(
base_url="https://api.groq.com/openai/v1",
api_key=GROQ_API_KEY
)
# Initialize Pinecone
pinecone_client = Pinecone(api_key=PINECONE_API_KEY)
pinecone_index = pinecone_client.Index(PINECONE_INDEX_KEY)
# Initialize SentenceTransformer Embedding Model
embedding_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
# Supported Extensions
SUPPORTED_EXTENSIONS = {".py", ".java", ".js", ".ts", ".cpp", ".h", ".ipynb"}
IGNORED_DIRS = {"node_modules", "venv", "env", ".git", "__pycache__"}
#Systems prompt
system_prompt =f"""You are a Senior Software engineer with more than 20 years of experience delivering software for massive use. You are very technical and have complete expertise over all domains of software in all aspects.
Answer any questions I have about the codebase, based on the code provided. Always consider all of the context provided when forming a response.
"""
# Backend Logic: Clone Repository
def clone_repository(repo_url: str) -> str:
"""Clone the GitHub repository locally."""
repo_name = repo_url.split("/")[-1].replace(".git", "")
repo_path = os.path.join(CLONE_DIR, repo_name)
if not os.path.exists(CLONE_DIR):
os.makedirs(CLONE_DIR)
if os.path.exists(repo_path):
logger.info(f"Repository already exists: {repo_path}")
return repo_path
Repo.clone_from(repo_url, repo_path)
logger.info(f"Cloned repository to: {repo_path}")
return repo_path
# Backend Logic: Parse Repository
class SimpleTreeSitterParser:
"""Parser for extracting code chunks from files."""
def __init__(self, language: str):
self.language = language
try:
self.parser = get_parser(language) # Ensure only the required argument is passed
except Exception as e:
logger.error(f"Error initializing parser for {language}: {e}")
raise ValueError(f"Parser error for {language}: {e}")
def parse(self, code: str) -> list:
try:
tree = self.parser.parse(bytes(code, "utf-8"))
root = tree.root_node
chunks = []
for child in root.children:
chunks.append({
"type": child.type,
"content": code[child.start_byte:child.end_byte],
"start_line": child.start_point[0] + 1,
"end_line": child.end_point[0] + 1,
})
return chunks
except Exception as e:
logger.error(f"Error parsing code: {e}")
return []
def parse_repository(repo_path: str) -> list:
"""Parse repository files into meaningful chunks."""
chunks = []
for root, _, files in os.walk(repo_path):
if any(ignored_dir in root for ignored_dir in IGNORED_DIRS):
continue
for file in files:
ext = os.path.splitext(file)[1]
if ext not in SUPPORTED_EXTENSIONS:
logger.warning(f"Skipping unsupported file: {file}")
continue
file_path = os.path.join(root, file)
language = {
".py": "python",
".ts": "typescript",
".js": "javascript",
".java": "java",
".cpp": "cpp",
}.get(ext, "unknown")
try:
logger.info(f"Processing file: {file_path}")
code = get_file_content(file_path)
if not code:
logger.warning(f"No content found in {file_path}")
continue
parser = SimpleTreeSitterParser(language)
parsed_chunks = parser.parse(code)
chunks.extend(parsed_chunks)
except ValueError as ve:
logger.error(f"Skipping file {file_path} due to parser error: {ve}")
except Exception as e:
logger.error(f"Unexpected error processing {file_path}: {e}")
return chunks
# Helper: Read File Content
def get_file_content(file_path: str) -> str:
"""Read and return the content of a file."""
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
logger.error(f"Error reading file {file_path}: {e}")
return ""
# Backend Logic: Store Embeddings
def store_embeddings(documents, namespace="default"):
"""Store embeddings in Pinecone."""
try:
texts = [doc.page_content for doc in documents]
embeddings = embedding_model.encode(texts, show_progress_bar=True)
vectors = [
{
"id": str(i),
"values": embeddings[i].tolist(),
"metadata": {"text": doc.page_content, **doc.metadata},
}
for i, doc in enumerate(documents)
]
pinecone_index.upsert(vectors=vectors, namespace=namespace)
logger.info(f"Stored {len(vectors)} embeddings in Pinecone namespace '{namespace}'.")
except Exception as e:
logger.error(f"Error storing embeddings: {e}")
raise
# Backend Logic: Perform RAG
def perform_rag(query: str, namespace="default") -> str:
"""Retrieve context and generate responses."""
try:
query_embedding = embedding_model.encode(query).tolist()
response = pinecone_index.query(
vector=query_embedding,
top_k=10,
include_metadata=True,
namespace=namespace
)
if not response.get('matches'):
return "No relevant context found."
contexts = [match['metadata'].get('text', '') for match in response['matches']]
augmented_query = "<CONTEXT>\n" + "\n\n-------\n\n".join(contexts) + "\n-------\n</CONTEXT>\n\n" + query
llm_response = client.chat.completions.create(
model="llama-3.1-8b-instant",
messages=[
{"role": "system", "content":system_prompt},
{"role": "user", "content": augmented_query}
]
)
return llm_response.choices[0].message.content
except Exception as e:
logger.error(f"Error performing RAG: {e}")
return f"Error: {e}"
# Process Repository
def process_repo(repo_url: str) -> str:
"""Clone, parse, and store embeddings for a repository."""
try:
namespace = repo_url.split("/")[-1].replace(".git", "")
repo_path = clone_repository(repo_url)
chunks = parse_repository(repo_path)
if not chunks:
return "No valid chunks found in the repository."
documents = [Document(page_content=chunk["content"], metadata={"repo_url": repo_url}) for chunk in chunks]
store_embeddings(documents, namespace=namespace)
return f"Repository processed successfully in namespace '{namespace}'!"
except Exception as e:
logger.error(f"Error processing repository: {e}")
return f"Error: {e}"
# Fetch Namespaces
def fetch_namespaces():
"""Retrieve namespaces from Pinecone."""
try:
stats = pinecone_index.describe_index_stats()
return list(stats.get("namespaces", {}).keys())
except Exception as e:
logger.error(f"Error fetching namespaces: {e}")
return []
# Gradio UI
def create_ui():
namespaces = fetch_namespaces()
with gr.Blocks() as demo:
namespace_state = gr.State(value=None)
chat_history = gr.State(value=[])
with gr.Column():
gr.Markdown("## Codebase Chat App with Repository Management")
gr.Markdown("""
**Instructions:**
1. Enter the GitHub repository URL you wish to clone and click **Git Clone 😺**.
2. After cloning, to see the new repository appear in the namespace dropdown, type any character into the URL box and click **Git Clone 😺** again.
3. Select the desired namespace from the dropdown.
4. Use the chatbot below to interact with the selected codebase.
(Sorry for this I'm currently trying to solve this bug, feel free to se the code if you can spot the issue 🙂‍↕️)
""")
with gr.Row():
repo_url_input = gr.Textbox(label="GitHub Repository URL", placeholder="Enter repo URL to clone")
clone_button = gr.Button("Git Clone 😺")
clone_status = gr.Textbox(label="Clone Status", interactive=False)
namespace_dropdown = gr.Dropdown(choices=namespaces, label="Namespace", interactive=True)
chatbot = gr.Chatbot(label="Codebase Chatbot", type="messages")
message_input = gr.Textbox(placeholder="Enter your message here...")
send_button = gr.Button("Send")
def update_namespace_or_clone(repo_url, current_namespace):
"""Clone repository and update namespaces."""
if repo_url:
message = process_repo(repo_url)
updated_namespaces = fetch_namespaces()
return (
gr.update(choices=updated_namespaces, value=None),
message,
[], # Clear chat history
None
)
return gr.update(), "Please provide a repository URL.", current_namespace, current_namespace
def handle_query(message, history, namespace):
"""Handle chatbot queries."""
if not namespace:
new_history = history + [{"role": "assistant", "content": "Please select a namespace first!"}]
return new_history, new_history, gr.update(value="")
response = perform_rag(message, namespace)
# Convert history to the correct format
formatted_history = history + [
{"role": "user", "content": message},
{"role": "assistant", "content": response}
]
return formatted_history, formatted_history, gr.update(value="")
# Bind clone button
clone_button.click(
update_namespace_or_clone,
inputs=[repo_url_input, namespace_state],
outputs=[namespace_dropdown, clone_status, chat_history, namespace_state],
)
# Bind query button
send_button.click(
handle_query,
inputs=[message_input, chat_history, namespace_dropdown],
outputs=[chatbot, chat_history, message_input],
)
return demo
if __name__ == "__main__":
app = create_ui()
app.launch()