| import numpy as np |
| from concurrent.futures import ThreadPoolExecutor |
| import time |
| import json |
| import tritonclient.grpc as grpcclient |
| from tritonclient.utils import * |
| import queue |
| from functools import partial |
| import random |
|
|
|
|
# Master switch: True runs the timed concurrent load test, False sends a
# single hard-coded sample request (see the driver at the bottom of the file).
run_multiple_tests = False

resp_list = []  # per-request responses for result_dump_*.json; NOTE(review): never appended to in this file — confirm
scode_breakup = {}  # status-code histogram for test_results.json; NOTE(review): never populated in this file — confirm
|
|
def np_to_server_dtype(np_dtype):
    """Translate a numpy dtype into Triton's server-side datatype string.

    Object dtypes and fixed-width byte strings both map to "BYTES".
    Returns None when the dtype has no server equivalent.
    """
    dtype_names = {
        np.dtype(np.bool_): "BOOL",
        np.dtype(np.int8): "INT8",
        np.dtype(np.int16): "INT16",
        np.dtype(np.int32): "INT32",
        np.dtype(np.int64): "INT64",
        np.dtype(np.uint8): "UINT8",
        np.dtype(np.uint16): "UINT16",
        np.dtype(np.uint32): "UINT32",
        np.dtype(np.uint64): "UINT64",
        np.dtype(np.float16): "FP16",
        np.dtype(np.float32): "FP32",
        np.dtype(np.float64): "FP64",
        np.dtype(np.object_): "BYTES",
    }
    server_name = dtype_names.get(np.dtype(np_dtype))
    if server_name is not None:
        return server_name
    # Fixed-width byte strings (e.g. dtype('S10')) are also sent as BYTES.
    if np_dtype.type == np.bytes_:
        return "BYTES"
    return None
|
|
class UserData:
    """Holds the responses/errors delivered by the gRPC stream callback."""

    def __init__(self):
        # The stream callback pushes each InferResult (or exception) here;
        # the request sender blocks on .get() until something arrives.
        self._completed_requests = queue.Queue()
|
|
def callback(user_data, result, error):
    """gRPC stream callback: enqueue the error when present, else the result."""
    payload = error if error else result
    user_data._completed_requests.put(payload)
|
|
def prepare_tensor(name: str, data: np.ndarray):
    """Wrap a numpy array as a Triton InferInput named *name*."""
    datatype = np_to_server_dtype(data.dtype)
    tensor = grpcclient.InferInput(name=name, shape=data.shape, datatype=datatype)
    tensor.set_data_from_numpy(data)
    return tensor
|
|
|
|
def _build_inputs(sample_request):
    """Build the Triton InferInput list from a request payload dict.

    "prompt" is mandatory; every other field is optional and is only sent
    when present in the payload (or when it has a non-None default below).
    """
    # (payload key, numpy dtype, default used when the key is absent)
    optional_specs = (
        ("negative_prompt", np.object_, None),
        ("height", np.int32, None),
        ("width", np.int32, None),
        ("num_images_per_prompt", np.int32, 1),
        ("num_inference_steps", np.int32, 20),
        ("image", np.object_, None),
        ("mask_image", np.object_, None),
        ("seed", np.int64, -1),
        ("guidance_scale", np.float32, 7.5),
        ("model_type", np.object_, None),
        ("strength", np.float32, 1),
        ("scheduler", np.object_, "EULER-A"),
        ("control_images", np.object_, None),
        ("control_weightages", np.float32, None),
        ("control_modes", np.int32, None),
        ("lora_weights", np.object_, None),
        ("control_guidance_start", np.float32, None),
        ("control_guidance_end", np.float32, None),
    )
    inputs = [prepare_tensor("prompt", np.array([sample_request["prompt"]], dtype=np.object_))]
    for key, dtype, default in optional_specs:
        value = sample_request.get(key, default)
        if value is not None:
            inputs.append(prepare_tensor(key, np.array([value], dtype=dtype)))
    return inputs


