| import numpy as np
|
| import pandas as pd
|
| from datetime import datetime, time
|
|
|
class MarketProfile:
    """Price histogram built from gap-filled tick data.

    Ticks are fed in through :meth:`update` (DataFrame) or :meth:`add_data`
    (raw arrays).  Between consecutive ticks the price path is filled in with
    synthetic rows spaced by a per-tick step size, and every resulting price
    (rounded to 2 decimals) is counted.

    Attributes:
        multiplier: Factor applied to the spread (or to 0.01 when no ask
            column is present) to obtain the gap-fill step size.
        counts: Mapping ``price -> number of filled rows at that price``.
        total_ticks: Sum of all values in ``counts``.
        min_price / max_price: Lowest / highest price recorded so far
            (``+inf`` / ``-inf`` while the profile is empty).
    """

    # Step sizes below _MIN_STEP are unusable as divisors and are replaced
    # by _FALLBACK_STEP.
    _MIN_STEP = 0.000001
    _FALLBACK_STEP = 0.01
    # Lower bound applied to ask - bid before it is scaled by the multiplier.
    _MIN_SPREAD = 0.00001
    # Histogram bucket precision (number of decimals prices are rounded to).
    _PRICE_DECIMALS = 2

    def __init__(self, multiplier=2.0):
        self.multiplier = multiplier
        self.reset()

    def reset(self):
        """Discard all accumulated counts and price extremes."""
        self.counts = {}
        self.total_ticks = 0
        self.min_price = float('inf')
        self.max_price = float('-inf')

    def fill_gaps(self, prices: np.ndarray, timestamps_ns: np.ndarray, step_sizes: np.ndarray):
        """Vectorised gap-fill with per-tick step sizes.

        For every pair of consecutive prices the move is expressed as a whole
        number of steps (``round(diff / step)``); that many rows are emitted,
        starting at the first price of the pair and walking towards the
        second.  The final tick is always emitted once.  A tick whose move to
        the next tick rounds to zero steps emits no row.

        Args:
            prices: 1-D sequence of prices.
            timestamps_ns: 1-D sequence of integer timestamps, same length as
                ``prices``.
            step_sizes: Either a scalar applied to every gap, or a sequence
                of the same length as ``prices`` where ``step_sizes[i]`` is
                used for the gap starting at ``prices[i]``.  Values below
                1e-6 are replaced by 0.01.

        Returns:
            ``(filled_prices, filled_timestamps_ns)``: prices rounded to 2
            decimals as float64, timestamps as int64.

        Raises:
            ValueError: If ``timestamps_ns`` or a non-scalar ``step_sizes``
                does not have the same length as ``prices``.
        """
        prices = np.asarray(prices, dtype=np.float64)
        # Work on integers throughout: float64 cannot represent nanosecond
        # epoch values exactly (they exceed 2**53).
        ts = np.asarray(timestamps_ns).astype(np.int64)
        n = len(prices)

        if n < 2:
            # Nothing to interpolate; still return the normalised dtypes.
            return np.round(prices, self._PRICE_DECIMALS), ts

        if len(ts) != n:
            raise ValueError("timestamps_ns must have the same length as prices")

        if np.ndim(step_sizes) == 0:
            steps = np.full(n, step_sizes, dtype=np.float64)
        else:
            steps = np.asarray(step_sizes, dtype=np.float64)
            if len(steps) != n:
                raise ValueError("step_sizes must be a scalar or have the same length as prices")

        # Sanitise once and use the SAME steps for counting and for placing
        # the synthetic rows, so both always agree.
        steps = np.where(steps < self._MIN_STEP, self._FALLBACK_STEP, steps)

        diff_units = np.round(np.diff(prices) / steps[:-1]).astype(np.int64)
        # Rows emitted per source tick; the last tick has no following gap
        # and is emitted exactly once.
        counts = np.append(np.abs(diff_units), 1)
        total = int(counts.sum())

        # indices[k] = source tick of output row k,
        # offsets[k] = position of row k within its source tick's run.
        indices = np.repeat(np.arange(n), counts)
        starts = np.cumsum(counts) - counts
        offsets = np.arange(total) - starts[indices]

        directions = np.zeros(n, dtype=np.float64)
        directions[:-1] = np.sign(diff_units)

        dt = np.zeros(n, dtype=np.int64)
        dt[:-1] = np.diff(ts)
        # Guard the divisor; ticks with count 0 emit no rows anyway.
        divisors = np.maximum(counts, 1)

        filled_prices = prices[indices] + offsets * directions[indices] * steps[indices]
        # Integer interpolation (floor division) keeps anchor timestamps exact.
        filled_ts = ts[indices] + (offsets * dt[indices]) // divisors[indices]

        return np.round(filled_prices, self._PRICE_DECIMALS), filled_ts.astype(np.int64)

    def update(self, ticks_df: pd.DataFrame):
        """Update the profile with new ticks.

        ``ticks_df`` must have ``'bid'`` and ``'datetime'`` columns; ``'ask'``
        is optional.  With an ask column, the step size per tick is
        ``max(ask - bid, 0.00001) * multiplier`` and both the bid and the ask
        series are added.  Without it, only bids are added, with a fixed step
        of ``0.01 * multiplier``.  An empty frame is ignored.
        """
        if ticks_df.empty:
            return

        timestamps_ns = ticks_df['datetime'].values.astype('datetime64[ns]').astype(np.int64)
        bids = ticks_df['bid'].values.astype(np.float64)

        if 'ask' not in ticks_df.columns:
            step_sizes = np.full(len(bids), self._FALLBACK_STEP * self.multiplier)
            self.add_data(bids, timestamps_ns, step_sizes)
            return

        asks = ticks_df['ask'].values.astype(np.float64)
        # Floor the spread so crossed or locked quotes still give a positive step.
        spreads = np.maximum(asks - bids, self._MIN_SPREAD)
        step_sizes = spreads * self.multiplier

        self.add_data(bids, timestamps_ns, step_sizes)
        self.add_data(asks, timestamps_ns, step_sizes)

    def add_data(self, prices: np.ndarray, timestamps_ns: np.ndarray, step_sizes: np.ndarray):
        """Gap-fill the series and add the result to the histogram.

        Updates ``counts``, ``total_ticks``, ``min_price`` and ``max_price``.
        The interpolated timestamps are not used here.
        """
        filled_prices, _ = self.fill_gaps(prices, timestamps_ns, step_sizes)

        unique, occurrences = np.unique(filled_prices, return_counts=True)

        # .tolist() yields built-in float/int so no NumPy scalars end up in
        # the instance state.
        for price, count in zip(unique.tolist(), occurrences.tolist()):
            price = round(price, self._PRICE_DECIMALS)
            self.counts[price] = self.counts.get(price, 0) + count
            self.total_ticks += count
            if price < self.min_price:
                self.min_price = price
            if price > self.max_price:
                self.max_price = price

    def get_vah_val_poc(self, value_area=0.70):
        """Compute Value Area High, Value Area Low and Point of Control.

        The POC is the price with the highest count (the lowest such price on
        ties).  Starting from the POC, the area is grown one price level at a
        time towards whichever neighbour has the larger count (both sides on
        a tie) until it holds at least ``value_area`` of the total count or
        spans every recorded price.

        Args:
            value_area: Fraction of the total count the value area must
                cover.  Defaults to 0.70.

        Returns:
            ``(vah, val, poc)`` prices, or ``(None, None, None)`` when the
            profile is empty.
        """
        if not self.counts:
            return None, None, None

        sorted_prices = sorted(self.counts)
        prices_array = np.array(sorted_prices, dtype=np.float64)
        counts_array = np.array([self.counts[p] for p in sorted_prices], dtype=np.int64)

        poc_idx = int(np.argmax(counts_array))
        target_count = counts_array.sum() * value_area

        current_count = counts_array[poc_idx]
        left_idx = right_idx = poc_idx
        last_idx = len(counts_array) - 1

        while current_count < target_count:
            can_go_left = left_idx > 0
            can_go_right = right_idx < last_idx
            if not can_go_left and not can_go_right:
                break

            # -1 marks an unavailable side so it never wins the comparison.
            count_left = counts_array[left_idx - 1] if can_go_left else -1
            count_right = counts_array[right_idx + 1] if can_go_right else -1

            if count_left > count_right:
                current_count += count_left
                left_idx -= 1
            elif count_right > count_left:
                current_count += count_right
                right_idx += 1
            else:
                # Equal neighbours: extend on both sides at once.
                if can_go_left:
                    current_count += count_left
                    left_idx -= 1
                if can_go_right:
                    current_count += count_right
                    right_idx += 1

        val_price = prices_array[left_idx]
        vah_price = prices_array[right_idx]
        poc_price = prices_array[poc_idx]

        return vah_price, val_price, poc_price
|
|
|