| import os |
| from dotenv import load_dotenv |
| load_dotenv() |
|
|
| import uuid |
| import streamlit as st |
| import random |
| import torch |
| import threading |
| import time |
| import pandas as pd |
| from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer |
| from peft import PeftModel |
| from huggingface_hub import login, whoami |
|
|
|
|
|
|
# Wide layout so the results table can use the full browser width.
st.set_page_config(layout="wide")
# CSS helper: wrap tables in <div class='table-scroll'> to enable horizontal
# scrolling instead of truncating wide columns.
scroll_css = """
<style>
.table-scroll {
    overflow-x: auto;
    width: 100%;
    max-width: 100%;
}
</style>
"""
st.markdown(scroll_css, unsafe_allow_html=True)

# Page header and workflow description.
st.title("Auto Generate Prompts Using HI Model")
st.markdown(
    """
Humane Intelligence’s Auto Red Teaming prototype is built to empower clients to run red-teaming exercises on their AI applications using HI’s intuitive no-code/low-code platform.

The system generates adversarial prompts via a model trained on proprietary HI data, targeting potential vulnerabilities in the client’s models or applications. These responses are then evaluated by a separate judge LLM, also trained by HI.

Specifically, the prototype follows these steps:
1. Generates adversarial prompts based on a selected **bias category** and **country/region** using HI’s pre-trained model.
2. Selects the most effective prompts and feeds them into the client’s model to elicit responses.
3. Uses a dedicated HI-trained judge LLM to assess the responses.
4. Produces a final output that includes a **probability score** and a **justification** for each response.
"""
)
|
|
| |
| |
# Pre-fill the token field from a previous login this session, else the env var.
default_hf_token = st.session_state.get("hf_token", os.getenv("HUGGINGFACE_API_KEY") or "")
hf_token = st.sidebar.text_input("Enter your Hugging Face API Token", type="password", value=default_hf_token)

# Track login state across Streamlit reruns.
if "hf_logged_in" not in st.session_state:
    st.session_state.hf_logged_in = False

if st.sidebar.button("Login to Hugging Face"):
    if hf_token:
        try:
            # Validate the token by logging in and fetching the account info.
            login(token=hf_token)
            user_info = whoami()
            st.sidebar.success(f"Logged in as: {user_info['name']}")
            st.session_state.hf_logged_in = True
            # Persist the token so reruns keep the field populated.
            st.session_state.hf_token = hf_token
        except Exception as e:
            st.sidebar.error(f"Login failed: {e}")
            st.session_state.hf_logged_in = False
    else:
        st.sidebar.error("Please provide your Hugging Face API Token.")
|
|
# Gate the rest of the app behind a successful Hugging Face login.
if not st.session_state.hf_logged_in:
    st.warning("Please login to Hugging Face to load the model.")
else:
| |
| def get_device(): |
| if torch.cuda.is_available(): |
| return "cuda" |
| elif torch.backends.mps.is_available(): |
| return "mps" |
| else: |
| return "cpu" |
|
|
| @st.cache_resource(show_spinner=True) |
| def load_model(hf_token): |
| device = get_device() |
| base_model = AutoModelForCausalLM.from_pretrained( |
| "meta-llama/Llama-3.2-1B-Instruct", |
| trust_remote_code=True, |
| torch_dtype=torch.float16, |
| use_auth_token=hf_token |
| ) |
| tokenizer = AutoTokenizer.from_pretrained( |
| "Akash190104/space_turtle_101", |
| use_fast=False, |
| use_auth_token=hf_token |
| ) |
| if tokenizer.pad_token is None: |
| tokenizer.pad_token = tokenizer.eos_token |
|
|
| model = PeftModel.from_pretrained( |
| base_model, |
| "Akash190104/space_turtle_101", |
| use_auth_token=hf_token |
| ) |
| model.to(device) |
| return model, tokenizer, device |
|
|
    # Load (or fetch from cache) the model; abort the app on failure.
    with st.spinner("Loading model, please wait..."):
        try:
            model, tokenizer, device = load_model(hf_token)

            if device == "cuda":
                try:
                    # NOTE(review): `use_flash_attention` is not a standard
                    # transformers config attribute — setting it here may be a
                    # silent no-op; confirm against the model's attn implementation.
                    model.config.use_flash_attention = True
                    st.info("Flash Attention enabled for CUDA.")
                except Exception as e:
                    st.warning(f"Could not enable Flash Attention: {e}")
            st.success("Model loaded successfully!")
        except Exception as e:
            st.error(f"Model loading failed: {e}")
            st.error("Ensure your token has access to meta-llama/Llama-3.2-1B-Instruct.")
            st.stop()