def _parse_response(response):
    """Extract result fields from a Triton response (or error) object.

    Returns (response_id, inference_time, lora_time, output_image_urls,
    mega_pixel, error).
    """
    if hasattr(response, 'message'):
        # Stream failures are delivered as exception objects with .message.
        print(f"Server error: {response}")
        return None, 0, 0, [], 0, str(response)
    try:
        inference_time = response.as_numpy("time_taken").item()
        lora_time = response.as_numpy("load_lora").item()
        raw_id = response.as_numpy("response_id").item()
        response_id = raw_id.decode() if raw_id else None
        urls = response.as_numpy("output_image_urls")
        output_image_urls = urls.tolist() if urls is not None else []
        # NOTE(review): "mega_pixel" is never requested in the outputs list,
        # so as_numpy() returns None and "0" is used — confirm intent.
        mp = response.as_numpy("mega_pixel")
        mega_pixel = mp.item().decode() if mp is not None else "0"
        error_tensor = response.as_numpy("error")
        error = error_tensor.item().decode() if error_tensor is not None and error_tensor.item() else None
        return response_id, inference_time, lora_time, output_image_urls, mega_pixel, error
    except Exception as e:
        print(f"Error processing response: {e}")
        return None, 0, 0, [], 0, str(e)


def process_and_send_request(sample_request):
    """Send one streaming inference request to the local "flux" Triton model.

    sample_request: dict with at least "prompt"; see _build_inputs for the
    optional fields and their defaults.

    Returns a dict with the response id, total/inference/LoRA timings, the
    generated image URLs, any error string, and the reported mega-pixel count.
    """
    inputs = _build_inputs(sample_request)
    outputs = [
        grpcclient.InferRequestedOutput(name)
        for name in ("response_id", "time_taken", "load_lora", "output_image_urls", "error")
    ]
    user_data = UserData()
    url = "localhost:8002"

    st = time.time()
    with grpcclient.InferenceServerClient(url=url, ssl=False) as triton_client:
        triton_client.start_stream(callback=partial(callback, user_data))
        triton_client.async_stream_infer(
            model_name="flux",
            inputs=inputs,
            outputs=outputs,
        )
        # Block until the streamed response (or error) arrives.
        response = user_data._completed_requests.get()
    # BUG FIX: the end time was previously sampled right after dispatching the
    # request, before waiting on the queue, so "total_time_taken" excluded the
    # actual inference wait.
    et = time.time()
    print(response)

    response_id, inference_time, lora_time, output_image_urls, mega_pixel, error = _parse_response(response)

    results = {
        "response_id": response_id,
        "total_time_taken": et - st,
        "inference_time_taken": inference_time,
        "loading_lora_time": lora_time,
        "output_image_urls": output_image_urls,
        "error": error,
        "mega_pixel": 0 if mega_pixel is None else mega_pixel,
    }
    print(results)

    if output_image_urls == []:
        print("No images generated")
        results["error"] = "No images generated"
    return results
|
|
def warmup_and_load_lora(warmup_json_path):
    """Replay every request in the warmup JSON file to warm the server.

    Returns True when the warmup requests were sent, False when no path
    was supplied.
    """
    if warmup_json_path is None:
        return False
    with open(warmup_json_path, 'r') as handle:
        warmup_requests = json.load(handle)
    started = time.time()
    for entry in warmup_requests:
        process_and_send_request(entry)
    resp_time = time.time() - started
    print(f"Warmup and load lora done in {resp_time:.3f} seconds")
    return True
|
|
def generate_jitter_window():
    """Sleep a random jitter interval to spread out request submission.

    50% of calls sleep 1-5 s, 25% sleep 10-15 s, 25% sleep 20-30 s.
    Always returns True.
    """
    roll = random.randint(1, 100)
    if roll <= 50:
        low, high = 1, 5
    elif roll <= 75:
        low, high = 10, 15
    else:
        low, high = 20, 30
    time.sleep(random.randint(low, high))
    return True
|
|
|
|
def predict(requests_data, percent_bifer):
    """Pick a random payload from requests_data, wait a jitter interval,
    then send it to the server.

    percent_bifer is accepted for interface compatibility but not used.
    """
    chosen = random.choice(requests_data)
    generate_jitter_window()
    return process_and_send_request(chosen['payload'])
