Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """CanLex Web (Path B) -- a private web front-end for CanLex legal research. | |
| A thin client that gives a non-Claude user roughly the same experience as Claude | |
| with the CanLex MCP server. For each question it opens one streamable-HTTP | |
| session against the deployed CanLex MCP, declares the four CanLex tools to | |
| Google Gemini, and lets the model agentically iterate -- searching, fetching | |
| sections, and looking up case-law citations -- until it decides it has enough | |
| material to compose a grounded answer. | |
| All configuration comes from environment variables, set as Hugging Face Space | |
| secrets. Run locally with: python app.py | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import queue | |
| import sys | |
| import threading | |
| import urllib.error | |
| import urllib.request | |
| from datetime import timedelta | |
| import gradio as gr | |
| from mcp import ClientSession | |
| from mcp.client.streamable_http import streamablehttp_client | |
| # --- Configuration (Hugging Face Space secrets / environment variables) ------- | |
| # The deployed CanLex MCP server. Retrieval logic and the corpus live there; this | |
| # app never carries its own copy. Override only to point at a different server. | |
| MCP_URL = os.environ.get( | |
| "CANLEX_MCP_URL", "https://beemer0-canlex.hf.space/mcp").strip() | |
| # Google Gemini -- the free-tier key is supplied as the GEMINI_API_KEY secret. | |
| GEMINI_MODEL = os.environ.get("CANLEX_GEMINI_MODEL", "gemini-2.5-pro").strip() | |
| GEMINI_ENDPOINT = ("https://generativelanguage.googleapis.com/v1beta/models/" | |
| f"{GEMINI_MODEL}:generateContent") | |
| MAX_OUTPUT_TOKENS = 8192 # generous -- covers Gemini 2.5 thinking plus the answer | |
| MAX_TOOL_ITERATIONS = 8 # loop guard for the agent | |
| REQUEST_TIMEOUT = 180 # seconds, applied to the MCP and Gemini calls | |
| def _load_auth() -> list[tuple[str, str]]: | |
| """Parse CANLEX_WEB_AUTH (one 'username:password' per line) for Gradio auth.""" | |
| creds: list[tuple[str, str]] = [] | |
| for line in os.environ.get("CANLEX_WEB_AUTH", "").splitlines(): | |
| line = line.strip() | |
| if not line or ":" not in line: | |
| continue | |
| user, password = (p.strip() for p in line.split(":", 1)) | |
| if user and password: | |
| creds.append((user, password)) | |
| if not creds: | |
| print("WARNING: CANLEX_WEB_AUTH is not set; using the insecure default " | |
| "login 'canlex' / 'canlex'. Set CANLEX_WEB_AUTH as a Space secret " | |
| "(one 'username:password' per line) before sharing this app.", | |
| file=sys.stderr) | |
| creds = [("canlex", "canlex")] | |
| return creds | |
| AUTH = _load_auth() | |
| # --- Tool declarations (Gemini function-calling schema) ----------------------- | |
| # The four CanLex MCP tools, declared so Gemini can call them. Three of the four | |
| # wrap their arguments inside a single 'params' object on the server side; the | |
| # Gemini schema is kept flat for the model's convenience and re-wrapped at the | |
| # MCP edge in _run_tool. | |
| TOOL_DECLARATIONS = [ | |
| { | |
| "name": "canlex_search_legislation", | |
| "description": ( | |
| "Search Canadian federal law, CBSA D-Memoranda, collective " | |
| "agreements, NJC directives, leading court decisions and IRPA " | |
| "delegation instruments for material relevant to a question. " | |
| "Use this first for any topical question. Returns ranked source " | |
| "passages with citations. Call it multiple times for different " | |
| "facets of a question, or with the optional 'act' or 'doc_type' " | |
| "filters to narrow the search." | |
| ), | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "description": ( | |
| "Natural-language legal question or keywords, e.g. " | |
| "'detention review timelines' or 'innocent " | |
| "misrepresentation defence under IRPA s. 40'." | |
| ), | |
| }, | |
| "top_k": { | |
| "type": "integer", | |
| "description": "Number of sections to return (1-20). Default 6.", | |
| }, | |
| "act": { | |
| "type": "string", | |
| "description": ( | |
| "Optional. Restrict to a single Act, by short name or " | |
| "code (e.g. 'IRPA' or 'I-2.5')." | |
| ), | |
| }, | |
| "doc_type": { | |
| "type": "string", | |
| "description": ( | |
| "Optional. Restrict to one source type: 'legislation', " | |
| "'memorandum' (CBSA D-Memoranda), 'agreement' " | |
| "(collective agreements), 'directive' (NJC), " | |
| "'caselaw' (court and tribunal decisions), or " | |
| "'delegation' (IRPA/IRPR delegation and designation)." | |
| ), | |
| }, | |
| }, | |
| "required": ["query"], | |
| }, | |
| }, | |
| { | |
| "name": "canlex_get_section", | |
| "description": ( | |
| "Fetch the full text of one specific provision when its Act and " | |
| "section number are known. Use this to follow a cross-reference " | |
| "the search results mention but did not include." | |
| ), | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "act": { | |
| "type": "string", | |
| "description": "Act short name or code, e.g. 'IRPA' or 'I-2.5'.", | |
| }, | |
| "section": { | |
| "type": "string", | |
| "description": "Section number exactly as cited, e.g. '34', '20.1'.", | |
| }, | |
| }, | |
| "required": ["act", "section"], | |
| }, | |
| }, | |
| { | |
| "name": "canlex_list_acts", | |
| "description": ( | |
| "List every Act, regulation, agreement, directive, case-law " | |
| "decision and delegation instrument loaded into the CanLex " | |
| "corpus. Useful when the user asks 'what does CanLex have on X?' " | |
| "or when you need to confirm a source is in scope." | |
| ), | |
| "parameters": {"type": "object", "properties": {}}, | |
| }, | |
| { | |
| "name": "canlex_case", | |
| "description": ( | |
| "Look up a Canadian case on CanLII to check its citation graph -- " | |
| "cases that cite it, cases it cites, legislation it cites. Use " | |
| "this to confirm a decision found in search results is still good " | |
| "law and to find related authorities. Pass a neutral citation " | |
| "(e.g. '2019 SCC 65', '2016 FCA 93', '2005 FC 1059') or a full " | |
| "canlii.org URL." | |
| ), | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "case_url": { | |
| "type": "string", | |
| "description": ( | |
| "Neutral citation (preferred for SCC/FCA/FC) or full " | |
| "canlii.org URL." | |
| ), | |
| }, | |
| }, | |
| "required": ["case_url"], | |
| }, | |
| }, | |
| ] | |
| # The three tools whose MCP signature wraps arguments under a single 'params' | |
| # object. canlex_list_acts takes none and is handled separately in _run_tool. | |
| _PARAMS_WRAPPED = {"canlex_search_legislation", "canlex_get_section", "canlex_case"} | |
| # --- System prompt ------------------------------------------------------------ | |
| SYSTEM_INSTRUCTION = """\ | |
| You are CanLex Web, a Canadian legal-research assistant. A member of the public \ | |
| has asked the legal question shown below through a web form. Answer it by \ | |
| agentically using the four CanLex tools to retrieve primary sources, then \ | |
| compose a clear, well-organised answer grounded entirely in what those tools \ | |
| return. | |
| Tool-use guidance: | |
| - Start with canlex_search_legislation on the user's question. Read the \ | |
| results, including the "ANSWERING INSTRUCTIONS" block CanLex returns. | |
| - If a result mentions a cross-referenced provision, regulation or D-Memo that \ | |
| bears on the question but is not reproduced, call canlex_get_section or \ | |
| canlex_search_legislation again to fetch it. Do not guess its contents. | |
| - For a question that turns on case law, consider calling canlex_case on the \ | |
| leading decision's neutral citation to confirm it has not been overtaken. | |
| - You may call tools multiple times; iterate until you have enough material to \ | |
| answer well. Aim for thoroughness but stop once further calls would not change \ | |
| the answer. | |
| Answering style: | |
| - Write for a reader who cannot see the raw passages. Quote the key operative \ | |
| words ("inadmissible for misrepresentation", etc.) and give every citation in \ | |
| full, including section numbers and the deciding court. | |
| - Distinguish source kinds: enacted law is binding; CBSA D-Memoranda are \ | |
| administrative guidance, persuasive only; collective agreements and NJC \ | |
| directives are binding employment-terms instruments for a bargaining unit; \ | |
| court decisions are binding precedent depending on the court and jurisdiction. | |
| - State the date the source is current to, and note that the answer reflects \ | |
| the law only as of that date. | |
| - Use plain Markdown -- short paragraphs, headings or lists where they aid \ | |
| clarity. | |
| - If the retrieved material does not actually answer the question, say so \ | |
| plainly rather than stretching it to fit. | |
| - Close with a one-line reminder that this is legal information, not legal \ | |
| advice.""" | |
| # --- Agentic loop: Gemini <-> MCP -------------------------------------------- | |
| class _AgentError(RuntimeError): | |
| """Surfaced to the UI; the message text is shown verbatim.""" | |
| def _gemini_request_body(contents: list[dict]) -> dict: | |
| """The JSON body sent to Gemini -- identical between the streaming and the | |
| non-streaming endpoints. Tool declarations turn on function calling; the | |
| safety filters are relaxed because legal research routinely discusses | |
| crime, weapons and the like, and the high-threshold defaults spuriously | |
| block legitimate legal text.""" | |
| return { | |
| "systemInstruction": {"parts": [{"text": SYSTEM_INSTRUCTION}]}, | |
| "contents": contents, | |
| "tools": [{"functionDeclarations": TOOL_DECLARATIONS}], | |
| "toolConfig": {"functionCallingConfig": {"mode": "AUTO"}}, | |
| "generationConfig": { | |
| "temperature": 0.2, | |
| "maxOutputTokens": MAX_OUTPUT_TOKENS, | |
| }, | |
| "safetySettings": [ | |
| {"category": c, "threshold": "BLOCK_ONLY_HIGH"} | |
| for c in ("HARM_CATEGORY_HARASSMENT", "HARM_CATEGORY_HATE_SPEECH", | |
| "HARM_CATEGORY_SEXUALLY_EXPLICIT", | |
| "HARM_CATEGORY_DANGEROUS_CONTENT") | |
| ], | |
| } | |
| # Gemini's streamGenerateContent endpoint, used when alt=sse is requested, | |
| # sends one Server-Sent Event per partial GenerateContentResponse. Each chunk | |
| # carries an incremental slice of the turn's content -- a text delta or a | |
| # (complete) functionCall part. The accumulated parts list is what gets sent | |
| # back as the assistant turn for the next round. | |
| _STREAM_ENDPOINT = GEMINI_ENDPOINT.replace( | |
| ":generateContent", ":streamGenerateContent") + "?alt=sse" | |
| async def _gemini_stream(api_key: str, contents: list[dict]): | |
| """Async generator over Gemini's streaming response. | |
| Yields dicts of one of three shapes: | |
| {"type": "text_delta", "text": str} -- a partial answer fragment | |
| {"type": "function_call", "call": dict} -- a complete tool call | |
| {"type": "finish", "reason": str|None, -- end of stream; `parts` is | |
| "parts": list[dict]} the whole assistant turn | |
| """ | |
| body = _gemini_request_body(contents) | |
| request = urllib.request.Request( | |
| _STREAM_ENDPOINT, | |
| data=json.dumps(body).encode("utf-8"), | |
| headers={"Content-Type": "application/json", | |
| "x-goog-api-key": api_key, | |
| "Accept": "text/event-stream"}, | |
| method="POST", | |
| ) | |
| try: | |
| # `timeout` is a kwarg of urlopen; passing it positionally to | |
| # asyncio.to_thread would forward it as `data` (POST body) and break | |
| # the request. | |
| response = await asyncio.to_thread( | |
| lambda: urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT)) | |
| except urllib.error.HTTPError as exc: | |
| detail = await asyncio.to_thread(exc.read) | |
| text = detail.decode("utf-8", "replace")[:600] | |
| raise _AgentError(f"Gemini API returned HTTP {exc.code}: {text}") from None | |
| except urllib.error.URLError as exc: | |
| raise _AgentError(f"Could not reach the Gemini API: {exc.reason}") from None | |
| accumulated_parts: list[dict] = [] | |
| finish_reason = None | |
| try: | |
| while True: | |
| raw = await asyncio.to_thread(response.readline) | |
| if not raw: | |
| break | |
| line = raw.decode("utf-8", "replace").rstrip() | |
| if not line.startswith("data: "): | |
| continue | |
| try: | |
| chunk = json.loads(line[6:]) | |
| except ValueError: | |
| continue | |
| candidate = (chunk.get("candidates") or [{}])[0] | |
| for part in (candidate.get("content") or {}).get("parts") or []: | |
| accumulated_parts.append(part) | |
| if part.get("thought"): | |
| # Gemini 2.5 thinking summary -- preserved in history for | |
| # the model's own context, never streamed to the user. | |
| continue | |
| if "text" in part: | |
| yield {"type": "text_delta", "text": part["text"]} | |
| elif "functionCall" in part: | |
| yield {"type": "function_call", "call": part["functionCall"]} | |
| if candidate.get("finishReason"): | |
| finish_reason = candidate["finishReason"] | |
| finally: | |
| await asyncio.to_thread(response.close) | |
| yield {"type": "finish", "reason": finish_reason, "parts": accumulated_parts} | |
| async def _run_tool(session: ClientSession, name: str, args: dict) -> str: | |
| """Execute a Gemini function call against the MCP, returning text output.""" | |
| if name == "canlex_list_acts": | |
| mcp_args: dict = {} | |
| elif name in _PARAMS_WRAPPED: | |
| # The MCP server's tools accept their schema as a single 'params' object. | |
| mcp_args = {"params": args or {}} | |
| else: | |
| return f"(unknown tool '{name}')" | |
| try: | |
| result = await session.call_tool(name, mcp_args) | |
| except Exception as exc: # MCP transport errors | |
| return f"(tool '{name}' failed: {type(exc).__name__}: {exc})" | |
| text = "\n".join( | |
| block.text for block in result.content | |
| if getattr(block, "type", None) == "text" and getattr(block, "text", None) | |
| ).strip() | |
| if result.isError: | |
| return f"(tool '{name}' reported an error: {text or 'no detail'})" | |
| return text or "(no content returned)" | |
| def _summarize_call(name: str, args: dict) -> str: | |
| """Render a tool call as a one-line user-facing status string.""" | |
| args = args or {} | |
| if name == "canlex_search_legislation": | |
| bits = [f"`{args.get('query', '')}`"] | |
| if args.get("act"): | |
| bits.append(f"in {args['act']}") | |
| if args.get("doc_type"): | |
| bits.append(f"({args['doc_type']} only)") | |
| return "Searching " + " ".join(bits) | |
| if name == "canlex_get_section": | |
| return f"Fetching {args.get('act', '?')} s. {args.get('section', '?')}" | |
| if name == "canlex_case": | |
| return f"Looking up case {args.get('case_url', '?')}" | |
| if name == "canlex_list_acts": | |
| return "Listing the CanLex corpus" | |
| return f"Calling {name}" | |
| def _format_sources(tool_log: list[tuple[str, dict, str]]) -> str: | |
| """Render every tool call's output as one Markdown document for display.""" | |
| if not tool_log: | |
| return "" | |
| blocks = [] | |
| for i, (name, args, output) in enumerate(tool_log, 1): | |
| blocks.append( | |
| f"### Call {i}: `{name}`\n\n" | |
| f"_Arguments:_ `{json.dumps(args, ensure_ascii=False)}`\n\n" | |
| f"{output}" | |
| ) | |
| return "\n\n---\n\n".join(blocks) | |
| async def _agentic_answer(question: str): | |
| """Run the Gemini-driven agentic loop against a single MCP session. | |
| Yields tuples of (status, answer_md, sources_md). The final yield carries | |
| the composed answer; earlier yields are progress updates the UI can show. | |
| """ | |
| api_key = os.environ.get("GEMINI_API_KEY", "").strip() | |
| if not api_key: | |
| raise _AgentError( | |
| "GEMINI_API_KEY is not set. Add it as a Space secret -- create a " | |
| "free key at Google AI Studio (https://aistudio.google.com/apikey).") | |
| yield "_Connecting to the CanLex retrieval service..._", "", "" | |
| async with streamablehttp_client( | |
| MCP_URL, | |
| timeout=timedelta(seconds=REQUEST_TIMEOUT), | |
| sse_read_timeout=timedelta(seconds=REQUEST_TIMEOUT), | |
| ) as (read, write, _): | |
| async with ClientSession(read, write) as session: | |
| await session.initialize() | |
| contents: list[dict] = [ | |
| {"role": "user", "parts": [{"text": question}]} | |
| ] | |
| tool_log: list[tuple[str, dict, str]] = [] | |
| trace: list[str] = [] | |
| answer_buf = "" | |
| def status_md(thinking: bool = True) -> str: | |
| lines = [f"- {line}" for line in trace] | |
| if thinking: | |
| lines.append("- _Thinking..._") | |
| return "\n".join(lines) if lines else "" | |
| for step in range(MAX_TOOL_ITERATIONS): | |
| yield status_md(), answer_buf, _format_sources(tool_log) | |
| # Stream Gemini's next turn. Stream text deltas to the answer | |
| # panel optimistically; revert to the pre-turn answer if it | |
| # turns out to be a tool-calling turn (the streamed text was | |
| # then commentary, kept in the trace instead). | |
| turn_text = "" | |
| turn_calls: list[dict] = [] | |
| turn_parts: list[dict] = [] | |
| optimistic = True | |
| async for chunk in _gemini_stream(api_key, contents): | |
| if chunk["type"] == "text_delta" and optimistic: | |
| turn_text += chunk["text"] | |
| yield (status_md(), | |
| answer_buf + turn_text, | |
| _format_sources(tool_log)) | |
| elif chunk["type"] == "function_call": | |
| turn_calls.append(chunk["call"]) | |
| if optimistic and turn_text: | |
| # Roll the answer panel back; the commentary moves | |
| # into the trace once the tool labels are drawn. | |
| optimistic = False | |
| yield (status_md(), | |
| answer_buf, | |
| _format_sources(tool_log)) | |
| elif chunk["type"] == "finish": | |
| turn_parts = chunk["parts"] or [] | |
| # Capture any text-only finish reason so the caller can | |
| # surface a useful error for an empty answer. | |
| finish_reason = chunk.get("reason") | |
| contents.append({"role": "model", "parts": turn_parts}) | |
| if not turn_calls: | |
| # Final turn -- the text was already streamed; finalize. | |
| if not turn_text: | |
| raise _AgentError( | |
| f"Gemini produced an empty answer (finishReason: " | |
| f"{finish_reason!s}). If this is MAX_TOKENS, " | |
| "raise MAX_OUTPUT_TOKENS in app.py.") | |
| answer_buf += turn_text | |
| yield status_md(thinking=False), answer_buf, \ | |
| _format_sources(tool_log) | |
| return | |
| # Tool turn. If the model emitted a commentary fragment before | |
| # its function calls, surface it once in the trace -- it often | |
| # explains WHY the next tools are being called. | |
| if turn_text: | |
| snippet = turn_text.strip().replace("\n", " ") | |
| if len(snippet) > 140: | |
| snippet = snippet[:137].rstrip() + "..." | |
| trace.append(f"_{snippet}_") | |
| # Execute every function call in this turn, then send the | |
| # responses back as a single 'user' message. | |
| function_responses = [] | |
| for call in turn_calls: | |
| name = call.get("name", "") | |
| args = call.get("args") or {} | |
| label = _summarize_call(name, args) | |
| trace.append(label) | |
| yield status_md(), answer_buf, _format_sources(tool_log) | |
| output = await _run_tool(session, name, args) | |
| tool_log.append((name, args, output)) | |
| function_responses.append({ | |
| "functionResponse": { | |
| "name": name, | |
| "response": {"output": output}, | |
| } | |
| }) | |
| contents.append({"role": "user", "parts": function_responses}) | |
| # Loop budget exhausted -- ask Gemini for a final answer without | |
| # further tool use rather than leave the user with nothing. We | |
| # stream this terminal turn too, so the user sees it compose. | |
| contents.append({"role": "user", "parts": [{"text": | |
| "You have reached the maximum number of tool calls. Compose " | |
| "the best answer you can from the material gathered so far, " | |
| "without calling further tools. If the material is " | |
| "insufficient, say so plainly."}]}) | |
| turn_text = "" | |
| async for chunk in _gemini_stream(api_key, contents): | |
| if chunk["type"] == "text_delta": | |
| turn_text += chunk["text"] | |
| yield (status_md(thinking=False), | |
| answer_buf + turn_text, | |
| _format_sources(tool_log)) | |
| answer_buf += turn_text or \ | |
| "_(no answer produced after the tool-call budget was exhausted)_" | |
| yield status_md(thinking=False), answer_buf, _format_sources(tool_log) | |
| # --- Gradio handler ----------------------------------------------------------- | |
| ANSWER_PLACEHOLDER = "*Your answer will appear here.*" | |
| _SENTINEL = object() | |
| def answer(question: str): | |
| """Generator wrapping the async agent for Gradio's progressive UI. | |
| The async work runs on a dedicated worker thread with its own event loop | |
| and stays inside a single asyncio task for the whole question. Items are | |
| handed back to this sync generator through a thread-safe queue. The | |
| previous loop.run_until_complete-per-anext pattern created a fresh task | |
| on every yield, which tripped anyio's cancel-scope check inside the MCP | |
| streamable-HTTP client ('Attempted to exit cancel scope in a different | |
| task than it was entered in').""" | |
| question = (question or "").strip() | |
| if not question: | |
| yield "Please enter a legal question above.", ANSWER_PLACEHOLDER, "" | |
| return | |
| events: queue.Queue = queue.Queue() | |
| def worker(): | |
| async def run(): | |
| try: | |
| async for tup in _agentic_answer(question): | |
| events.put(("yield", tup)) | |
| except _AgentError as exc: | |
| events.put(("agent_error", exc)) | |
| except Exception as exc: # network blip, MCP transport | |
| events.put(("error", exc)) | |
| finally: | |
| events.put((_SENTINEL,)) | |
| try: | |
| asyncio.run(run()) | |
| except Exception as exc: # loop setup failures | |
| events.put(("error", exc)) | |
| events.put((_SENTINEL,)) | |
| threading.Thread(target=worker, daemon=True).start() | |
| while True: | |
| kind, *payload = events.get() | |
| if kind is _SENTINEL: | |
| return | |
| if kind == "yield": | |
| yield payload[0] | |
| elif kind == "agent_error": | |
| yield (f"**{payload[0]}**", ANSWER_PLACEHOLDER, "") | |
| elif kind == "error": | |
| exc = payload[0] | |
| # Unwrap ExceptionGroup (from anyio TaskGroups in the MCP client) | |
| # so the user sees the actual root cause, not the wrapper. | |
| lines = [] | |
| def _walk(e, depth=0): | |
| indent = " " * depth | |
| lines.append(f"{indent}- `{type(e).__name__}: {e}`") | |
| inner = getattr(e, "exceptions", None) | |
| if inner: | |
| for sub in inner: | |
| _walk(sub, depth + 1) | |
| _walk(exc) | |
| yield ("**Could not complete the request.**\n\n" | |
| + "\n".join(lines) + | |
| "\n\nThe MCP service may be waking from sleep -- " | |
| "try again in a moment.", ANSWER_PLACEHOLDER, "") | |
| # --- UI ----------------------------------------------------------------------- | |
| INTRO = """\ | |
| # CanLex -- Canadian Legal Research | |
| Ask a question about Canadian **border, customs, immigration, criminal, drug, | |
| labour or related federal law**. CanLex finds the governing statutory | |
| provisions, D-Memoranda, collective-agreement terms and leading court | |
| decisions, then composes an answer that cites them. | |
| The CanLex corpus contains 31 federal Acts and regulations -- including the | |
| Immigration and Refugee Protection Act, the Customs Act and the Criminal Code | |
| -- alongside the CBSA D-Memoranda, the FB (Border Services) collective | |
| agreement, the National Joint Council directives, leading decisions of the | |
| Supreme Court, the Federal Courts and the federal labour and immigration | |
| tribunals, and the IRPA/IRPR instruments of delegation and designation. | |
| The assistant iterates over the corpus -- searching, fetching sections and | |
| looking up case-law citations -- before composing a grounded answer. A complex | |
| question may take 30 seconds or more. | |
| Legal information, not legal advice -- always verify against the primary sources. | |
| """ | |
| EXAMPLE_QUESTIONS = [ | |
| "What are the detention review timelines for a permanent resident?", | |
| "When is a foreign national inadmissible for serious criminality?", | |
| "What overtime provisions apply to FB-group Border Services officers?", | |
| "Can the CBSA seize goods for an undervalued customs declaration?", | |
| ] | |
| with gr.Blocks(title="CanLex", analytics_enabled=False) as demo: | |
| gr.Markdown(INTRO) | |
| question = gr.Textbox( | |
| label="Your legal question", | |
| placeholder="e.g. What are the detention review timelines for a " | |
| "permanent resident?", | |
| lines=3, | |
| ) | |
| with gr.Row(): | |
| submit = gr.Button("Ask CanLex", variant="primary") | |
| clear = gr.Button("Clear") | |
| gr.Examples(examples=EXAMPLE_QUESTIONS, inputs=question, label="Examples") | |
| # Three panels: a progress trace (also used to surface errors), the final | |
| # composed answer, and the raw tool outputs the agent gathered. | |
| progress_md = gr.Markdown(value="") | |
| answer_md = gr.Markdown(value=ANSWER_PLACEHOLDER) | |
| with gr.Accordion("Retrieved source passages (every tool call)", open=False): | |
| sources_md = gr.Markdown() | |
| submit.click(answer, [question], [progress_md, answer_md, sources_md]) | |
| question.submit(answer, [question], [progress_md, answer_md, sources_md]) | |
| clear.click(lambda: ("", "", ANSWER_PLACEHOLDER, ""), None, | |
| [question, progress_md, answer_md, sources_md]) | |
| if __name__ == "__main__": | |
| print(f"CanLex Web starting -- MCP: {MCP_URL}; model: {GEMINI_MODEL}; " | |
| f"{len(AUTH)} login(s) configured.", file=sys.stderr) | |
| demo.queue() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT", "7860")), | |
| auth=AUTH, | |
| auth_message="Sign in to use CanLex.", | |
| ssr_mode=False, # no Node in the slim container; render client-side | |
| ) | |