Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| from math import isnan | |
| import math | |
| from typing import List | |
def calculate_gini(counts, *, min_posts=None, normalize=False):
    """
    Compute the Gini impurity ``1 - sum(p_i ** 2)`` of a category distribution.

    Each ``p_i`` is a category's share of the total, so the input may hold raw
    counts or fractional weights; only the relative sizes matter.

    Parameters
    ----------
    counts : Iterable[float] | dict | pandas.Series | numpy.ndarray | None
        Nonnegative counts per category. For a dict the values are used.
        Entries that are ``None`` or NaN are ignored.
    min_posts : int | None
        If provided and the total is below ``min_posts``, NaN is returned.
    normalize : bool
        If True, the impurity is divided by ``1 - 1/k_nonzero`` (the maximum
        attainable with ``k_nonzero`` categories having a nonzero count) and
        clamped to ``[0, 1]``. With ``k_nonzero <= 1`` the result is 0.0.

    Returns
    -------
    float
        The (optionally normalized) Gini impurity. NaN when ``counts`` is
        ``None``, is not iterable, sums to zero, or sums to less than
        ``min_posts``.

    Raises
    ------
    ValueError
        If any count is negative.
    """
    if counts is None:
        return float("nan")

    # Flatten the supported containers into a plain list of raw values.
    if isinstance(counts, dict):
        raw_values = list(counts.values())
    else:
        try:
            # Works for list/tuple/np.array/Series.
            raw_values = list(counts)
        except TypeError:
            return float("nan")

    # Drop missing entries, then validate what is left.
    vals = [
        float(v) for v in raw_values if v is not None and not math.isnan(v)
    ]
    if any(v < 0 for v in vals):
        raise ValueError("Counts must be nonnegative.")

    total = sum(vals)
    if total == 0:
        return float("nan")
    if min_posts is not None and total < min_posts:
        return float("nan")

    # No special case for total == 1: a single integer post already yields 0
    # from the formula, while fractional weights that happen to sum to 1
    # (e.g. [0.5, 0.5]) must still be measured rather than forced to 0.
    base = 1.0 - sum((v / total) ** 2 for v in vals)
    if not normalize:
        return base

    # Normalize by the maximum diversity reachable with the observed
    # nonzero categories.
    k_nonzero = sum(1 for v in vals if v > 0)
    if k_nonzero <= 1:
        # A single populated category has zero diversity and the
        # normalization denominator would be 0, so report 0 directly.
        return 0.0
    denom = 1.0 - 1.0 / k_nonzero
    # Clamp to absorb floating-point drift just outside [0, 1].
    return max(0.0, min(1.0, base / denom))
def calculate_gini_per_user(df: pd.DataFrame, all_topics: List[int]) -> pd.DataFrame:
    """
    Calculate the normalized Gini impurity of each user's topic distribution.

    A high value indicates high topic diversity.

    Parameters
    ----------
    df : pd.DataFrame
        Posts; must contain ``user_id`` and ``topic_id`` columns.
    all_topics : List[int]
        Topics to take into account. Posts whose ``topic_id`` is not listed
        are ignored; listed topics a user never posted in count as zero.

    Returns
    -------
    pd.DataFrame
        One row per user with columns ``user_id`` and ``gini_coefficient``.
        Scores that ``calculate_gini`` reports as NaN are returned as 0.
        An empty ``df`` yields an empty frame with the same two columns.
    """

    def compute_user_gini(topic_ids: pd.Series) -> float:
        # Align the observed counts onto the full topic list so every user is
        # scored over the same categories (missing topics become 0).
        full_topic_counts = topic_ids.value_counts().reindex(
            all_topics, fill_value=0
        )
        return calculate_gini(full_topic_counts.values, normalize=True)

    # Iterate over the grouped ``topic_id`` column instead of using
    # ``DataFrameGroupBy.apply``: apply returns a DataFrame (not a Series) when
    # there are no groups, which broke the column renaming on empty input.
    rows = [
        (user_id, compute_user_gini(topic_ids))
        for user_id, topic_ids in df.groupby("user_id")["topic_id"]
    ]
    user_gini = pd.DataFrame(rows, columns=["user_id", "gini_coefficient"])
    return user_gini.fillna(0)
def calculate_gini_per_topic(df: pd.DataFrame, all_users: List[str]) -> pd.DataFrame:
    """
    Calculate the normalized Gini impurity of each topic's user distribution.

    A high value indicates the topic is discussed by a diverse set of users.

    Parameters
    ----------
    df : pd.DataFrame
        Posts; must contain ``topic_id`` and ``user_id`` columns.
    all_users : List[str]
        Users to take into account. Posts whose ``user_id`` is not listed are
        ignored; listed users who never posted in a topic count as zero.

    Returns
    -------
    pd.DataFrame
        One row per topic with columns ``topic_id`` and ``gini_coefficient``.
        Scores that ``calculate_gini`` reports as NaN are returned as 0.
        An empty ``df`` yields an empty frame with the same two columns.
    """

    def compute_topic_gini(user_ids: pd.Series) -> float:
        # Align the observed counts onto the full user list so every topic is
        # scored over the same categories (missing users become 0).
        full_user_counts = user_ids.value_counts().reindex(
            all_users, fill_value=0
        )
        return calculate_gini(full_user_counts.values, normalize=True)

    # Iterate over the grouped ``user_id`` column instead of using
    # ``DataFrameGroupBy.apply``: apply returns a DataFrame (not a Series) when
    # there are no groups, which broke the column renaming on empty input.
    rows = [
        (topic_id, compute_topic_gini(user_ids))
        for topic_id, user_ids in df.groupby("topic_id")["user_id"]
    ]
    topic_gini = pd.DataFrame(rows, columns=["topic_id", "gini_coefficient"])
    return topic_gini.fillna(0)