BoobyBoobs commited on
Commit
955066e
·
verified ·
1 Parent(s): c03ae6d

Create helpers.py

Browse files
Files changed (1) hide show
  1. helpers.py +39 -0
helpers.py ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ helpers.py - utility helpers with async support and timing context
3
+ """
4
+
5
+ import time
6
+ import asyncio
7
+
8
+ def async_run_if_possible(func):
9
+ """Run async if event loop exists, else sync."""
10
+ def wrapper(*args, **kwargs):
11
+ try:
12
+ loop = asyncio.get_event_loop()
13
+ if loop.is_running():
14
+ return asyncio.create_task(func(*args, **kwargs))
15
+ else:
16
+ return func(*args, **kwargs)
17
+ except RuntimeError:
18
+ return func(*args, **kwargs)
19
+ return wrapper
20
+
21
+ class calculate_duration:
22
+ """Context manager for timing code blocks."""
23
+ def __init__(self, activity_name=""):
24
+ self.activity_name = activity_name
25
+
26
+ def __enter__(self):
27
+ self.start_time = time.time()
28
+ return self
29
+
30
+ def __exit__(self, exc_type, exc_val, exc_tb):
31
+ duration = time.time() - self.start_time
32
+ if self.activity_name:
33
+ print(f"[Duration] {self.activity_name}: {duration:.3f} seconds")
34
+ else:
35
+ print(f"[Duration]: {duration:.3f} seconds")
36
+
37
+ def flux_pipe_call_that_returns_an_iterable_of_images(self, *args, **kwargs):
38
+ # Placeholder, must be replaced by your actual flux_pipe method
39
+ raise NotImplementedError("Must implement 'flux_pipe_call_that_returns_an_iterable_of_images'")