import difflib
import html
import json
import re

import streamlit as st
|
|
def normalize(s):
    """Return a canonical, comparison-friendly form of *s*.

    The value is converted with ``str()``, lower-cased, has ``_``, ``-`` and
    ``.`` turned into spaces, and has every run of whitespace collapsed to a
    single space (leading/trailing whitespace is dropped).
    """
    # One translation pass maps all three separator characters to a space.
    separators_to_space = str.maketrans("_-.", "   ")
    lowered = str(s).lower()
    words = lowered.translate(separators_to_space).split()
    return " ".join(words)
|
|
def is_fuzzy_match(a, b, threshold=0.7):
    """Return True when strings *a* and *b* are considered a fuzzy match.

    Two strings match when one is a substring of the other, or when their
    ``difflib.SequenceMatcher`` similarity ratio is at least *threshold*.

    The substring shortcut only applies when both strings are non-empty:
    ``"" in x`` is always true in Python, so without this guard an empty
    value would "match" every possible query.

    Args:
        a: First string (callers pass already-normalized text).
        b: Second string (callers pass already-normalized text).
        threshold: Minimum similarity ratio, between 0 and 1, for a match.

    Returns:
        bool: whether the two strings match.
    """
    # Cheap containment test first; it also avoids building a SequenceMatcher
    # for the common exact/partial hit.
    if a and b and (a in b or b in a):
        return True
    return difflib.SequenceMatcher(None, a, b).ratio() >= threshold
|
|
def recursive_fuzzy_value_search(target_value):
    """Search every loaded JSON document for scalar values matching a target.

    Walks each document stored in ``st.session_state.json_data`` (a mapping of
    file name to parsed JSON) and compares the normalized form of every
    ``str``/``int``/``float``/``bool`` value with the normalized target using
    ``is_fuzzy_match``.

    Each scalar is visited exactly once, so a value nested inside several
    levels of dicts yields a single match rather than one per enclosing
    level. Scalars that are direct elements of a list are checked as well.

    Args:
        target_value: The value to look for; it is normalized before matching.

    Returns:
        list[dict]: one entry per matching value with the keys
        ``match_path`` (list of path components; list indices appear as
        ``"[i]"``), ``matched_value``, ``key`` (the dict key, or the ``"[i]"``
        token for a list element), ``record`` (the container directly holding
        the value) and ``file`` (the source file name).
    """
    scalar_types = (str, int, float, bool)
    norm_target = normalize(target_value)
    matches = []

    def _search(node, path, file_name):
        # Present dicts and lists uniformly as (key, value) pairs.
        if isinstance(node, dict):
            children = node.items()
        elif isinstance(node, list):
            children = ((f"[{idx}]", item) for idx, item in enumerate(node))
        else:
            # None or any other non-container: nothing to descend into.
            return

        for key, value in children:
            if isinstance(value, scalar_types):
                if is_fuzzy_match(norm_target, normalize(value)):
                    matches.append({
                        "match_path": path + [key],
                        "matched_value": value,
                        "key": key,
                        "record": node,
                        "file": file_name,
                    })
            else:
                _search(value, path + [key], file_name)

    for file_name, data in st.session_state.json_data.items():
        _search(data, [], file_name)
    return matches
|
|
def show_all_strings():
    """List every scalar value found in the loaded JSON documents.

    Walks each document in ``st.session_state.json_data`` and emits one line
    per ``str``/``int``/``float``/``bool`` value, formatted as
    ``"<file> | <dotted.path> = <value>"``. List indices appear in the path as
    ``[i]`` components.

    Each value is listed exactly once regardless of nesting depth, and scalar
    elements stored directly in lists are included.

    Returns:
        list[str]: the formatted lines, in traversal order.
    """
    scalar_types = (str, int, float, bool)
    found = []

    def _collect(node, path, file_name):
        # Present dicts and lists uniformly as (key, value) pairs.
        if isinstance(node, dict):
            children = node.items()
        elif isinstance(node, list):
            children = ((f"[{idx}]", item) for idx, item in enumerate(node))
        else:
            # None or any other non-container: nothing to list.
            return

        for key, value in children:
            if isinstance(value, scalar_types):
                found.append(f"{file_name} | {'.'.join(path + [key])} = {value}")
            else:
                _collect(value, path + [key], file_name)

    for file_name, data in st.session_state.json_data.items():
        _collect(data, [], file_name)
    return found