|
|
| |
| def generate_streaming_sample(prompt_text, current_placeholder): |
| inputs = tokenizer(prompt_text, return_tensors="pt", padding=True).to(device) |
| generated_text = "" |
| token_counter = 0 |
| streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) |
|
|
| thread = threading.Thread( |
| target=model.generate, |
| kwargs=dict( |
| **inputs, |
| max_new_tokens=150, |
| do_sample=True, |
| temperature=0.7, |
| top_p=0.9, |
| streamer=streamer |
| ) |
| ) |
| thread.start() |
| for new_text in streamer: |
| generated_text += new_text |
| token_counter += 1 |
| if token_counter % 5 == 0: |
| |
| current_placeholder.empty() |
| unique_key = f"current_sample_{uuid.uuid4()}" |
| current_placeholder.text_area( |
| "Current Sample Generation", |
| value=generated_text, |
| height=200, |
| key=unique_key |
| ) |
| thread.join() |
| current_placeholder.empty() |
| unique_key = f"current_sample_{uuid.uuid4()}" |
| current_placeholder.text_area( |
| "Current Sample Generation", |
| value=generated_text, |
| height=200, |
| key=unique_key |
| ) |
| return generated_text |
|
|
| |
    # Bias categories offered in the UI (order is the selectbox order).
    biases = [
        "Socio-economic bias", "Gender bias", "Caste bias", "Religious bias", "Ethnic bias", "Age bias",
        "Regional stereotyping", "Language discrimination", "Political bias", "Educational bias",
        "Occupational bias", "Disability bias", "Appearance-based bias", "Colorism", "Nationality bias",
        "Urban-rural bias", "Immigration bias"
    ]

    # Countries/regions offered in the UI (order is the selectbox order).
    countries = [
        "China", "India", "Philippines", "Vietnam", "Southeast Asia", "Europe", "Nigeria", "United States",
        "Mexico", "Canada", "Germany", "France", "Brazil", "South Africa", "Russia", "Japan", "South Korea",
        "Australia", "Middle East", "Latin America", "Eastern Europe", "Bangladesh", "Pakistan", "Indonesia",
        "Turkey", "Egypt", "Kenya", "Argentina"
    ]
