| import gradio as gr |
| import time |
| import threading |
| import logging |
| from gradio.themes.utils import sizes |
| from main import run_repository_ranking |
| import agent |
|
|
| |
| |
| |
| LOG_BUFFER = [] |
| LOG_BUFFER_LOCK = threading.Lock() |
|
|
| class BufferLogHandler(logging.Handler): |
| def emit(self, record): |
| log_entry = self.format(record) |
| with LOG_BUFFER_LOCK: |
| LOG_BUFFER.append(log_entry) |
|
|
| root_logger = logging.getLogger() |
| if not any(isinstance(h, BufferLogHandler) for h in root_logger.handlers): |
| handler = BufferLogHandler() |
| formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") |
| handler.setFormatter(formatter) |
| root_logger.addHandler(handler) |
|
|
| def filter_logs(logs): |
| filtered = [] |
| last_was_fetching = False |
| for log in logs: |
| if "HTTP Request:" in log: |
| if not last_was_fetching: |
| filtered.append("Fetching repositories...") |
| last_was_fetching = True |
| else: |
| filtered.append(log) |
| last_was_fetching = False |
| return filtered |
|
|
| def parse_result_to_html(raw_result: str, num_results: int) -> (str, list): |
| """ |
| Parses the raw string output from run_repository_ranking to an HTML table. |
| Only the top N results are displayed. |
| Returns (html, repo_names) |
| """ |
| entries = raw_result.strip().split("Final Rank:") |
| entries = entries[1:num_results+1] |
| if not entries: |
| return ("<p>No repositories found for your query.</p>", []) |
| html = """ |
| <table border="1" style="width:80%; margin: auto; border-collapse: collapse;"> |
| <thead> |
| <tr> |
| <th>Rank</th> |
| <th>Title</th> |
| <th>Link</th> |
| <th>Combined Score</th> |
| </tr> |
| </thead> |
| <tbody> |
| """ |
| repo_names = [] |
| for entry in entries: |
| lines = entry.strip().split("\n") |
| data = {} |
| data["Final Rank"] = lines[0].strip() if lines else "" |
| for line in lines[1:]: |
| if ": " in line: |
| key, val = line.split(": ", 1) |
| data[key.strip()] = val.strip() |
| |
| link = data.get('Link', '') |
| repo_name = '' |
| if 'github.com/' in link: |
| repo_name = link.split('github.com/')[-1].strip('/ ') |
| if repo_name: |
| repo_names.append(repo_name) |
| html += f""" |
| <tr> |
| <td>{data.get('Final Rank', '')}</td> |
| <td>{data.get('Title', '')}</td> |
| <td><a href=\"{data.get('Link', '#')}\" target=\"_blank\">GitHub</a></td> |
| <td>{data.get('Combined Score', '')}</td> |
| </tr> |
| """ |
| html += "</tbody></table>" |
| return html, repo_names |
|
|
| |
| |
| |
| def gpu_run_repo(topic: str, num_results: int): |
| return run_repository_ranking(topic, num_results) |
|
|
| def run_lite_workflow(topic, num_results, result_container): |
| result = gpu_run_repo(topic, num_results) |
| result_container["raw_result"] = result |
|
|
| def stream_lite_workflow(topic, num_results): |
| logging.info("[UI] User started a new search for topic: %s", topic) |
| with LOG_BUFFER_LOCK: |
| LOG_BUFFER.clear() |
| result_container = {} |
| workflow_thread = threading.Thread(target=run_lite_workflow, args=(topic, num_results, result_container)) |
| workflow_thread.start() |
| |
| last_index = 0 |
| while workflow_thread.is_alive() or (last_index < len(LOG_BUFFER)): |
| with LOG_BUFFER_LOCK: |
| new_logs = LOG_BUFFER[last_index:] |
| last_index = len(LOG_BUFFER) |
| if new_logs: |
| filtered_logs = filter_logs(new_logs) |
| status_msg = filtered_logs[-1] |
| detail_msg = "<br/>".join(filtered_logs) |
| yield status_msg, detail_msg, [] |
| time.sleep(0.5) |
| |
| workflow_thread.join() |
| with LOG_BUFFER_LOCK: |
| final_logs = LOG_BUFFER[:] |
| raw_result = result_container.get("raw_result", "No results returned.") |
| html_result, repo_names = parse_result_to_html(raw_result, num_results) |
| yield "", html_result, repo_names |
|
|
| def lite_runner(topic, num_results): |
| logging.info("[UI] Running lite_runner for topic: %s", topic) |
| yield "Workflow started", "<p>Processing your request. Please wait...</p>", [] |
| for status, details, repos in stream_lite_workflow(topic, num_results): |
| yield status, details, repos |
|
|
| |
| |
| |
| with gr.Blocks( |
| theme=gr.themes.Soft(text_size=sizes.text_md), |
| title="DeepGit Lite", |
| css=""" |
| /* Center header and footer */ |
| #header { text-align: center; margin-bottom: 20px; } |
| #main-container { max-width: 800px; margin: auto; } |
| #footer { text-align: center; margin-top: 20px; } |
| """ |
| ) as demo: |
| gr.Markdown( |
| """ |
| <div style="padding-top: 60px;"> |
| <div style="display: flex; align-items: center; justify-content: center;"> |
| <img src="https://img.icons8.com/?size=100&id=118557&format=png&color=000000" |
| style="width: 60px; height: 60px; margin-right: 12px;"> |
| <h1 style="margin: 0; font-size: 2.5em; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;"> |
| DeepGit Lite |
| </h1> |
| </div> |
| <div style="text-align: center; margin-top: 20px; font-size: 1.1em; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;"> |
| <p> |
| ✨ DeepGit Lite is the lightweight pro version of <strong>DeepGit</strong>.<br> |
| It harnesses advanced deep semantic search to explore GitHub repositories and deliver curated results.<br> |
| Under the hood, it leverages a hybrid ranking approach combining dense retrieval, BM25 scoring, and cross-encoder re-ranking for optimal discovery.<br> |
| </p> |
| <p> |
| <strong>New!</strong> 🗨️ After searching, you can <b>chat with the agent about any repository you find</b>.<br> |
| The conversation agent runs in the background and is ready to answer your questions about setup, usage, or details for each repo.<br> |
| Just click "Go to Chat" after your search, select a repository, and start your conversation! |
| </p> |
| <p> |
| 🚀 Check out the full DeepGit version on |
| <a href="https://github.com/zamalali/DeepGit" target="_blank">GitHub</a> and ⭐ |
| <strong>Star DeepGit</strong> on GitHub! |
| </p> |
| </div> |
| </div> |
| """, |
| elem_id="header" |
| ) |
|
|
| |
| with gr.Column(elem_id="main-container", visible=True) as search_ui: |
| research_input = gr.Textbox( |
| label="Research Query", |
| placeholder="Enter your research topic here, e.g., Looking for a low code/no code tool to augment images and annotations?", |
| lines=3 |
| ) |
| num_results_slider = gr.Slider( |
| minimum=5, maximum=25, value=10, step=1, |
| label="Number of Results to Display", |
| info="Choose how many top repositories to show (sorted by score)" |
| ) |
| run_button = gr.Button("Run DeepGit Lite", variant="primary") |
| status_display = gr.Markdown(label="Status") |
| detail_display = gr.HTML(label="Results") |
| repo_state = gr.State([]) |
| go_to_chat_btn = gr.Button("Go to Chat", visible=False) |
|
|
| |
| with gr.Column(visible=False) as chat_ui: |
|
|
| repo_choice = gr.Radio(choices=[], label="Select a repository", interactive=True) |
| chat_history = gr.Chatbot(label="Chat with GitHub Agent") |
| user_input = gr.Textbox(label="Your question", placeholder="Ask about the selected repo...e.g., tell me a bit more and guide me to set this up and running?") |
| send_btn = gr.Button("Send") |
| chat_state = gr.State([]) |
| back_btn = gr.Button("Back to Search") |
|
|
| def update_chat_button(status, details, repos): |
| logging.info("[UI] Search complete. Showing Go to Chat button: %s", bool(repos)) |
| return gr.update(visible=bool(repos)), repos |
|
|
| def show_chat_ui(repos): |
| logging.info("[UI] Switching to Chat UI. Repositories available: %s", repos) |
| return gr.update(visible=False), gr.update(visible=True), gr.update(choices=repos, value=None), [] |
|
|
| def back_to_search(): |
| logging.info("[UI] Switching back to Search UI.") |
| return gr.update(visible=True), gr.update(visible=False), gr.update(value=[]), gr.update(value=None), [] |
|
|
| def chat_with_agent(user_msg, repo, history): |
| logging.info("[Chat] User sent message: '%s' for repo: '%s'", user_msg, repo) |
| if not user_msg or not user_msg.strip(): |
| |
| return history + [["", "Please enter a message before sending."]], history |
| if not repo: |
| return history + [[user_msg, "Please select a repository first."]], history |
| full_query = f"[{repo}] {user_msg}" |
| try: |
| result = agent.agent_executor.invoke({"input": full_query}) |
| answer = result["output"] |
| logging.info("[Chat] Agent response received.") |
| except Exception as e: |
| answer = f"Error: {e}" |
| logging.error("[Chat] Error in agent_executor: %s", e) |
| history = history + [[user_msg, answer]] |
| return history, history |
|
|
| |
| def can_send(user_msg, repo): |
| if not user_msg or not user_msg.strip(): |
| return gr.update(interactive=False, value="Enter a message to send") |
| if not repo: |
| return gr.update(interactive=False, value="Select a repository") |
| return gr.update(interactive=True, value="Send") |
| user_input.change( |
| fn=can_send, |
| inputs=[user_input, repo_choice], |
| outputs=[send_btn], |
| show_progress=False |
| ) |
| repo_choice.change( |
| fn=can_send, |
| inputs=[user_input, repo_choice], |
| outputs=[send_btn], |
| show_progress=False |
| ) |
|
|
| run_button.click( |
| fn=lite_runner, |
| inputs=[research_input, num_results_slider], |
| outputs=[status_display, detail_display, repo_state], |
| api_name="deepgit_lite", |
| show_progress=True |
| ).then( |
| fn=update_chat_button, |
| inputs=[status_display, detail_display, repo_state], |
| outputs=[go_to_chat_btn, repo_state] |
| ) |
|
|
| research_input.submit( |
| fn=lite_runner, |
| inputs=[research_input, num_results_slider], |
| outputs=[status_display, detail_display, repo_state], |
| api_name="deepgit_lite_submit", |
| show_progress=True |
| ).then( |
| fn=update_chat_button, |
| inputs=[status_display, detail_display, repo_state], |
| outputs=[go_to_chat_btn, repo_state] |
| ) |
|
|
| go_to_chat_btn.click( |
| fn=show_chat_ui, |
| inputs=[repo_state], |
| outputs=[search_ui, chat_ui, repo_choice, chat_state] |
| ) |
|
|
| back_btn.click( |
| fn=back_to_search, |
| inputs=[], |
| outputs=[search_ui, chat_ui, chat_history, repo_choice, chat_state] |
| ) |
|
|
| send_btn.click( |
| fn=chat_with_agent, |
| inputs=[user_input, repo_choice, chat_state], |
| outputs=[chat_history, chat_state], |
| queue=False |
| ) |
| user_input.submit( |
| fn=chat_with_agent, |
| inputs=[user_input, repo_choice, chat_state], |
| outputs=[chat_history, chat_state], |
| queue=False |
| ) |
|
|
| gr.HTML( |
| """ |
| <div id="footer"> |
| Made with ❤️ by <b>Zamal</b> |
| </div> |
| """ |
| ) |
|
|
| demo.queue(max_size=10).launch() |
|
|