|
|
def handle_user_query(query):
    """Answer a free-text question by fuzzy-searching the loaded JSON data.

    A search value is extracted from *query* by trying a few question
    templates ("last login for X", "when did X ...", "when was X last
    login", "last login of X", "info/details/record for X"). If none match,
    the leading run of name-like characters in the query is used instead.

    A trailing "last login" / "last log in" / "last logged in" phrase is
    removed from the extracted value, so "when did alice last login" searches
    for "alice" rather than "alice last login".

    Args:
        query: The user's question.

    Returns:
        str: Markdown text with one paragraph per match, or an explanatory
        message when nothing was found or no search value could be extracted.
    """
    patterns = (
        r"(?:last\s*login.*?for|when\s+did)\s+([a-zA-Z0-9 _\-\.@]+)",
        r"when\s+was\s+([a-zA-Z0-9 _\-\.@]+)\s+last\s+(?:login|logged\s*in)",
        r"last\s*login\s*of\s+([a-zA-Z0-9 _\-\.@]+)",
        r"(?:info|details|record) for\s+([a-zA-Z0-9 _\-\.@]+)",
    )

    found_value = None
    for pat in patterns:
        m = re.search(pat, query, re.IGNORECASE)
        if m:
            found_value = m.group(1).strip()
            break

    if not found_value:
        # Fallback: first run of characters that could form a name/identifier.
        m = re.search(r"([A-Za-z0-9][A-Za-z0-9 _\-\.@]*)", query)
        if m:
            found_value = m.group(1).strip()

    if found_value:
        # The capture class allows spaces, so the rest of the question can be
        # swallowed into the value; cut off a trailing login phrase.
        trimmed = re.sub(
            r"\s+last\s+(?:log\s*in|logged\s*in)\b.*$",
            "",
            found_value,
            flags=re.IGNORECASE,
        ).strip()
        if trimmed:
            found_value = trimmed

    if not found_value:
        return "No valid search value detected. Try a person's name, number, product, device, etc."

    results = recursive_fuzzy_value_search(found_value)
    if not results:
        return f"No records found for '{found_value}' in any file."

    answers = [
        f"**{res['matched_value']}** (in file `{res['file']}` | key: `{res['key']}` | path: `{'.'.join(res['match_path'])}`)"
        for res in results
    ]
    return "\n\n".join(answers)
|
|
| |
# Seed the per-session state on the first run only. Each factory builds a
# fresh default: {} for the loaded documents, [] for the chat history, "" for
# the pending input text and False for the "files already loaded" flag.
_SESSION_DEFAULT_FACTORIES = {
    "json_data": dict,
    "messages": list,
    "temp_input": str,
    "files_loaded": bool,
}
for _state_key, _make_default in _SESSION_DEFAULT_FACTORIES.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()


# Page chrome: browser tab title, wide layout and the on-page heading.
st.set_page_config(
    page_title="Flexible JSON Fuzzy Search",
    layout="wide",
)
st.title("Instant JSON-Backed Q&A (Flexible Fuzzy Search — All Keys & Types!)")
|
|
# Sidebar uploader: parse the selected JSON files into session state.
uploaded_files = st.sidebar.file_uploader(
    "Choose one or more JSON files", type="json", accept_multiple_files=True
)

# Fingerprint of the current selection. Comparing it with the fingerprint of
# the last load lets us reload when files are added, removed or replaced;
# relying on the boolean flag alone would ignore every change made after the
# first successful load until the uploader was emptied.
upload_signature = tuple(
    (f.name, f.size, getattr(f, "file_id", None)) for f in (uploaded_files or [])
)
selection_changed = st.session_state.get("files_signature") != upload_signature

if uploaded_files and (not st.session_state.files_loaded or selection_changed):
    st.session_state.json_data.clear()
    for f in uploaded_files:
        try:
            # The same upload object can be handed back on a later rerun, so
            # rewind before parsing.
            f.seek(0)
            content = json.load(f)
        except Exception as e:
            # Report the problem for this file and keep loading the others.
            st.sidebar.error(f"Error reading {f.name}: {e}")
        else:
            st.session_state.json_data[f.name] = content
            st.sidebar.success(f"Loaded: {f.name}")
    # A new data set invalidates the previous conversation.
    st.session_state.messages = []
    st.session_state.files_loaded = True
    st.session_state.files_signature = upload_signature
elif not uploaded_files:
    st.session_state.json_data.clear()
    st.session_state.files_loaded = False
    st.session_state.files_signature = ()
|
|
# Heading followed by the conversation so far, one coloured block per message.
st.markdown("### Ask about ANY value (name, product, number, device, etc) — partials/typos/substring OK!")
for msg in st.session_state.messages:
    if msg["role"] == "user":
        color, speaker = "#4F8BF9", "User"
    else:
        color, speaker = "#1C6E4C", "Agent"
    # Message text comes from the user's input and from uploaded JSON values.
    # Escape &, < and > so it cannot inject markup into a block rendered with
    # unsafe_allow_html=True. Quotes are left alone so ordinary text is not
    # altered more than necessary.
    safe_content = html.escape(msg["content"], quote=False)
    st.markdown(
        f"<div style='color: {color};'><b>{speaker}:</b> {safe_content}</div>",
        unsafe_allow_html=True,
    )
|
|
def send_message():
    """Handle a submitted chat message (``on_change`` callback of the input).

    Reads the pending text from ``st.session_state.temp_input``. When it is
    not blank, the text is appended to the chat history as a ``"user"``
    message, followed by the ``"assistant"`` reply produced by
    ``handle_user_query``. The input box is then reset to an empty string.
    """
    pending_text = st.session_state.temp_input
    history = st.session_state.messages
    if pending_text.strip():
        history.append({"role": "user", "content": pending_text})
        reply = handle_user_query(pending_text)
        history.append({"role": "assistant", "content": reply})
    # Clear the box so the next question starts from an empty field.
    st.session_state.temp_input = ""
|
|
# The chat controls are only offered once at least one document is loaded.
if not st.session_state.json_data:
    st.info("Please upload at least one JSON file to start chatting.")
else:
    # Submitting the text box triggers send_message via on_change.
    st.text_input("Your message:", key="temp_input", on_change=send_message)
    show_everything = st.button("Show all values in uploaded JSONs")
    if show_everything:
        st.write(show_all_strings())
|
|