|
|
    # Mode selector: hand-pick bias/region pairs, or generate 10 random ones.
    mode = st.radio("Select Mode", ("Interactive", "Random Generation (10 samples)"))

    if mode == "Interactive":
        st.subheader("Interactive Mode")
        num_samples = st.number_input("Number of samples to generate", min_value=1, value=1, step=1)
        sample_inputs = []
        # Collect one (bias, country) pair per requested sample.
        for i in range(num_samples):
            st.markdown(f"#### Sample {i+1} Input")

            bias_options = biases + ["Custom Bias"]
            bias_choice = st.selectbox("Select Bias Category", options=bias_options, key=f"bias_{i}")
            if bias_choice == "Custom Bias":
                custom_bias = st.text_input("Enter Custom Bias", key=f"custom_bias_{i}")
                # Fall back to the literal label when the custom field is blank.
                final_bias = custom_bias.strip() if custom_bias.strip() != "" else "Custom Bias"
            else:
                final_bias = bias_choice

            country_options = countries + ["Custom Region"]
            country_choice = st.selectbox("Select Country/Region", options=country_options, key=f"country_{i}")
            if country_choice == "Custom Region":
                custom_region = st.text_input("Enter Custom Region", key=f"custom_region_{i}")
                final_country = custom_region.strip() if custom_region.strip() != "" else "Custom Region"
            else:
                final_country = country_choice

            sample_inputs.append((final_bias, final_country))

        if st.button("Generate Samples"):
            # NOTE(review): this blank-entry check appears unreachable — the
            # fallbacks above always substitute a non-empty label. Confirm intent.
            if any(bias.strip() == "" or country.strip() == "" for bias, country in sample_inputs):
                st.error("Please provide valid entries for all samples.")
            else:
                final_samples = []
                current_placeholder = st.empty()
                start_time = time.time()
                for bias_input, country_input in sample_inputs:
                    # Prompt format the fine-tuned adapter was trained on.
                    prompt = f"```{bias_input} in {country_input}```\n"
                    generated = generate_streaming_sample(prompt, current_placeholder)
                    final_samples.append({"Bias Category and Country": prompt, "Auto Generated Prompts": generated})
                end_time = time.time()
                total_time = end_time - start_time
                st.info(f"{num_samples} sample(s) generated in {total_time:.2f} seconds!")
                df_final = pd.DataFrame(final_samples)
                # Wrap long generated text; keep the category column narrow.
                df_final_styled = df_final.style \
                    .set_properties(subset=["Auto Generated Prompts"],
                                    **{"white-space": "pre-wrap", "width": "300px"}) \
                    .set_properties(subset=["Bias Category and Country"],
                                    **{"white-space": "nowrap", "width": "120px"})
                st.markdown("**Final Samples**")
                st.markdown("<div class='table-scroll'>", unsafe_allow_html=True)
                st.table(df_final_styled)
                st.markdown("</div>", unsafe_allow_html=True)
                st.download_button("Download Outputs", df_final.to_csv(index=False), file_name="outputs.csv")

                # Keep the results available across reruns.
                st.session_state.single_sample = final_samples
|
|
    elif mode == "Random Generation (10 samples)":
        st.subheader("Random Generation Mode")
        if st.button("Generate 10 Random Samples"):
            final_samples = []
            status_placeholder = st.empty()
            current_placeholder = st.empty()
            start_time = time.time()
            # Draw 10 random (bias, country) pairs and generate a prompt for each.
            for i in range(10):
                status_placeholder.info(f"Generating sample {i+1} of 10...")
                bias_choice = random.choice(biases)
                country_choice = random.choice(countries)
                # Prompt format the fine-tuned adapter was trained on.
                prompt = f"```{bias_choice} in {country_choice}```\n"
                sample_output = generate_streaming_sample(prompt, current_placeholder)
                final_samples.append({"Bias Category and Country": prompt, "Auto Generated Prompts": sample_output})
            current_placeholder.empty()
            end_time = time.time()
            total_time = end_time - start_time
            status_placeholder.success(f"10 samples generated in {total_time:.2f} seconds!")
            df_final = pd.DataFrame(final_samples)
            # Wrap long generated text; keep the category column narrow.
            df_final_styled = df_final.style \
                .set_properties(subset=["Auto Generated Prompts"],
                                **{"white-space": "pre-wrap", "width": "300px"}) \
                .set_properties(subset=["Bias Category and Country"],
                                **{"white-space": "nowrap", "width": "120px"})
            st.markdown("**Final Samples**")
            st.markdown("<div class='table-scroll'>", unsafe_allow_html=True)
            st.table(df_final_styled)
            st.markdown("</div>", unsafe_allow_html=True)

            st.download_button("Download Outputs", df_final.to_csv(index=False), file_name="outputs.csv")
            # Keep the results available across reruns.
            st.session_state.all_samples = final_samples