|
|
def run_single_test(requests_data,id = 0):
    """Run one predict() call; `id` is forwarded as the (unused)
    percent_bifer argument. (The name `id` shadows the builtin but is part
    of the keyword interface, so it is kept.)"""
    return predict(requests_data,id)
|
|
number_of_users = 1   # concurrent worker threads used by the load test
duration_minutes = 2  # how long run_concurrent_tests_cont keeps submitting requests
|
|
def run_concurrent_tests_cont(number_of_users, duration_minutes):
    """Continuously submit requests from `number_of_users` worker threads for
    `duration_minutes` minutes, then report per-request latency statistics.

    Returns (p25, p50, p90, p99, avg) of total request times in seconds.

    NOTE(review): this reads a module-level `requests_data` that is not
    defined anywhere in this file — it raises NameError unless the data is
    provided elsewhere. Confirm where it is meant to come from.
    """
    start_time = time.time()
    end_time = start_time + duration_minutes * 60

    latencies = []

    with ThreadPoolExecutor(max_workers=number_of_users) as executor:
        in_flight = {}

        while time.time() < end_time:
            # Keep up to `number_of_users` requests in flight at once.
            if len(in_flight) < number_of_users:
                future = executor.submit(run_single_test, requests_data)
                in_flight[future] = time.time()

            # Harvest finished requests.
            for future in [f for f in in_flight if f.done()]:
                # BUG FIX: run_single_test returns a result dict; the
                # percentile/average math below needs the numeric latency,
                # not the dict itself.
                latencies.append(future.result()["total_time_taken"])
                del in_flight[future]

            # Avoid a 100%-CPU busy-wait while all workers are occupied.
            time.sleep(0.05)

        # Drain whatever is still running when the test window closes.
        for future in in_flight:
            latencies.append(future.result()["total_time_taken"])

    p25 = np.percentile(latencies, 25)
    p50 = np.percentile(latencies, 50)
    p90 = np.percentile(latencies, 90)
    p99 = np.percentile(latencies, 99)
    avg = sum(latencies) / len(latencies)

    # NOTE(review): resp_list is never appended to anywhere in this file, so
    # this dump is always "[]" — confirm whether latencies/results should be
    # written here instead.
    with open(f"result_dump_{number_of_users}_{duration_minutes}.json", "w") as f:
        f.write(json.dumps(resp_list, indent=4))

    return p25, p50, p90, p99, avg
|
|
if run_multiple_tests:
    # NOTE(review): `requests_data` is not defined anywhere in this file, so
    # this branch raises NameError as written — confirm where the request
    # corpus is supposed to be loaded.
    p25_result, p50_result, p90_result, p99_result, avg = run_concurrent_tests_cont(
        number_of_users, duration_minutes)
    # NOTE(review): warmup_and_load_lora expects a JSON *path* but receives
    # requests_data, and runs after the load test rather than before it —
    # confirm the intended ordering and argument.
    load_lora_time = warmup_and_load_lora(requests_data)

    print(f"25th Percentile: {p25_result:.3f} seconds")
    print(f"50th Percentile: {p50_result:.3f} seconds")
    print(f"90th Percentile: {p90_result:.3f} seconds")
    print(f"99th Percentile: {p99_result:.3f} seconds")
    print(f"Average Response Time: {avg:.3f} seconds")

    with open("test_results.json", "w") as f:
        f.write(
            json.dumps({
                "p25": p25_result,
                "p50": p50_result,
                "p90": p90_result,
                "p99": p99_result,
                "avg": avg,
                "sCode_breakup": scode_breakup,
            }, indent=4)
        )
else:
    # Single smoke-test request against the local server.
    payload = {
        "prompt": "A girl in city, 25 years old, cool, futuristic <lora:https://huggingface.co/XLabs-AI/flux-lora-collection/resolve/main/art_lora.safetensors:0.5>",
        "negative_prompt": "blurry, low quality, distorted",
        "height": 1024,
        "width": 1024,
        "num_images_per_prompt": 1,
        "num_inference_steps": 20,
        "seed": 42424243,
        "guidance_scale": 7.0,
        "model_type": "txt2img"
    }
    result = process_and_send_request(payload)
    print(result)