#!/usr/bin/env python3
"""Monty-backed API orchestration executor (v2).

v2 goals:
- HfApi-first helper implementations for endpoints covered by huggingface_hub.
- Thin raw API fallback for uncovered endpoints (/api/recent-activity, /api/trending,
  /api/users/{username}/likes event stream, collections q-search).
- Stable machine-first helper envelopes (`items` + optional `item`, no polymorphic payloads).
"""

from __future__ import annotations

import argparse
import asyncio
import ast
import inspect
import json
import os
import re
import time
from itertools import islice
from typing import Any, Callable, cast, get_args
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from huggingface_hub import HfApi
from huggingface_hub.hf_api import DatasetSort_T, ModelSort_T, SpaceSort_T

# Runtime-level execution limits.
# - max_calls: hard cap on the total number of external helper/API calls a single
#   generated program may make in one run.
# - timeout_sec: wall-clock timeout for the full Monty execution.
DEFAULT_TIMEOUT_SEC = 90  # Default end-to-end timeout for one Monty run.
DEFAULT_MAX_CALLS = 400  # Default external-call budget exposed to callers.
MAX_CALLS_LIMIT = 400  # Absolute max external-call budget accepted by the runtime.
INTERNAL_STRICT_MODE = False

# Result-size vocabulary used throughout helper metadata:
# - return_limit: how many rows the caller wants back from a helper.
# - scan_limit / max_pages: how much source data a helper is willing to inspect
#   internally to answer the query.
# - hard cap: an absolute runtime-imposed maximum on rows returned in one helper call.
OUTPUT_ITEMS_TRUNCATION_LIMIT = 500  # Final output truncation for oversized `items` payloads.
EXHAUSTIVE_HELPER_RETURN_HARD_CAP = 2_000  # Runtime hard cap for exhaustive-helper output rows.
SELECTIVE_ENDPOINT_RETURN_HARD_CAP = 200  # Default cap for one-shot selective endpoint helpers.
TRENDING_ENDPOINT_MAX_LIMIT = 20  # Upstream `/api/trending` endpoint maximum.

# Exhaustive helper scan/page ceilings. These bound how much upstream data we
# inspect, which is different from how many rows we return to the caller.
GRAPH_SCAN_LIMIT_CAP = 10_000  # Max follower/member rows scanned in one helper call.
LIKES_SCAN_LIMIT_CAP = 10_000  # Max like-event rows scanned in one helper call.
LIKES_RANKING_WINDOW_DEFAULT = 40  # Default shortlist size when ranking likes by repo popularity.
LIKES_ENRICHMENT_MAX_REPOS = 50  # Max liked repos enriched with extra repo-detail calls.
RECENT_ACTIVITY_PAGE_SIZE = 100  # Rows requested per `/api/recent-activity` page.
RECENT_ACTIVITY_SCAN_MAX_PAGES = 10  # Max recent-activity pages fetched in one helper call.

# Compact summary helpers intentionally inspect less data than the full
# exhaustive helpers so they remain fast and predictable.
USER_SUMMARY_GRAPH_SCAN_LIMIT = 1_000  # Follower/following rows sampled for user summary.
USER_SUMMARY_LIKES_SCAN_LIMIT = 1_000  # Like rows sampled for user summary.
USER_SUMMARY_ACTIVITY_MAX_PAGES = 3  # Activity pages sampled for user summary.

# Monty sandbox resource limits. These constrain the Python execution
# environment itself rather than Hub/API pagination behavior.
DEFAULT_MONTY_MAX_MEMORY = 64 * 1024 * 1024  # 64 MiB
DEFAULT_MONTY_MAX_ALLOCATIONS = 250_000  # Approximate object-allocation ceiling in the sandbox.
DEFAULT_MONTY_MAX_RECURSION_DEPTH = 100  # Python recursion limit inside the sandbox.
# Per-repo-type sort vocabularies. `get_args(...)` reads the Literal members
# published by huggingface_hub; the `or {...}` fallback keeps each set populated
# if a hub version ships the alias without inspectable Literal args.
_MODEL_SORT_KEYS = set(get_args(ModelSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
_DATASET_SORT_KEYS = set(get_args(DatasetSort_T)) or {
    "created_at",
    "downloads",
    "last_modified",
    "likes",
    "trending_score",
}
# Spaces have no `downloads` sort key upstream.
_SPACE_SORT_KEYS = set(get_args(SpaceSort_T)) or {
    "created_at",
    "last_modified",
    "likes",
    "trending_score",
}

# Canonical repo_type -> allowed sort keys, consumed by `_normalize_repo_sort_key`.
_REPO_SORT_KEYS: dict[str, set[str]] = {
    "model": _MODEL_SORT_KEYS,
    "dataset": _DATASET_SORT_KEYS,
    "space": _SPACE_SORT_KEYS,
}

# Tolerated caller spellings (camelCase / kebab-case / shorthand) mapped to the
# canonical snake_case sort key.
_SORT_KEY_ALIASES: dict[str, str] = {
    "createdat": "created_at",
    "created_at": "created_at",
    "created-at": "created_at",
    "downloads": "downloads",
    "likes": "likes",
    "lastmodified": "last_modified",
    "last_modified": "last_modified",
    "last-modified": "last_modified",
    "trendingscore": "trending_score",
    "trending_score": "trending_score",
    "trending-score": "trending_score",
    "trending": "trending_score",
}

# Caller aliases for user-row field projection (`fields=[...]`).
_USER_FIELD_ALIASES: dict[str, str] = {
    "login": "username",
    "user": "username",
    "handle": "username",
    "name": "fullname",
    "full_name": "fullname",
    "full-name": "fullname",
    "is_pro": "isPro",
    "ispro": "isPro",
    "pro": "isPro",
}

# Actor rows accept every user alias plus type-oriented spellings.
_ACTOR_FIELD_ALIASES: dict[str, str] = {
    **_USER_FIELD_ALIASES,
    "entity_type": "type",
    "user_type": "type",
    "actor_type": "type",
}

# Repo helpers prefer canonical snake_case field names in generated code, but
# tolerate common camelCase/raw endpoint aliases when callers project with
# `fields=[...]`.
# camelCase / collapsed aliases accepted for repo-row field projection.
_REPO_FIELD_ALIASES: dict[str, str] = {
    "repoid": "repo_id",
    "repotype": "repo_type",
    "repourl": "repo_url",
    "createdat": "created_at",
    "lastmodified": "last_modified",
    "pipelinetag": "pipeline_tag",
    "trendingscore": "trending_score",
    "libraryname": "library_name",
    "paperswithcodeid": "paperswithcode_id",
}

# Aliases for collection-row field projection; `author` maps onto `owner`.
_COLLECTION_FIELD_ALIASES: dict[str, str] = {
    "collectionid": "collection_id",
    "lastupdated": "last_updated",
    "ownertype": "owner_type",
    "itemcount": "item_count",
    "author": "owner",
}

# Stable field surfaces exposed by helper envelopes. Keep these aligned with
# the corresponding row builders (e.g. `_build_repo_row`).
REPO_CANONICAL_FIELDS: tuple[str, ...] = (
    "repo_id", "repo_type", "title", "author", "likes", "downloads",
    "created_at", "last_modified", "pipeline_tag", "repo_url", "tags",
    "library_name", "description", "paperswithcode_id", "sdk", "models",
    "datasets", "subdomain",
)

USER_CANONICAL_FIELDS: tuple[str, ...] = (
    "username", "fullname", "bio", "websiteUrl", "twitter", "github",
    "linkedin", "bluesky", "followers", "following", "likes", "isPro",
)

PROFILE_CANONICAL_FIELDS: tuple[str, ...] = (
    "handle", "entity_type", "display_name", "bio", "description",
    "avatar_url", "website_url", "twitter_url", "github_url", "linkedin_url",
    "bluesky_url", "followers_count", "following_count", "likes_count",
    "members_count", "models_count", "datasets_count", "spaces_count",
    "discussions_count", "papers_count", "upvotes_count", "organizations",
    "is_pro", "likes_sample", "activity_sample",
)

ACTOR_CANONICAL_FIELDS: tuple[str, ...] = (
    "username", "fullname", "isPro", "role", "type",
)

ACTIVITY_CANONICAL_FIELDS: tuple[str, ...] = (
    "event_type", "repo_id", "repo_type", "timestamp",
)

COLLECTION_CANONICAL_FIELDS: tuple[str, ...] = (
    "collection_id", "slug", "title", "owner", "owner_type", "description",
    "last_updated", "item_count",
)

# Extra hf_repo_search kwargs intentionally supported as pass-through to
# huggingface_hub.HfApi.list_models/list_datasets/list_spaces.
# (Generic args like `query/search/sort/author/limit` are handled directly in
# hf_repo_search signature and are not listed here.)
_REPO_SEARCH_EXTRA_ARGS: dict[str, set[str]] = {
    "model": {
        "filter",
        "apps",
        "gated",
        "inference",
        "inference_provider",
        "model_name",
        "trained_dataset",
        "pipeline_tag",
        "emissions_thresholds",
        "expand",
        "full",
        "cardData",
        "card_data",  # alias
        "fetch_config",
    },
    "dataset": {
        "filter",
        "benchmark",
        "dataset_name",
        "gated",
        "language_creators",
        "language",
        "multilinguality",
        "size_categories",
        "task_categories",
        "task_ids",
        "expand",
        "full",
    },
    "space": {
        "filter",
        "datasets",
        "models",
        "linked",
        "expand",
        "full",
    },
}

# Rich default metadata for repo search. These raw endpoint expand keys are
# normalized into the stable repo-row field surface below; keep them aligned
# with `_build_repo_row(...)`, `_REPO_FIELD_ALIASES`, and the shared agent docs.
_REPO_SEARCH_DEFAULT_EXPAND: dict[str, list[str]] = {
    "model": [
        "author", "createdAt", "downloads", "gated", "lastModified",
        "library_name", "likes", "pipeline_tag", "private", "sha", "tags",
        "trendingScore",
    ],
    "dataset": [
        "author", "createdAt", "description", "downloads", "gated",
        "lastModified", "likes", "paperswithcode_id", "private", "sha",
        "tags", "trendingScore",
    ],
    "space": [
        "author", "createdAt", "datasets", "lastModified", "likes", "models",
        "private", "sdk", "sha", "subdomain", "tags", "trendingScore",
    ],
}

# Per-helper pagination defaults and ceilings.
# These values answer questions like:
# - "If the caller omits return_limit, how many rows should this helper return?"
# - "How much upstream data may this helper scan/page through internally?"
# - "What is the helper-specific max_return override, if any?"
PAGINATION_POLICY: dict[str, dict[str, Any]] = {
    "hf_user_graph": {
        "scan_max": GRAPH_SCAN_LIMIT_CAP,
        "default_return": 1_000,
        "max_return": GRAPH_SCAN_LIMIT_CAP,
    },
    "hf_org_members": {"scan_max": GRAPH_SCAN_LIMIT_CAP, "default_return": 1_000},
    "hf_repo_likers": {"default_return": 1_000},
    "hf_user_likes": {
        "scan_max": LIKES_SCAN_LIMIT_CAP,
        "default_return": 100,
        "ranking_default": LIKES_RANKING_WINDOW_DEFAULT,
        "enrich_max": LIKES_ENRICHMENT_MAX_REPOS,
    },
    "hf_recent_activity": {
        "page_limit": RECENT_ACTIVITY_PAGE_SIZE,
        "max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES,
        "default_return": 100,
    },
    "hf_repo_search": {"max_return": 5_000, "default_return": 20},
    "hf_trending": {"max_return": TRENDING_ENDPOINT_MAX_LIMIT, "default_return": 20},
    "hf_collections_search": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 20},
    "hf_collection_items": {"max_return": OUTPUT_ITEMS_TRUNCATION_LIMIT, "default_return": 100},
}

# Single source of truth for the public helper surface exposed to generated
# Monty code. Keep runtime helper resolution derived from this tuple.
+HELPER_EXTERNALS = ( + "hf_runtime_capabilities", + "hf_whoami", + "hf_profile_summary", + "hf_org_members", + "hf_repo_search", + "hf_user_graph", + "hf_repo_likers", + "hf_user_likes", + "hf_recent_activity", + "hf_repo_discussions", + "hf_repo_discussion_details", + "hf_repo_details", + "hf_trending", + "hf_collections_search", + "hf_collection_items", +) + +HELPER_COVERED_ENDPOINT_PATTERNS: list[tuple[str, str]] = [ + (r"^/api/whoami-v2$", "hf_whoami"), + (r"^/api/trending$", "hf_trending"), + (r"^/api/recent-activity$", "hf_recent_activity"), + (r"^/api/models$", "hf_repo_search"), + (r"^/api/datasets$", "hf_repo_search"), + (r"^/api/spaces$", "hf_repo_search"), + (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$", "hf_repo_details"), + (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$", "hf_repo_discussions"), + (r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$", "hf_repo_discussion_details"), + (r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$", "hf_repo_likers"), + (r"^/api/users/[^/]+/overview$", "hf_profile_summary"), + (r"^/api/organizations/[^/]+/overview$", "hf_profile_summary"), + (r"^/api/users/[^/]+/likes$", "hf_user_likes"), + (r"^/api/users/[^/]+/(followers|following)$", "hf_user_graph"), + (r"^/api/organizations/[^/]+/members$", "hf_org_members"), + (r"^/api/organizations/[^/]+/followers$", "hf_user_graph"), + (r"^/api/collections$", "hf_collections_search"), + (r"^/api/collections/[^/]+$", "hf_collection_items"), + (r"^/api/collections/[^/]+/[^/]+$", "hf_collection_items"), +] + + +def _resolve_helper_functions(namespace: dict[str, Any]) -> dict[str, Callable[..., Any]]: + resolved: dict[str, Callable[..., Any]] = {} + for helper_name in HELPER_EXTERNALS: + candidate = namespace.get(helper_name) + if not callable(candidate): + raise RuntimeError(f"Helper '{helper_name}' is not defined or not callable") + resolved[helper_name] = cast(Callable[..., Any], candidate) + return resolved + +ALLOWLIST_PATTERNS = [ + 
r"^/api/whoami-v2$", + r"^/api/trending$", + r"^/api/daily_papers$", + r"^/api/models$", + r"^/api/datasets$", + r"^/api/spaces$", + r"^/api/models-tags-by-type$", + r"^/api/datasets-tags-by-type$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$", + r"^/api/users/[^/]+/overview$", + r"^/api/users/[^/]+/socials$", + r"^/api/users/[^/]+/followers$", + r"^/api/users/[^/]+/following$", + r"^/api/users/[^/]+/likes$", + r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$", + r"^/api/organizations/[^/]+/overview$", + r"^/api/organizations/[^/]+/members$", + r"^/api/organizations/[^/]+/followers$", + r"^/api/collections$", + r"^/api/collections/[^/]+$", + r"^/api/collections/[^/]+/[^/]+$", + r"^/api/recent-activity$", +] + +STRICT_ALLOWLIST_PATTERNS = [ + r"^/api/users/[^/]+/overview$", + r"^/api/users/[^/]+/socials$", + r"^/api/whoami-v2$", + r"^/api/trending$", + r"^/api/daily_papers$", + r"^/api/(models|datasets|spaces)/(?:[^/]+|[^/]+/[^/]+)/likers$", + r"^/api/collections$", + r"^/api/collections/[^/]+$", + r"^/api/collections/[^/]+/[^/]+$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+$", + r"^/api/(models|datasets|spaces)/[^/]+/[^/]+/discussions/\d+/status$", +] + + +class MontyExecutionError(RuntimeError): + def __init__(self, message: str, api_calls: int, trace: list[dict[str, Any]]): + super().__init__(message) + self.api_calls = api_calls + self.trace = trace + + +def _load_request_token() -> str | None: + try: + from fast_agent.mcp.auth.context import request_bearer_token # type: ignore + + token = request_bearer_token.get() + if token: + return token + except Exception: + pass + return None + + +def _load_token() -> str | None: + token = _load_request_token() + if token: + 
return token + return os.getenv("HF_TOKEN") or None + + +def _normalize_endpoint(endpoint: str) -> str: + ep = (endpoint or "").strip() + if not ep: + raise ValueError("endpoint is required") + if "?" in ep: + raise ValueError("endpoint must not include query string; use params") + if ep.startswith("http://") or ep.startswith("https://"): + raise ValueError("endpoint must be path-only") + if not ep.startswith("/"): + ep = "/" + ep + if not ep.startswith("/api/"): + ep = "/api" + ep + if ep in {"/api/collections/search", "/api/collections/search/"}: + ep = "/api/collections" + if ".." in ep: + raise ValueError("path traversal not allowed") + return ep + + +def _endpoint_allowed(endpoint: str, strict_mode: bool) -> bool: + path = endpoint.split("?", 1)[0] + patterns = STRICT_ALLOWLIST_PATTERNS if strict_mode else ALLOWLIST_PATTERNS + return any(re.match(p, path) for p in patterns) + + +def _json_best_effort(raw: bytes) -> Any: + try: + return json.loads(raw) + except Exception: + return raw.decode("utf-8", errors="replace") + + +def _sanitize_params(endpoint: str, params: dict[str, Any] | None) -> dict[str, Any]: + clean = dict(params or {}) + path = endpoint.split("?", 1)[0] + + if path == "/api/collections": + if "q" not in clean and "search" in clean: + clean["q"] = clean.get("search") + clean.pop("search", None) + + if path == "/api/trending": + t = str(clean.get("type") or "").strip().lower() + aliases = {"models": "model", "datasets": "dataset", "spaces": "space"} + if t in aliases: + clean["type"] = aliases[t] + lim = clean.get("limit") + if lim is not None: + try: + n = int(lim) + except Exception: + n = TRENDING_ENDPOINT_MAX_LIMIT + clean["limit"] = max(1, min(n, TRENDING_ENDPOINT_MAX_LIMIT)) + return clean + + lim = clean.get("limit") + if lim is None: + return clean + try: + n = int(lim) + except Exception: + return clean + + endpoint_limit_max = SELECTIVE_ENDPOINT_RETURN_HARD_CAP + if re.match(r"^/api/users/[^/]+/(followers|following)$", path): + 
def _sanitize_params(endpoint: str, params: dict[str, Any] | None) -> dict[str, Any]:
    """Return a copy of *params* with endpoint-specific aliases and limit clamps applied."""
    clean = dict(params or {})
    path = endpoint.split("?", 1)[0]

    # Collections q-search: accept `search` as an alias for the raw `q` param.
    if path == "/api/collections":
        if "q" not in clean and "search" in clean:
            clean["q"] = clean.get("search")
        clean.pop("search", None)

    # Trending: normalize plural type aliases; clamp limit to the upstream max.
    if path == "/api/trending":
        t = str(clean.get("type") or "").strip().lower()
        aliases = {"models": "model", "datasets": "dataset", "spaces": "space"}
        if t in aliases:
            clean["type"] = aliases[t]
        lim = clean.get("limit")
        if lim is not None:
            try:
                n = int(lim)
            except Exception:
                # Non-numeric limit falls back to the endpoint maximum.
                n = TRENDING_ENDPOINT_MAX_LIMIT
            clean["limit"] = max(1, min(n, TRENDING_ENDPOINT_MAX_LIMIT))
        return clean

    lim = clean.get("limit")
    if lim is None:
        return clean
    try:
        n = int(lim)
    except Exception:
        # Non-numeric limit on other endpoints is passed through untouched.
        return clean

    # Graph/likes endpoints tolerate larger limits than the selective default.
    endpoint_limit_max = SELECTIVE_ENDPOINT_RETURN_HARD_CAP
    if re.match(r"^/api/users/[^/]+/(followers|following)$", path):
        endpoint_limit_max = GRAPH_SCAN_LIMIT_CAP
    elif re.match(r"^/api/users/[^/]+/likes$", path):
        endpoint_limit_max = LIKES_SCAN_LIMIT_CAP

    clean["limit"] = max(1, min(n, endpoint_limit_max))
    return clean


def _truncate_result_payload(output: Any) -> Any:
    """Trim an envelope's `items` list to the output cap, recording a note in `steps`.

    NOTE(review): when truncation happens, any existing `item` value is replaced
    (set only when exactly one row remains) — confirm that clobbering is intended.
    """
    if not isinstance(output, dict):
        return output

    items = output.get("items")
    if not isinstance(items, list) or len(items) <= OUTPUT_ITEMS_TRUNCATION_LIMIT:
        return output

    trimmed = dict(output)
    trimmed_items = items[:OUTPUT_ITEMS_TRUNCATION_LIMIT]
    trimmed["items"] = trimmed_items
    trimmed["item"] = trimmed_items[0] if len(trimmed_items) == 1 else None
    note = f"truncated items to first {OUTPUT_ITEMS_TRUNCATION_LIMIT} rows for token efficiency"
    steps = trimmed.get("steps")
    if isinstance(steps, list):
        trimmed["steps"] = [*steps, note]
    else:
        trimmed["steps"] = [note]
    return trimmed


def _is_helper_envelope(output: Any) -> bool:
    """True when *output* looks like a standard helper envelope (ok/items/meta/error)."""
    return (
        isinstance(output, dict)
        and isinstance(output.get("ok"), bool)
        and "items" in output
        and "meta" in output
        and "error" in output
    )


def _summarize_limit_hit(helper_name: str, result: Any) -> dict[str, Any] | None:
    """Extract a compact limit-hit summary from a helper envelope, or None."""
    if not _is_helper_envelope(result):
        return None
    meta = result.get("meta") if isinstance(result.get("meta"), dict) else {}
    # NOTE(review): `meta` is always a dict at this point, so this check is dead code.
    if not isinstance(meta, dict):
        return None

    truncated_by = str(meta.get("truncated_by") or "")
    limit_hit = any(
        [
            meta.get("truncated") is True,
            meta.get("hard_cap_applied") is True,
            truncated_by in {"scan_limit", "page_limit", "multiple"},
        ]
    )
    if not limit_hit:
        return None

    summary: dict[str, Any] = {
        "helper": helper_name,
        "source": meta.get("source"),
        "returned": meta.get("returned"),
        "total": meta.get("total"),
        "truncated": meta.get("truncated"),
        "truncated_by": meta.get("truncated_by"),
        "more_available": meta.get("more_available"),
        "requested_return_limit": meta.get("requested_return_limit"),
        "applied_return_limit": meta.get("applied_return_limit"),
        "next_request_hint": meta.get("next_request_hint"),
    }
    if meta.get("scan_limit") is not None:
        summary["scan_limit"] = meta.get("scan_limit")
    if meta.get("applied_max_pages") is not None:
        summary["applied_max_pages"] = meta.get("applied_max_pages")
    return summary


def _wrap_raw_result(
    result: Any,
    *,
    ok: bool,
    api_calls: int,
    elapsed_ms: int,
    limit_summaries: list[dict[str, Any]] | None = None,
    error: str | None = None,
) -> dict[str, Any]:
    """Wrap a raw run result with run-level meta (call count, timing, limit hits)."""
    # Keep at most 10 limit summaries; copy each so callers can't mutate shared dicts.
    hits = [dict(summary) for summary in (limit_summaries or [])[:10]]
    meta: dict[str, Any] = {
        "ok": ok,
        "api_calls": api_calls,
        "elapsed_ms": elapsed_ms,
        "limits_reached": bool(hits),
        "limit_summary": hits,
    }
    if error is not None:
        meta["error"] = error
    return {
        "result": result,
        "meta": meta,
    }


def _clamp_int(value: Any, *, default: int, minimum: int, maximum: int) -> int:
    """Coerce *value* to int (falling back to *default*) and clamp into [minimum, maximum]."""
    try:
        out = int(value)
    except Exception:
        out = default
    return max(minimum, min(out, maximum))


def _as_int(value: Any) -> int | None:
    """int(value) or None when coercion fails."""
    try:
        return int(value)
    except Exception:
        return None


def _canonical_repo_type(value: Any, *, default: str = "model") -> str:
    """Map singular/plural repo-type spellings onto 'model' | 'dataset' | 'space'."""
    raw = str(value or "").strip().lower()
    aliases = {
        "model": "model",
        "models": "model",
        "dataset": "dataset",
        "datasets": "dataset",
        "space": "space",
        "spaces": "space",
    }
    return aliases.get(raw, default)


def _normalize_repo_sort_key(repo_type: str, sort_value: Any) -> tuple[str | None, str | None]:
    """Resolve a caller sort spelling to a canonical key valid for *repo_type*.

    Returns (key, None) on success, (None, error_message) on failure,
    and (None, None) for empty input.
    """
    raw = str(sort_value or "").strip()
    if not raw:
        return None, None

    key = _SORT_KEY_ALIASES.get(raw.lower().replace(" ", "").replace("__", "_"))
    if key is None:
        # NOTE(review): this second lookup appears redundant — every alias key the
        # plain lowercase form could hit is also hit by the transformed form above.
        key = _SORT_KEY_ALIASES.get(raw.lower())
    if key is None:
        return None, f"Invalid sort key '{raw}'"

    rt = _canonical_repo_type(repo_type)
    allowed = _REPO_SORT_KEYS.get(rt, set())
    if key not in allowed:
        return None, f"Invalid sort key '{raw}' for repo_type='{rt}'. Allowed: {', '.join(sorted(allowed))}"
    return key, None
Allowed: {', '.join(sorted(allowed))}" + return key, None + + +def _repo_detail_endpoint(repo_type: str, repo_id: str) -> str: + rt = _canonical_repo_type(repo_type) + rid = str(repo_id or "").strip() + if "/" not in rid: + raise ValueError("repo_id must be owner/name") + owner, name = rid.split("/", 1) + if not owner or not name: + raise ValueError("repo_id must be owner/name") + return f"/api/{rt}s/{owner}/{name}" + + +def _coerce_str_list(value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, str): + raw = [value] + elif isinstance(value, (list, tuple, set)): + raw = list(value) + else: + raise ValueError("Expected a string or list of strings") + return [str(v).strip() for v in raw if str(v).strip()] + + +def _optional_str_list(value: Any) -> list[str] | None: + if value is None: + return None + if isinstance(value, str): + out = [value.strip()] if value.strip() else [] + return out or None + if isinstance(value, (list, tuple, set)): + out = [str(v).strip() for v in value if str(v).strip()] + return out or None + return None + + +def _dt_to_str(value: Any) -> str | None: + if value is None: + return None + iso = getattr(value, "isoformat", None) + if callable(iso): + try: + return str(iso()) + except Exception: + pass + return str(value) + + +def _repo_web_url(repo_type: str, repo_id: str | None) -> str | None: + if not isinstance(repo_id, str) or not repo_id: + return None + base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/") + rt = _canonical_repo_type(repo_type, default="") + if rt == "dataset": + return f"{base}/datasets/{repo_id}" + if rt == "space": + return f"{base}/spaces/{repo_id}" + return f"{base}/{repo_id}" + + +def _build_repo_row( + *, + repo_id: Any, + repo_type: str, + author: Any = None, + title: Any = None, + likes: Any = None, + downloads: Any = None, + created_at: Any = None, + last_modified: Any = None, + pipeline_tag: Any = None, + private: Any = None, + trending_score: Any = None, + tags: Any = 
None, + sha: Any = None, + gated: Any = None, + library_name: Any = None, + description: Any = None, + paperswithcode_id: Any = None, + sdk: Any = None, + models: Any = None, + datasets: Any = None, + subdomain: Any = None, +) -> dict[str, Any]: + rt = _canonical_repo_type(repo_type) + author_value = author + if not isinstance(author_value, str) and isinstance(repo_id, str) and "/" in repo_id: + author_value = repo_id.split("/", 1)[0] + + title_value = title + if (not isinstance(title_value, str) or not title_value.strip()) and isinstance(repo_id, str) and repo_id: + title_value = repo_id if rt == "space" else None + + return { + "id": repo_id, + "slug": repo_id, + "repo_id": repo_id, + "title": title_value, + "repo_type": rt, + "author": author_value, + "likes": _as_int(likes), + "downloads": _as_int(downloads), + "created_at": _dt_to_str(created_at), + "last_modified": _dt_to_str(last_modified), + "pipeline_tag": pipeline_tag, + "private": private, + "trending_score": _as_int(trending_score) if trending_score is not None else None, + "repo_url": _repo_web_url(rt, repo_id if isinstance(repo_id, str) else None), + "tags": _optional_str_list(tags), + "sha": sha, + "gated": gated, + "library_name": library_name, + "description": description, + "paperswithcode_id": paperswithcode_id, + "sdk": sdk, + "models": _optional_str_list(models), + "datasets": _optional_str_list(datasets), + "subdomain": subdomain, + } + + +def _normalize_repo_search_row(row: Any, repo_type: str) -> dict[str, Any]: + return _build_repo_row( + repo_id=getattr(row, "id", None), + repo_type=repo_type, + author=getattr(row, "author", None), + title=getattr(row, "title", None), + likes=getattr(row, "likes", None), + downloads=getattr(row, "downloads", None), + created_at=getattr(row, "created_at", None), + last_modified=getattr(row, "last_modified", None), + pipeline_tag=getattr(row, "pipeline_tag", None), + private=getattr(row, "private", None), + trending_score=getattr(row, "trending_score", 
None), + tags=getattr(row, "tags", None), + sha=getattr(row, "sha", None), + gated=getattr(row, "gated", None), + library_name=getattr(row, "library_name", None), + description=getattr(row, "description", None), + paperswithcode_id=getattr(row, "paperswithcode_id", None), + sdk=getattr(row, "sdk", None), + models=getattr(row, "models", None), + datasets=getattr(row, "datasets", None), + subdomain=getattr(row, "subdomain", None), + ) + + +def _normalize_repo_detail_row(detail: Any, repo_type: str, repo_id: str) -> dict[str, Any]: + row = _normalize_repo_search_row(detail, repo_type) + resolved_repo_id = row.get("repo_id") or repo_id + row["id"] = row.get("id") or resolved_repo_id + row["slug"] = row.get("slug") or resolved_repo_id + row["repo_id"] = resolved_repo_id + row["repo_url"] = _repo_web_url(repo_type, resolved_repo_id) + return row + + +def _normalize_trending_row(repo: dict[str, Any], default_repo_type: str, rank: int | None = None) -> dict[str, Any]: + row = _build_repo_row( + repo_id=repo.get("id"), + repo_type=repo.get("type") or default_repo_type, + author=repo.get("author"), + title=repo.get("title"), + likes=repo.get("likes"), + downloads=repo.get("downloads"), + created_at=repo.get("createdAt"), + last_modified=repo.get("lastModified"), + pipeline_tag=repo.get("pipeline_tag"), + private=repo.get("private"), + trending_score=repo.get("trendingScore"), + tags=repo.get("tags"), + sha=repo.get("sha"), + gated=repo.get("gated"), + library_name=repo.get("library_name"), + description=repo.get("description"), + paperswithcode_id=repo.get("paperswithcode_id"), + sdk=repo.get("sdk"), + models=repo.get("models"), + datasets=repo.get("datasets"), + subdomain=repo.get("subdomain"), + ) + if rank is not None: + row["trending_rank"] = rank + return row + + +def _normalize_collection_repo_item(row: dict[str, Any]) -> dict[str, Any] | None: + repo_id = row.get("id") or row.get("repoId") or row.get("repo_id") + if not isinstance(repo_id, str) or not repo_id: + 
return None + + repo_type = _canonical_repo_type(row.get("repoType") or row.get("repo_type") or row.get("type"), default="") + if repo_type not in {"model", "dataset", "space"}: + return None + + return _build_repo_row( + repo_id=repo_id, + repo_type=repo_type, + author=row.get("author") or _author_from_any(row.get("authorData")), + title=row.get("title"), + likes=row.get("likes"), + downloads=row.get("downloads"), + created_at=row.get("createdAt") or row.get("created_at"), + last_modified=row.get("lastModified") or row.get("last_modified"), + pipeline_tag=row.get("pipeline_tag") or row.get("pipelineTag"), + private=row.get("private"), + tags=row.get("tags"), + gated=row.get("gated"), + library_name=row.get("library_name") or row.get("libraryName"), + description=row.get("description"), + paperswithcode_id=row.get("paperswithcode_id") or row.get("paperswithcodeId"), + sdk=row.get("sdk"), + models=row.get("models"), + datasets=row.get("datasets"), + subdomain=row.get("subdomain"), + ) + + +def _sort_repo_rows(rows: list[dict[str, Any]], sort_key: str | None) -> list[dict[str, Any]]: + if not sort_key: + return rows + + if sort_key in {"likes", "downloads", "trending_score"}: + return sorted(rows, key=lambda row: _as_int(row.get(sort_key)) or -1, reverse=True) + + if sort_key in {"created_at", "last_modified"}: + return sorted(rows, key=lambda row: str(row.get(sort_key) or ""), reverse=True) + + return rows + + +def call_api_host( + endpoint: str, + *, + method: str = "GET", + params: dict[str, Any] | None = None, + json_body: dict[str, Any] | None = None, + timeout_sec: int = DEFAULT_TIMEOUT_SEC, + strict_mode: bool = False, +) -> dict[str, Any]: + method_u = method.upper().strip() + if method_u not in {"GET", "POST"}: + raise ValueError("Only GET and POST are supported") + + ep = _normalize_endpoint(endpoint) + if not _endpoint_allowed(ep, strict_mode): + raise ValueError(f"Endpoint not allowed: {ep}") + + params = _sanitize_params(ep, params) + if ep == 
"/api/recent-activity": + feed_type = str((params or {}).get("feedType", "")).strip().lower() + if feed_type not in {"user", "org"}: + raise ValueError("/api/recent-activity requires feedType=user|org") + if not str((params or {}).get("entity", "")).strip(): + raise ValueError("/api/recent-activity requires entity") + + base = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/") + q = urlencode(params or {}, doseq=True) + url = f"{base}{ep}" + (f"?{q}" if q else "") + + headers = {"Accept": "application/json"} + token = _load_token() + if token: + headers["Authorization"] = f"Bearer {token}" + + data = None + if method_u == "POST": + headers["Content-Type"] = "application/json" + data = json.dumps(json_body or {}).encode("utf-8") + + req = Request(url, method=method_u, headers=headers, data=data) + try: + with urlopen(req, timeout=timeout_sec) as res: + payload = _json_best_effort(res.read()) + return {"ok": True, "status": int(res.status), "url": url, "data": payload, "error": None} + except HTTPError as e: + payload = _json_best_effort(e.read()) + err = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False)[:1000] + return {"ok": False, "status": int(e.code), "url": url, "data": payload, "error": err} + except URLError as e: + return {"ok": False, "status": 0, "url": url, "data": None, "error": f"Network error: {e}"} + + +def _validate_generated_code(code: str) -> None: + if not code.strip(): + raise ValueError("Generated code is empty") + + blocked_patterns: list[tuple[str, str]] = [ + (r"(?m)^\s*import\s+\S", "import statement"), + (r"(?m)^\s*from\s+\S+\s+import\s+\S", "from-import statement"), + (r"\bexec\s*\(", "exec("), + (r"\beval\s*\(", "eval("), + (r"\bopen\s*\(", "open("), + (r"\b__import__\b", "__import__"), + (r"(?i)\bwhile\s+true\b", "while true"), + ] + for pattern, label in blocked_patterns: + if re.search(pattern, code): + raise ValueError(f"Generated code contains blocked pattern: {label}") + + try: + parsed = 
compile( # noqa: S102 - compile is used for AST validation only. + code, + "", + "exec", + flags=ast.PyCF_ONLY_AST | ast.PyCF_ALLOW_TOP_LEVEL_AWAIT, + dont_inherit=True, + ) + except SyntaxError as e: + message = e.msg or "invalid syntax" + raise ValueError(f"Generated code is not valid Python: {message}") from e + + if not isinstance(parsed, ast.Module): + raise ValueError("Generated code must be a Python module") + + solve_defs = [ + node + for node in parsed.body + if isinstance(node, ast.AsyncFunctionDef) and node.name == "solve" + ] + if not solve_defs: + raise ValueError("Generated code must define `async def solve(query, max_calls): ...`.") + + def _valid_solve_signature(node: ast.AsyncFunctionDef) -> bool: + args = node.args + return ( + not args.posonlyargs + and len(args.args) == 2 + and [arg.arg for arg in args.args] == ["query", "max_calls"] + and args.vararg is None + and not args.kwonlyargs + and args.kwarg is None + and not args.defaults + and not args.kw_defaults + ) + + if not any(_valid_solve_signature(node) for node in solve_defs): + raise ValueError("`solve` must have signature `async def solve(query, max_calls): ...`.") + + if not parsed.body: + raise ValueError("Generated code is empty") + + final_stmt = parsed.body[-1] + valid_final_await = ( + isinstance(final_stmt, ast.Expr) + and isinstance(final_stmt.value, ast.Await) + and isinstance(final_stmt.value.value, ast.Call) + and isinstance(final_stmt.value.value.func, ast.Name) + and final_stmt.value.value.func.id == "solve" + and len(final_stmt.value.value.args) == 2 + and not final_stmt.value.value.keywords + and all(isinstance(arg, ast.Name) for arg in final_stmt.value.value.args) + and [cast(ast.Name, arg).id for arg in final_stmt.value.value.args] == ["query", "max_calls"] + ) + if not valid_final_await: + raise ValueError("Generated code must end with `await solve(query, max_calls)`.") + + def _preferred_helper_for_endpoint(endpoint: str) -> str | None: + for pattern, helper_name in 
HELPER_COVERED_ENDPOINT_PATTERNS: + if re.match(pattern, endpoint): + return helper_name + return None + + def _call_api_endpoint_hint(expr: ast.AST | None) -> str | None: + if isinstance(expr, ast.Constant) and isinstance(expr.value, str): + return expr.value + if isinstance(expr, ast.JoinedStr): + literal_parts = [ + value.value + for value in expr.values + if isinstance(value, ast.Constant) and isinstance(value.value, str) + ] + if literal_parts: + return "".join(literal_parts) + return None + + for node in ast.walk(parsed): + if not isinstance(node, ast.Call): + continue + if not isinstance(node.func, ast.Name) or node.func.id != "call_api": + continue + + endpoint_expr: ast.AST | None = node.args[0] if node.args else None + for keyword in node.keywords: + if keyword.arg == "endpoint": + endpoint_expr = keyword.value + break + + endpoint_hint = _call_api_endpoint_hint(endpoint_expr) + if endpoint_hint and "/api/collections/" in endpoint_hint and "/items" in endpoint_hint: + raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.") + if endpoint_hint: + preferred_helper = _preferred_helper_for_endpoint(endpoint_hint) + if preferred_helper is not None: + raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_hint!r}, ...)` for this endpoint family.") + + allowed_external_calls = ["call_api(", *[f"{name}(" for name in HELPER_EXTERNALS]] + if not any(token in code for token in allowed_external_calls): + raise ValueError("Generated code must call at least one external API function (call_api or hf_* helper)") + + helper_name_set = set(HELPER_EXTERNALS) + for m in re.finditer(r"call_api\(\s*([\"'])\s*([^\"']+)\s*\1", code): + endpoint_literal = str(m.group(2) or "").strip() + if not endpoint_literal: + continue + if ( + endpoint_literal in helper_name_set + or endpoint_literal.startswith("hf_") + or endpoint_literal.startswith("/hf_") + or 
endpoint_literal.startswith("/api/hf_") + ): + raise ValueError("Do not call helper names through call_api; call hf_* helpers directly.") + if re.match(r"^/api/collections/(?:[^/]+/)?[^/]+/items$", endpoint_literal): + raise ValueError("Use `hf_collection_items(...)` for collection contents instead of guessing `/api/collections/.../items`.") + preferred_helper = _preferred_helper_for_endpoint(endpoint_literal) + if preferred_helper is not None: + raise ValueError(f"Use `{preferred_helper}(...)` instead of `call_api({endpoint_literal!r}, ...)` for this endpoint family.") + if not endpoint_literal.startswith("/api/"): + raise ValueError("call_api endpoint must be a raw path starting with '/api/...'.") +async def _run_with_monty( + *, + code: str, + query: str, + max_calls: int, + strict_mode: bool, + timeout_sec: int, +) -> dict[str, Any]: + try: + import pydantic_monty + except Exception as e: + raise RuntimeError("pydantic_monty is not installed. Install with `uv pip install pydantic-monty`.") from e + + max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT)) + call_count = {"n": 0} + trace: list[dict[str, Any]] = [] + limit_summaries: list[dict[str, Any]] = [] + latest_helper_error: dict[str, Any] | None = None + internal_helper_used = {"used": False} + def _budget_remaining() -> int: + return max(0, max_calls - call_count["n"]) + + def _policy_int(helper_name: str, key: str, default: int) -> int: + cfg = PAGINATION_POLICY.get(helper_name) or {} + try: + return int(cfg.get(key, default)) + except Exception: + return int(default) + + def _consume_call(endpoint: str, method: str = "GET") -> int: + if call_count["n"] >= max_calls: + raise RuntimeError(f"Max API calls exceeded ({max_calls})") + call_count["n"] += 1 + return call_count["n"] + + def _trace_ok(idx: int, endpoint: str, method: str = "GET", status: int = 200) -> None: + trace.append( + { + "call_index": idx, + "depth": idx, + "method": method, + "endpoint": endpoint, + "ok": True, + "status": status, + } 
        )

    def _trace_err(idx: int, endpoint: str, err: Any, method: str = "GET", status: int = 0) -> None:
        """Append a failed-call record (with stringified error) to the execution trace."""
        trace.append(
            {
                "call_index": idx,
                "depth": idx,
                "method": method,
                "endpoint": endpoint,
                "ok": False,
                "status": status,
                "error": str(err),
            }
        )

    def _host_raw_call(
        endpoint: str,
        *,
        params: dict[str, Any] | None = None,
        method: str = "GET",
        json_body: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Budget-charged raw API call: delegates to call_api_host and records the trace."""
        idx = _consume_call(endpoint, method)
        try:
            resp = call_api_host(
                endpoint,
                method=method,
                params=params,
                json_body=json_body,
                timeout_sec=timeout_sec,
                strict_mode=strict_mode,
            )
            if resp.get("ok"):
                _trace_ok(idx, endpoint, method=method, status=int(resp.get("status") or 200))
            else:
                _trace_err(idx, endpoint, resp.get("error"), method=method, status=int(resp.get("status") or 0))
            return resp
        except Exception as e:
            # Trace the failure, then let the caller decide how to handle it.
            _trace_err(idx, endpoint, e, method=method, status=0)
            raise

    # Lazily-created shared HfApi client for all helper closures.
    hf_api_client: HfApi | None = None

    def _get_hf_api_client() -> HfApi:
        """Create (once) and return the shared HfApi client bound to HF_ENDPOINT."""
        nonlocal hf_api_client
        if hf_api_client is None:
            endpoint = os.getenv("HF_ENDPOINT", "https://huggingface.co").rstrip("/")
            hf_api_client = HfApi(endpoint=endpoint, token=_load_token())
        return hf_api_client

    def _host_hf_call(endpoint: str, fn: Callable[[], Any]) -> Any:
        """Budget-charged HfApi call wrapper: runs `fn`, tracing success/failure.

        `endpoint` is only used for trace labeling; the actual request is made by `fn`.
        """
        idx = _consume_call(endpoint, "GET")
        try:
            out = fn()
            _trace_ok(idx, endpoint, method="GET", status=200)
            return out
        except Exception as e:
            _trace_err(idx, endpoint, e, method="GET", status=0)
            raise

    def _helper_meta(start_calls: int, *, source: str, **extra: Any) -> dict[str, Any]:
        """Base meta block for helper envelopes: source, budget accounting, plus extras."""
        out = {
            "source": source,
            "normalized": True,
            "budget_used": max(0, call_count["n"] - start_calls),
            "budget_remaining": _budget_remaining(),
        }
        out.update(extra)
        return out

    def _derive_limit_metadata(
        *,
        requested_return_limit: int | None,
        applied_return_limit: int,
        default_limit_used: bool,
        requested_scan_limit: int | None = None,
        applied_scan_limit: int | None = None,
        requested_max_pages: int | None = None,
        applied_max_pages: int | None = None,
    ) -> dict[str, Any]:
        """Describe requested-vs-applied return/scan/page limits for the meta envelope.

        Scan and page fields are emitted only when either side of the pair was given.
        """
        meta: dict[str, Any] = {
            "requested_return_limit": requested_return_limit,
            "applied_return_limit": applied_return_limit,
            "default_limit_used": default_limit_used,
        }
        if requested_scan_limit is not None or applied_scan_limit is not None:
            meta["requested_scan_limit"] = requested_scan_limit
            meta["scan_limit"] = applied_scan_limit
            meta["scan_limit_applied"] = requested_scan_limit != applied_scan_limit
        if requested_max_pages is not None or applied_max_pages is not None:
            meta["requested_max_pages"] = requested_max_pages
            meta["applied_max_pages"] = applied_max_pages
            meta["page_limit_applied"] = requested_max_pages != applied_max_pages
        if requested_return_limit is not None:
            meta["hard_cap_applied"] = applied_return_limit < requested_return_limit
        return meta

    def _derive_more_available(*, sample_complete: bool, exact_count: bool, returned: int, total: int | None) -> bool | str:
        """Tri-state 'more rows exist' signal: False / True / "unknown"."""
        if sample_complete:
            return False
        if exact_count and total is not None and returned < total:
            return True
        return "unknown"

    def _derive_truncated_by(
        *,
        hard_cap: bool = False,
        scan_limit_hit: bool = False,
        page_limit_hit: bool = False,
        return_limit_hit: bool = False,
    ) -> str:
        """Name the single truncation cause, or "multiple"/"none"."""
        causes = [hard_cap, scan_limit_hit, page_limit_hit, return_limit_hit]
        if sum(1 for cause in causes if cause) > 1:
            return "multiple"
        if hard_cap:
            return "hard_cap"
        if scan_limit_hit:
            return "scan_limit"
        if page_limit_hit:
            return "page_limit"
        if return_limit_hit:
            return "return_limit"
        return "none"

    def _derive_can_request_more(*, sample_complete: bool, truncated_by: str) -> bool:
        """True when re-calling with larger bounds could yield more rows (hard_cap cannot)."""
        if sample_complete:
            return False
        return truncated_by in {"return_limit", "scan_limit", "page_limit", "multiple"}

    def _derive_next_request_hint(*, truncated_by: str, more_available: bool | str, applied_return_limit: int, applied_scan_limit: int | None = None, applied_max_pages: int | None = None) -> str:
        """Human-readable suggestion for how the caller should follow up, keyed on the truncation cause."""
        if truncated_by == "return_limit":
            return f"Ask for return_limit>{applied_return_limit} to see more rows"
        if truncated_by == "scan_limit" and applied_scan_limit is not None:
            return f"Increase scan_limit above {applied_scan_limit} for broader coverage"
        if truncated_by == "page_limit" and applied_max_pages is not None:
            return f"Increase max_pages above {applied_max_pages} to continue paging"
        if truncated_by == "hard_cap":
            return "No more rows can be returned in a single call because a hard cap was applied"
        if truncated_by == "multiple":
            return "Increase the relevant return/page/scan bounds to improve coverage"
        if more_available is False:
            return "No more results available"
        if more_available == "unknown":
            return "More results may exist; narrow filters or raise scan/page bounds for better coverage"
        return "Ask for a larger limit to see more rows"

    def _resolve_exhaustive_limits(
        *,
        return_limit: int | None,
        count_only: bool,
        default_return: int,
        max_return: int,
        scan_limit: int | None = None,
        scan_cap: int | None = None,
    ) -> dict[str, Any]:
        """Build the 'limit plan' dict shared by exhaustive helpers.

        count_only forces an applied return limit of 0; scan keys are included only
        when a scan_cap applies to the helper.
        """
        requested_return_limit = None if count_only else return_limit
        effective_requested_return_limit = 0 if count_only else requested_return_limit
        out: dict[str, Any] = {
            "requested_return_limit": requested_return_limit,
            "applied_return_limit": _clamp_int(
                effective_requested_return_limit,
                default=default_return,
                minimum=0,
                maximum=max_return,
            ),
            "default_limit_used": requested_return_limit is None and not count_only,
        }
        out["hard_cap_applied"] = (
            requested_return_limit is not None and out["applied_return_limit"] < requested_return_limit
        )
        if scan_cap is not None:
            out["requested_scan_limit"] = scan_limit
            out["applied_scan_limit"] = _clamp_int(
                scan_limit,
                default=scan_cap,
                minimum=1,
                maximum=scan_cap,
            )
        return out

    def _build_exhaustive_meta(
        *,
        base_meta: dict[str, Any],
        limit_plan: dict[str, Any],
        sample_complete: bool,
        exact_count: bool,
        truncated_by: str,
        more_available: bool | str,
        requested_max_pages: int | None = None,
        applied_max_pages: int | None = None,
    ) -> dict[str, Any]:
        """Merge completeness flags, follow-up hints and limit metadata into `base_meta`."""
        meta = dict(base_meta)
        applied_return_limit = int(limit_plan["applied_return_limit"])
        applied_scan_limit = limit_plan.get("applied_scan_limit")
        meta.update(
            {
                "complete": sample_complete,
                "exact_count": exact_count,
                "sample_complete": sample_complete,
                "more_available": more_available,
                "can_request_more": _derive_can_request_more(
                    sample_complete=sample_complete,
                    truncated_by=truncated_by,
                ),
                "truncated_by": truncated_by,
                "next_request_hint": _derive_next_request_hint(
                    truncated_by=truncated_by,
                    more_available=more_available,
                    applied_return_limit=applied_return_limit,
                    applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
                    applied_max_pages=applied_max_pages,
                ),
            }
        )
        meta.update(
            _derive_limit_metadata(
                requested_return_limit=limit_plan["requested_return_limit"],
                applied_return_limit=applied_return_limit,
                default_limit_used=bool(limit_plan["default_limit_used"]),
                requested_scan_limit=limit_plan.get("requested_scan_limit"),
                applied_scan_limit=applied_scan_limit if isinstance(applied_scan_limit, int) else None,
                requested_max_pages=requested_max_pages,
                applied_max_pages=applied_max_pages,
            )
        )
        return meta

    def _overview_count_only_success(
        *,
        start_calls: int,
        source: str,
        total: int,
        limit_plan: dict[str, Any],
        base_meta: dict[str, Any],
    ) -> dict[str, Any]:
        """Success envelope for count_only queries answered exactly by an overview endpoint (no rows)."""
        sample_complete = True
        more_available = False
        truncated_by = "none"
        meta = _build_exhaustive_meta(
            base_meta={
                **base_meta,
                "matched": total,
                "returned": 0,
                "total": total,
                "total_available": total,
                "total_matched": total,
                "truncated": False,
            },
            limit_plan=limit_plan,
            sample_complete=sample_complete,
            exact_count=True,
            truncated_by=truncated_by,
            more_available=more_available,
        )
        return _helper_success(
            start_calls=start_calls,
            source=source,
            items=[],
            meta=meta,
        )

    def _build_exhaustive_result_meta(
        *,
        base_meta: dict[str, Any],
        limit_plan: dict[str, Any],
        matched_count: int,
        returned_count: int,
        exact_count: bool,
        count_only: bool = False,
        sample_complete: bool | None = None,
        more_available: bool | str | None = None,
        scan_limit_hit: bool = False,
        page_limit_hit: bool = False,
        truncated_extra: bool = False,
        requested_max_pages: int | None = None,
        applied_max_pages: int | None = None,
    ) -> dict[str, Any]:
        """Derive the full result meta for an exhaustive helper call.

        Computes sample completeness, truncation cause and the more_available
        tri-state from the counts, unless the caller overrides them explicitly.
        """
        applied_return_limit = int(limit_plan["applied_return_limit"])
        if count_only:
            effective_sample_complete = exact_count
        else:
            effective_sample_complete = (
                sample_complete
                if isinstance(sample_complete, bool)
                else exact_count and matched_count <= applied_return_limit
            )
        return_limit_hit = False if count_only else (applied_return_limit > 0 and matched_count > applied_return_limit)
        truncated_by = _derive_truncated_by(
            hard_cap=bool(limit_plan.get("hard_cap_applied")),
            scan_limit_hit=scan_limit_hit,
            page_limit_hit=page_limit_hit,
            return_limit_hit=return_limit_hit,
        )
        truncated = truncated_by != "none" or truncated_extra
        total_value = _as_int(base_meta.get("total"))
        effective_more_available = more_available
        if count_only and exact_count:
            effective_more_available = False
        if effective_more_available is None:
            effective_more_available = _derive_more_available(
                sample_complete=effective_sample_complete,
                exact_count=exact_count,
                returned=returned_count,
                total=total_value,
            )

        return _build_exhaustive_meta(
            base_meta={
                **base_meta,
                "matched": matched_count,
                "returned": returned_count,
                "truncated": truncated,
            },
            limit_plan=limit_plan,
            sample_complete=effective_sample_complete,
            exact_count=exact_count,
            truncated_by=truncated_by,
            more_available=effective_more_available,
            requested_max_pages=requested_max_pages,
            applied_max_pages=applied_max_pages,
        )

    def _helper_success(
        *,
        start_calls: int,
        source: str,
        items: list[dict[str, Any]],
        cursor: str | None = None,
        meta: dict[str, Any] | None = None,
        **extra_meta: Any,
    ) -> dict[str, Any]:
        """Stable success envelope: `item` is set only when exactly one row is returned."""
        merged_meta = dict(meta or {})
        merged_meta.update(extra_meta)
        if cursor is not None:
            merged_meta["cursor"] = cursor

        return {
            "ok": True,
            "item": items[0] if len(items) == 1 else None,
            "items": items,
            "meta": _helper_meta(start_calls, source=source, **merged_meta),
            "error": None,
        }

    def _helper_error(*, start_calls: int, source: str, error: Any, **meta: Any) -> dict[str, Any]:
        """Stable error envelope; also remembered as `latest_helper_error` for diagnostics."""
        nonlocal latest_helper_error
        envelope = {
            "ok": False,
            "item": None,
            "items": [],
            "meta": _helper_meta(start_calls, source=source, **meta),
            "error": str(error),
        }
        latest_helper_error = envelope
        return envelope

    def _project_items(
        items: list[dict[str, Any]],
        fields: list[str] | None,
        aliases: dict[str, str] | None = None,
    ) -> list[dict[str, Any]]:
        """Project rows down to the requested fields, resolving case-insensitive aliases.

        None values are dropped from the projected rows; with no usable `fields`
        the input is returned unchanged.
        """
        if not isinstance(fields, list) or not fields:
            return items
        wanted = [str(f).strip() for f in fields if str(f).strip()]
        if not wanted:
            return items
        alias_map = {str(k).strip().lower(): str(v).strip() for k, v in (aliases or {}).items() if str(k).strip() and str(v).strip()}
        projected: list[dict[str, Any]] = []
        for row in items:
            out: dict[str, Any] = {}
            for key in wanted:
                source_key = alias_map.get(key.lower(), key)
                value = row.get(source_key)
                if value is None:
                    continue
                out[key] = value
            projected.append(out)
        return projected

    def _project_repo_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection with repo-row aliases."""
        return _project_items(items, fields, aliases=_REPO_FIELD_ALIASES)

    def _project_collection_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection with collection-row aliases."""
        return _project_items(items, fields, aliases=_COLLECTION_FIELD_ALIASES)

    def _project_user_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection with user-row aliases."""
        return _project_items(items, fields, aliases=_USER_FIELD_ALIASES)

    def _project_actor_items(items: list[dict[str, Any]], fields: list[str] | None) -> list[dict[str, Any]]:
        """Field projection with actor-row aliases."""
        return _project_items(items, fields, aliases=_ACTOR_FIELD_ALIASES)

    def _item_matches_where(item: dict[str, Any], where: dict[str, Any] | None) -> bool:
        """Evaluate a small `where` filter DSL against one row.

        Conditions may be: a dict of operators (eq/in/contains/icontains/gte/lte,
        ANDed together), a list/tuple/set (membership), or a scalar (equality).
        An empty/absent `where` matches everything.
        """
        if not isinstance(where, dict) or not where:
            return True
        for key, cond in where.items():
            val = item.get(str(key))
            if isinstance(cond, dict):
                if "eq" in cond and val != cond.get("eq"):
                    return False
                if "in" in cond:
                    allowed = cond.get("in")
                    if isinstance(allowed, (list, tuple, set)) and val not in allowed:
                        return False
                if "contains" in cond:
                    needle = cond.get("contains")
                    if not isinstance(val, str) or not isinstance(needle, str) or needle not in val:
                        return False
                if "icontains" in cond:
                    needle = cond.get("icontains")
                    if not isinstance(val, str) or not isinstance(needle, str) or needle.lower() not in val.lower():
                        return False
                if "gte" in cond:
                    v = _as_int(val)
                    c = _as_int(cond.get("gte"))
                    if v is None or c is None or v < c:
                        return False
                if "lte" in cond:
                    v = _as_int(val)
                    c = _as_int(cond.get("lte"))
                    if v is None or c is None or v > c:
                        return False
                continue

            if isinstance(cond, (list, tuple, set)):
                if val not in cond:
                    return False
                continue

            if val != cond:
                return False
        return True

    def _apply_where(items: list[dict[str, Any]], where: dict[str, Any] | None) -> list[dict[str, Any]]:
        """Filter rows with the `where` DSL; pass-through when no filter is given."""
        if not isinstance(where, dict) or not where:
            return items
        return [row for row in items if _item_matches_where(row, where)]

    def _helper_item(resp: dict[str, Any]) -> dict[str, Any] | None:
        """Extract the single item from a helper envelope (`item`, else first of `items`)."""
        item = resp.get("item")
        if isinstance(item, dict):
            return item
        items = resp.get("items")
        if isinstance(items, list) and items and isinstance(items[0], dict):
            return items[0]
        return None

    def _overview_count(item: dict[str, Any] | None, key: str) -> int | None:
        """Read an integer counter off an overview item, tolerating missing/odd values."""
        if not isinstance(item, dict):
            return None
        return _as_int(item.get(key))

    def _summary_section(
        resp: dict[str, Any],
        *,
        count: int | None = None,
        default_sample: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """Fold a helper response into a {count, sample, meta} summary section.

        A count is inferred from meta totals only when the source counted exactly;
        failed responses keep the error in meta and fall back to `default_sample`.
        """
        meta = resp.get("meta")
        section_meta = dict(meta) if isinstance(meta, dict) else {}
        sample = resp.get("items")
        section_sample = sample if isinstance(sample, list) else list(default_sample or [])
        section_count = count
        if section_count is None:
            count_exact = section_meta.get("exact_count") is True or section_meta.get("count_source") in {"overview", "endpoint"}
            if count_exact:
                for key in ("total", "total_matched", "matched"):
                    section_count = _as_int(section_meta.get(key))
                    if section_count is not None:
                        break
        if resp.get("ok") is not True:
            section_meta["error"] = str(resp.get("error") or "section fetch failed")
            section_sample = list(default_sample or [])
        return {"count": section_count, "sample": section_sample, "meta": section_meta}

    async def _resolve_username_or_current(username: str | None) -> tuple[str | None, str | None]:
        """Return (username, None), resolving via hf_whoami when none given, else (None, error)."""
        u = str(username or "").strip()
        if u:
            return u, None

        whoami = await hf_whoami()
        if whoami.get("ok") is not True:
            return None, str(whoami.get("error") or "Could not resolve current authenticated user")

        item = _helper_item(whoami)
        resolved = item.get("username") if isinstance(item, dict) else None
        if not isinstance(resolved, str) or not resolved.strip():
            return None, "username was not provided and current authenticated user could not be resolved"
        return resolved.strip(), None

    def _normalize_user_likes_sort(sort: str | None) -> tuple[str | None, str | None]:
        """Map user-supplied sort spellings to a canonical likes sort key, or an error."""
        raw = str(sort or "likedAt").strip()
        alias_map = {
            "": "likedAt",
            "likedat": "likedAt",
            "liked_at": "likedAt",
            "liked-at":
"likedAt", + "recency": "likedAt", + "repolikes": "repoLikes", + "repo_likes": "repoLikes", + "repo-likes": "repoLikes", + "repodownloads": "repoDownloads", + "repo_downloads": "repoDownloads", + "repo-downloads": "repoDownloads", + } + normalized = alias_map.get(raw.lower(), raw) + if normalized not in {"likedAt", "repoLikes", "repoDownloads"}: + return None, "sort must be one of likedAt, repoLikes, repoDownloads" + return normalized, None + + def _author_from_any(value: Any) -> str | None: + if isinstance(value, str): + return value + if isinstance(value, dict): + for k in ("name", "username", "user", "login"): + v = value.get(k) + if isinstance(v, str) and v: + return v + return None + + def _clean_social_handle(value: Any) -> str | None: + if not isinstance(value, str): + return None + cleaned = value.strip() + if not cleaned: + return None + if re.match(r"^https?://", cleaned, flags=re.IGNORECASE): + return cleaned + return cleaned.lstrip("@") + + def _social_url(kind: str, value: Any) -> str | None: + cleaned = _clean_social_handle(value) + if cleaned is None: + return None + if re.match(r"^https?://", cleaned, flags=re.IGNORECASE): + return cleaned + if kind == "twitter": + return f"https://twitter.com/{cleaned}" + if kind == "github": + return f"https://github.com/{cleaned}" + if kind == "linkedin": + if cleaned.startswith(("in/", "company/")): + return f"https://www.linkedin.com/{cleaned}" + return f"https://www.linkedin.com/in/{cleaned}" + if kind == "bluesky": + return f"https://bsky.app/profile/{cleaned}" + return cleaned + + async def call_api( + endpoint: str, + params: dict[str, Any] | None = None, + method: str = "GET", + json_body: dict[str, Any] | None = None, + ) -> dict[str, Any]: + return _host_raw_call(endpoint, params=params, method=method, json_body=json_body) + + async def hf_whoami() -> dict[str, Any]: + start_calls = call_count["n"] + endpoint = "/api/whoami-v2" + token = _load_token() + if token is None: + return _helper_error( + 
start_calls=start_calls, + source=endpoint, + error=( + "Current authenticated user is unavailable for this request. " + "No request-scoped or fallback HF token was found." + ), + ) + try: + payload = _host_hf_call( + endpoint, + lambda: _get_hf_api_client().whoami(token=token, cache=True), + ) + except Exception as e: + return _helper_error(start_calls=start_calls, source=endpoint, error=e) + + username = payload.get("name") or payload.get("user") or payload.get("username") + item = {"username": username, "fullname": payload.get("fullname"), "isPro": payload.get("isPro")} + items = [item] if isinstance(username, str) and username else [] + return _helper_success( + start_calls=start_calls, + source=endpoint, + items=items, + scanned=1, + matched=len(items), + returned=len(items), + truncated=False, + ) + + async def hf_user_overview(username: str) -> dict[str, Any]: + start_calls = call_count["n"] + u = str(username or "").strip() + if not u: + return _helper_error(start_calls=start_calls, source="/api/users//overview", error="username is required") + endpoint = f"/api/users/{u}/overview" + try: + obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_user_overview(u)) + except Exception as e: + return _helper_error(start_calls=start_calls, source=endpoint, error=e) + + twitter = getattr(obj, "twitter", None) or getattr(obj, "twitterUsername", None) + github = getattr(obj, "github", None) or getattr(obj, "githubUsername", None) + linkedin = getattr(obj, "linkedin", None) or getattr(obj, "linkedinUsername", None) + bluesky = getattr(obj, "bluesky", None) or getattr(obj, "blueskyUsername", None) + + if _budget_remaining() > 0 and any(v in {None, ""} for v in [twitter, github, linkedin, bluesky]): + socials_ep = f"/api/users/{u}/socials" + socials_resp = _host_raw_call(socials_ep) + if socials_resp.get("ok"): + socials_payload = socials_resp.get("data") if isinstance(socials_resp.get("data"), dict) else {} + handles = socials_payload.get("socialHandles") if 
isinstance(socials_payload.get("socialHandles"), dict) else {} + twitter = twitter or handles.get("twitter") + github = github or handles.get("github") + linkedin = linkedin or handles.get("linkedin") + bluesky = bluesky or handles.get("bluesky") + + orgs_raw = getattr(obj, "orgs", None) + org_names: list[str] | None = None + if isinstance(orgs_raw, (list, tuple, set)): + names = [] + for org in orgs_raw: + if isinstance(org, str) and org.strip(): + names.append(org.strip()) + continue + name = getattr(org, "name", None) + if isinstance(name, str) and name.strip(): + names.append(name.strip()) + org_names = names or None + + twitter_handle = _clean_social_handle(twitter) + github_handle = _clean_social_handle(github) + linkedin_handle = _clean_social_handle(linkedin) + bluesky_handle = _clean_social_handle(bluesky) + + item = { + "username": obj.username or u, + "fullname": obj.fullname, + "bio": getattr(obj, "details", None), + "avatarUrl": obj.avatar_url, + "websiteUrl": getattr(obj, "websiteUrl", None), + "twitter": _social_url("twitter", twitter_handle), + "github": _social_url("github", github_handle), + "linkedin": _social_url("linkedin", linkedin_handle), + "bluesky": _social_url("bluesky", bluesky_handle), + "twitterHandle": twitter_handle, + "githubHandle": github_handle, + "linkedinHandle": linkedin_handle, + "blueskyHandle": bluesky_handle, + "followers": _as_int(obj.num_followers), + "following": _as_int(obj.num_following), + "likes": _as_int(obj.num_likes), + "models": _as_int(getattr(obj, "num_models", None)), + "datasets": _as_int(getattr(obj, "num_datasets", None)), + "spaces": _as_int(getattr(obj, "num_spaces", None)), + "discussions": _as_int(getattr(obj, "num_discussions", None)), + "papers": _as_int(getattr(obj, "num_papers", None)), + "upvotes": _as_int(getattr(obj, "num_upvotes", None)), + "orgs": org_names, + "isPro": obj.is_pro, + } + return _helper_success( + start_calls=start_calls, + source=endpoint, + items=[item], + scanned=1, + 
matched=1, + returned=1, + truncated=False, + ) + + async def hf_org_overview(organization: str) -> dict[str, Any]: + start_calls = call_count["n"] + org = str(organization or "").strip() + if not org: + return _helper_error( + start_calls=start_calls, + source="/api/organizations//overview", + error="organization is required", + ) + endpoint = f"/api/organizations/{org}/overview" + try: + obj = _host_hf_call(endpoint, lambda: _get_hf_api_client().get_organization_overview(org)) + except Exception as e: + return _helper_error(start_calls=start_calls, source=endpoint, error=e) + + item = { + "organization": obj.name or org, + "displayName": obj.fullname, + "avatarUrl": obj.avatar_url, + "description": obj.details, + "websiteUrl": getattr(obj, "websiteUrl", None), + "followers": _as_int(obj.num_followers), + "members": _as_int(obj.num_users), + "models": _as_int(getattr(obj, "num_models", None)), + "datasets": _as_int(getattr(obj, "num_datasets", None)), + "spaces": _as_int(getattr(obj, "num_spaces", None)), + } + return _helper_success( + start_calls=start_calls, + source=endpoint, + items=[item], + scanned=1, + matched=1, + returned=1, + truncated=False, + ) + + async def hf_org_members( + organization: str, + return_limit: int | None = None, + scan_limit: int | None = None, + count_only: bool = False, + where: dict[str, Any] | None = None, + fields: list[str] | None = None, + ) -> dict[str, Any]: + start_calls = call_count["n"] + org = str(organization or "").strip() + if not org: + return _helper_error(start_calls=start_calls, source="/api/organizations//members", error="organization is required") + + default_return = _policy_int("hf_org_members", "default_return", 100) + scan_cap = _policy_int("hf_org_members", "scan_max", GRAPH_SCAN_LIMIT_CAP) + limit_plan = _resolve_exhaustive_limits( + return_limit=return_limit, + count_only=count_only, + default_return=default_return, + max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP, + scan_limit=scan_limit, + 
            scan_cap=scan_cap,
        )
        ret_lim = int(limit_plan["applied_return_limit"])
        scan_lim = int(limit_plan["applied_scan_limit"])
        has_where = isinstance(where, dict) and bool(where)

        # Try to get an exact member count from the overview endpoint first.
        overview_total: int | None = None
        overview_source = f"/api/organizations/{org}/overview"
        if _budget_remaining() > 0:
            try:
                org_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_organization_overview(org))
                overview_total = _as_int(getattr(org_obj, "num_users", None))
            except Exception:
                # Best-effort: the member scan below still works without the overview.
                overview_total = None

        # Unfiltered count_only can be answered exactly without scanning members.
        if count_only and not has_where and overview_total is not None:
            return _overview_count_only_success(
                start_calls=start_calls,
                source=overview_source,
                total=overview_total,
                limit_plan=limit_plan,
                base_meta={
                    "scanned": 1,
                    "count_source": "overview",
                    "organization": org,
                },
            )

        endpoint = f"/api/organizations/{org}/members"
        try:
            # islice bounds the member scan at the policy cap.
            rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_members(org), scan_lim)))
        except Exception as e:
            return _helper_error(start_calls=start_calls, source=endpoint, error=e, organization=org)

        normalized: list[dict[str, Any]] = []
        for row in rows:
            handle = getattr(row, "username", None)
            if not isinstance(handle, str) or not handle:
                continue
            item = {
                "username": handle,
                "fullname": getattr(row, "fullname", None),
                "isPro": getattr(row, "is_pro", None),
                "role": getattr(row, "role", None),
            }
            normalized.append(item)

        normalized = _apply_where(normalized, where)
        observed_total = len(rows)
        # Fewer rows than the scan limit means the listing was fully consumed.
        scan_exhaustive = observed_total < scan_lim

        # Flag disagreement between the overview counter and a full listing scan.
        overview_list_mismatch = (
            overview_total is not None
            and scan_exhaustive
            and observed_total != overview_total
        )

        if has_where:
            # Filtered totals can only be exact when the whole listing was scanned.
            exact_count = scan_exhaustive
            total = len(normalized)
            total_matched = len(normalized)
        else:
            if overview_total is not None:
                exact_count = True
                total = overview_total
                total_matched = overview_total
            else:
                exact_count = scan_exhaustive
                total = observed_total
                total_matched = observed_total

        total_available = overview_total if overview_total is not None else observed_total
        items = normalized[:ret_lim]
        scan_limit_hit = not exact_count and observed_total >= scan_lim
        count_source = "overview" if overview_total is not None and not has_where else "scan"
        sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
        more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
        if not exact_count and scan_limit_hit:
            # With a filter we cannot say whether unscanned rows would match.
            more_available = "unknown" if has_where else True

        items = _project_user_items(items, fields)
        meta = _build_exhaustive_result_meta(
            base_meta={
                "scanned": observed_total,
                "total": total,
                "total_available": total_available,
                "total_matched": total_matched,
                "count_source": count_source,
                "lower_bound": bool(has_where and not exact_count),
                "overview_total": overview_total,
                "listed_total": observed_total,
                "overview_list_mismatch": overview_list_mismatch,
                "organization": org,
            },
            limit_plan=limit_plan,
            matched_count=len(normalized),
            returned_count=len(items),
            exact_count=exact_count,
            count_only=count_only,
            sample_complete=sample_complete,
            more_available=more_available,
            scan_limit_hit=scan_limit_hit,
        )
        return _helper_success(start_calls=start_calls, source=endpoint, items=items, meta=meta)

    async def hf_repo_search(
        query: str | None = None,
        repo_type: str | None = None,
        repo_types: list[str] | None = None,
        author: str | None = None,
        filters: list[str] | None = None,
        sort: str | None = None,
        limit: int = 20,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
        advanced: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Search models/datasets/spaces via HfApi list_* endpoints.

        Accepts either a single `repo_type` or a list of `repo_types` (not both);
        `advanced` pass-through args are only allowed with exactly one repo type.
        (Definition continues beyond this chunk.)
        """
        start_calls = call_count["n"]
        default_return = _policy_int("hf_repo_search", "default_return", 20)
        max_return = _policy_int("hf_repo_search", "max_return", SELECTIVE_ENDPOINT_RETURN_HARD_CAP)

        if repo_type is not None and repo_types is not None:
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error="Pass either repo_type or repo_types, not both",
            )

        if repo_types is None:
            # Single-type path; default to models when nothing was specified.
            if repo_type is None or not str(repo_type).strip():
                requested_repo_types = ["model"]
            else:
                rt = _canonical_repo_type(repo_type, default="")
                if rt not in {"model", "dataset", "space"}:
                    return _helper_error(
                        start_calls=start_calls,
                        source="/api/repos",
                        error=f"Unsupported repo_type '{repo_type}'",
                    )
                requested_repo_types = [rt]
        else:
            raw_types = _coerce_str_list(repo_types)
            if not raw_types:
                return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_types must not be empty")
            requested_repo_types: list[str] = []
            for raw in raw_types:
                rt = _canonical_repo_type(raw, default="")
                if rt not in {"model", "dataset", "space"}:
                    return _helper_error(
                        start_calls=start_calls,
                        source="/api/repos",
                        error=f"Unsupported repo_type '{raw}'",
                    )
                requested_repo_types.append(rt)

        filter_list = _coerce_str_list(filters)
        term = str(query or "").strip()
        author_clean = str(author or "").strip() or None
        requested_limit = limit
        lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return)
        limit_meta = _derive_limit_metadata(
            requested_return_limit=requested_limit,
            applied_return_limit=lim,
            default_limit_used=limit == default_return,
        )
        hard_cap_applied = bool(limit_meta.get("hard_cap_applied"))

        if advanced is not None and not isinstance(advanced, dict):
            return _helper_error(start_calls=start_calls, source="/api/repos", error="advanced must be a dict when provided")
        if advanced is not None and len(requested_repo_types) != 1:
            return _helper_error(
                start_calls=start_calls,
                source="/api/repos",
                error="advanced may only be used with a single repo_type",
            )

        # Validate/normalize sort per repo type before issuing any calls.
        sort_keys: dict[str, str | None] = {}
        for rt in requested_repo_types:
            sort_key, sort_error = _normalize_repo_sort_key(rt, sort)
            if sort_error:
                return _helper_error(start_calls=start_calls, source=f"/api/{rt}s", error=sort_error)
            sort_keys[rt] = sort_key

        all_items: list[dict[str, Any]] = []
        scanned = 0
        source_endpoints: list[str] = []
        limit_boundary_hit = False
        api = _get_hf_api_client()

        for rt in requested_repo_types:
            endpoint = f"/api/{rt}s"
            source_endpoints.append(endpoint)
            # `advanced` only applies on the single-type path (validated above).
            extra_args = dict(advanced or {}) if len(requested_repo_types) == 1 else {}
            allowed_extra = _REPO_SEARCH_EXTRA_ARGS.get(rt, set())
            unsupported = sorted(str(k) for k in extra_args.keys() if str(k) not in allowed_extra)
            if unsupported:
                return _helper_error(
                    start_calls=start_calls,
                    source=endpoint,
                    error=(
                        f"Unsupported advanced args for repo_type='{rt}': {unsupported}. "
                        f"Allowed advanced args: {sorted(allowed_extra)}"
                    ),
                )
            # Accept snake_case card_data as an alias for HfApi's cardData kwarg.
            if "card_data" in extra_args and "cardData" not in extra_args:
                extra_args["cardData"] = extra_args.pop("card_data")
            else:
                extra_args.pop("card_data", None)

            # Default expansion set unless the caller already controls payload shape.
            if not any(key in extra_args for key in ("expand", "full", "cardData", "fetch_config")):
                extra_args["expand"] = list(_REPO_SEARCH_DEFAULT_EXPAND[rt])

            try:
                if rt == "model":
                    payload = _host_hf_call(
                        endpoint,
                        lambda: list(
                            api.list_models(
                                search=term or None,
                                author=author_clean,
                                filter=filter_list or None,
                                sort=sort_keys[rt],  # type: ignore[arg-type]
                                limit=lim,
                                **extra_args,
                            )
                        ),
                    )
                elif rt == "dataset":
                    payload = _host_hf_call(
                        endpoint,
                        lambda: list(
                            api.list_datasets(
                                search=term or None,
                                author=author_clean,
                                filter=filter_list or None,
                                sort=sort_keys[rt],  # type: ignore[arg-type]
                                limit=lim,
                                **extra_args,
                            )
                        ),
                    )
                else:
                    payload = _host_hf_call(
                        endpoint,
                        lambda: list(
                            api.list_spaces(
                                search=term or None,
                                author=author_clean,
                                filter=filter_list or None,
                                sort=sort_keys[rt],  # type: ignore[arg-type]
                                limit=lim,
                                **extra_args,
    async def _user_graph_helper(
        kind: str,
        username: str,
        pro_only: bool | None,
        return_limit: int | None,
        scan_limit: int | None,
        count_only: bool,
        where: dict[str, Any] | None,
        fields: list[str] | None,
        *,
        helper_name: str,
    ) -> dict[str, Any]:
        """Shared follower/following scanner behind hf_user_graph.

        Strategy:
        1. Try the user overview for an authoritative relation count; if that
           fails, fall back to the organization overview (orgs only expose
           followers, so relation='following' is rejected for orgs).
        2. If the caller only wants an unfiltered count and the overview supplied
           one, answer without scanning the list at all.
        3. Otherwise scan up to ``scan_lim`` rows of the relevant list endpoint,
           apply pro_only/where filters, and build exhaustive-result metadata
           (exact vs lower-bound counts, overview/list mismatch detection).
        """
        start_calls = call_count["n"]
        default_return = _policy_int(helper_name, "default_return", 100)
        scan_cap = _policy_int(helper_name, "scan_max", GRAPH_SCAN_LIMIT_CAP)
        max_return = _policy_int(helper_name, "max_return", EXHAUSTIVE_HELPER_RETURN_HARD_CAP)

        u = str(username or "").strip()
        if not u:
            return _helper_error(start_calls=start_calls, source=f"/api/users//{kind}", error="username is required")

        limit_plan = _resolve_exhaustive_limits(
            return_limit=return_limit,
            count_only=count_only,
            default_return=default_return,
            max_return=max_return,
            scan_limit=scan_limit,
            scan_cap=scan_cap,
        )
        ret_lim = int(limit_plan["applied_return_limit"])
        scan_lim = int(limit_plan["applied_scan_limit"])
        has_where = isinstance(where, dict) and bool(where)
        # Any pro_only or where constraint means overview counts no longer equal
        # the matched-row count.
        filtered = (pro_only is not None) or has_where

        entity_type = "user"
        overview_total: int | None = None
        overview_source = f"/api/users/{u}/overview"
        # Overview lookups are best-effort and skipped entirely when the external
        # call budget is exhausted.
        if _budget_remaining() > 0:
            try:
                user_obj = _host_hf_call(overview_source, lambda: _get_hf_api_client().get_user_overview(u))
                overview_total = _as_int(user_obj.num_followers if kind == "followers" else user_obj.num_following)
            except Exception:
                # The handle may name an organization rather than a user.
                org_overview_source = f"/api/organizations/{u}/overview"
                try:
                    org_obj = _host_hf_call(org_overview_source, lambda: _get_hf_api_client().get_organization_overview(u))
                except Exception:
                    overview_total = None
                else:
                    entity_type = "organization"
                    overview_source = org_overview_source
                    if kind != "followers":
                        return _helper_error(
                            start_calls=start_calls,
                            source=f"/api/organizations/{u}/{kind}",
                            error="organization graph only supports relation='followers'; organizations do not expose a following list",
                            relation=kind,
                            organization=u,
                            entity=u,
                            entity_type=entity_type,
                        )
                    overview_total = _as_int(getattr(org_obj, "num_followers", None))

        # Fast path: unfiltered count-only request answered from the overview.
        if count_only and not filtered and overview_total is not None:
            return _overview_count_only_success(
                start_calls=start_calls,
                source=overview_source,
                total=overview_total,
                limit_plan=limit_plan,
                base_meta={
                    "scanned": 1,
                    "count_source": "overview",
                    "relation": kind,
                    "pro_only": pro_only,
                    "where_applied": has_where,
                    "entity": u,
                    "entity_type": entity_type,
                    "username": u,
                    "organization": u if entity_type == "organization" else None,
                },
            )

        endpoint = f"/api/users/{u}/{kind}"
        try:
            if entity_type == "organization":
                endpoint = f"/api/organizations/{u}/followers"
                rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_organization_followers(u), scan_lim)))
            elif kind == "followers":
                rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_followers(u), scan_lim)))
            else:
                rows = _host_hf_call(endpoint, lambda: list(islice(_get_hf_api_client().list_user_following(u), scan_lim)))
        except Exception as e:
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=e,
                relation=kind,
                username=u,
                entity=u,
                entity_type=entity_type,
                organization=u if entity_type == "organization" else None,
            )

        normalized: list[dict[str, Any]] = []
        for row in rows:
            handle = getattr(row, "username", None)
            if not isinstance(handle, str) or not handle:
                continue
            item = {
                "username": handle,
                "fullname": getattr(row, "fullname", None),
                "isPro": getattr(row, "is_pro", None),
            }
            # pro_only=True keeps only confirmed-Pro rows; pro_only=False drops
            # confirmed-Pro rows (unknown/None is kept).
            if pro_only is True and item.get("isPro") is not True:
                continue
            if pro_only is False and item.get("isPro") is True:
                continue
            normalized.append(item)

        normalized = _apply_where(normalized, where)
        observed_total = len(rows)
        # Receiving fewer rows than requested means the list was fully consumed.
        scan_exhaustive = observed_total < scan_lim

        overview_list_mismatch = (
            overview_total is not None
            and scan_exhaustive
            and observed_total != overview_total
        )

        if filtered:
            # Filtered counts are only exact when the whole list was scanned.
            exact_count = scan_exhaustive
            total = len(normalized)
            total_matched = len(normalized)
        else:
            if overview_total is not None:
                exact_count = True
                total = overview_total
                total_matched = overview_total
            else:
                exact_count = scan_exhaustive
                total = observed_total
                total_matched = observed_total

        total_available = overview_total if overview_total is not None else observed_total
        items = normalized[:ret_lim]
        scan_limit_hit = not exact_count and observed_total >= scan_lim
        count_source = "overview" if overview_total is not None and not filtered else "scan"
        sample_complete = exact_count and len(normalized) <= ret_lim and (not count_only or len(normalized) == 0)
        more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
        if not exact_count and scan_limit_hit:
            # With filters the remainder is unknowable without a deeper scan;
            # without filters the truncated list definitely has more rows.
            more_available = "unknown" if filtered else True

        items = _project_user_items(items, fields)
        meta = _build_exhaustive_result_meta(
            base_meta={
                "scanned": observed_total,
                "total": total,
                "total_available": total_available,
                "total_matched": total_matched,
                "count_source": count_source,
                "lower_bound": bool(filtered and not exact_count),
                "overview_total": overview_total,
                "listed_total": observed_total,
                "overview_list_mismatch": overview_list_mismatch,
                "relation": kind,
                "pro_only": pro_only,
                "where_applied": has_where,
                "entity": u,
                "entity_type": entity_type,
                "username": u,
                "organization": u if entity_type == "organization" else None,
            },
            limit_plan=limit_plan,
            matched_count=len(normalized),
            returned_count=len(items),
            exact_count=exact_count,
            count_only=count_only,
            sample_complete=sample_complete,
            more_available=more_available,
            scan_limit_hit=scan_limit_hit,
        )
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=items,
            meta=meta,
        )
    async def hf_profile_summary(
        handle: str | None = None,
        include: list[str] | None = None,
        likes_limit: int = 10,
        activity_limit: int = 10,
    ) -> dict[str, Any]:
        """Compact profile card for a user or organization handle.

        Tries the user overview first; on failure falls back to the organization
        overview. Optional ``include`` sections ("likes", "activity") attach small
        samples fetched via hf_user_likes / hf_recent_activity — user profiles
        only; for organizations any requested sections are reported back as
        ``ignored_includes``. Section fetch failures are reported in
        ``section_errors`` rather than failing the whole call.
        """
        start_calls = call_count["n"]
        resolved_handle, resolve_error = await _resolve_username_or_current(handle)
        if resolve_error:
            return _helper_error(start_calls=start_calls, source="/api/users//overview", error=resolve_error)
        if not isinstance(resolved_handle, str):
            return _helper_error(
                start_calls=start_calls,
                source="/api/users//overview",
                error="handle was not provided and current authenticated user could not be resolved",
            )

        # Normalize and validate the requested optional sections.
        try:
            requested_sections = (
                {part.lower() for part in _coerce_str_list(include) if part.strip()} if include is not None else set()
            )
        except ValueError as e:
            return _helper_error(
                start_calls=start_calls,
                source=f"/api/users/{resolved_handle}/overview",
                error=e,
            )
        invalid_sections = sorted(requested_sections - {"likes", "activity"})
        if invalid_sections:
            return _helper_error(
                start_calls=start_calls,
                source=f"/api/users/{resolved_handle}/overview",
                error=f"Unsupported include values: {invalid_sections}",
            )

        # A limit of 0 turns the section into a count-only probe.
        likes_lim = _clamp_int(likes_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
        activity_lim = _clamp_int(activity_limit, default=10, minimum=0, maximum=OUTPUT_ITEMS_TRUNCATION_LIMIT)
        section_errors: dict[str, str] = {}

        user_overview = await hf_user_overview(resolved_handle)
        if user_overview.get("ok") is True:
            overview_item = _helper_item(user_overview) or {"username": resolved_handle}
            item: dict[str, Any] = {
                "handle": str(overview_item.get("username") or resolved_handle),
                "entity_type": "user",
                "display_name": overview_item.get("fullname") or str(overview_item.get("username") or resolved_handle),
                "bio": overview_item.get("bio"),
                "avatar_url": overview_item.get("avatarUrl"),
                "website_url": overview_item.get("websiteUrl"),
                "twitter_url": overview_item.get("twitter"),
                "github_url": overview_item.get("github"),
                "linkedin_url": overview_item.get("linkedin"),
                "bluesky_url": overview_item.get("bluesky"),
                "followers_count": _overview_count(overview_item, "followers"),
                "following_count": _overview_count(overview_item, "following"),
                "likes_count": _overview_count(overview_item, "likes"),
                "models_count": _overview_count(overview_item, "models"),
                "datasets_count": _overview_count(overview_item, "datasets"),
                "spaces_count": _overview_count(overview_item, "spaces"),
                "discussions_count": _overview_count(overview_item, "discussions"),
                "papers_count": _overview_count(overview_item, "papers"),
                "upvotes_count": _overview_count(overview_item, "upvotes"),
                "organizations": overview_item.get("orgs"),
                "is_pro": overview_item.get("isPro"),
            }

            if "likes" in requested_sections:
                likes = await hf_user_likes(
                    username=resolved_handle,
                    return_limit=likes_lim,
                    scan_limit=USER_SUMMARY_LIKES_SCAN_LIMIT,
                    count_only=likes_lim == 0,
                    sort="likedAt",
                    fields=["liked_at", "repo_id", "repo_type", "repo_author", "repo_url"],
                )
                # Failures degrade to an empty sample plus a section_errors entry.
                item["likes_sample"] = likes.get("items") if likes.get("ok") is True else []
                if likes.get("ok") is not True:
                    section_errors["likes"] = str(likes.get("error") or "likes fetch failed")

            if "activity" in requested_sections:
                activity = await hf_recent_activity(
                    feed_type="user",
                    entity=resolved_handle,
                    return_limit=activity_lim,
                    max_pages=USER_SUMMARY_ACTIVITY_MAX_PAGES,
                    count_only=activity_lim == 0,
                    fields=["timestamp", "event_type", "repo_type", "repo_id"],
                )
                item["activity_sample"] = activity.get("items") if activity.get("ok") is True else []
                if activity.get("ok") is not True:
                    section_errors["activity"] = str(activity.get("error") or "activity fetch failed")

            return _helper_success(
                start_calls=start_calls,
                source=f"/api/users/{resolved_handle}/overview",
                items=[item],
                scanned=1,
                matched=1,
                returned=1,
                truncated=False,
                handle=resolved_handle,
                entity_type="user",
                include=sorted(requested_sections),
                likes_limit=likes_lim,
                activity_limit=activity_lim,
                section_errors=section_errors or None,
            )

        # User overview failed — the handle may be an organization.
        org_overview = await hf_org_overview(resolved_handle)
        if org_overview.get("ok") is True:
            overview_item = _helper_item(org_overview) or {"organization": resolved_handle}
            item = {
                "handle": str(overview_item.get("organization") or resolved_handle),
                "entity_type": "organization",
                "display_name": overview_item.get("displayName") or str(overview_item.get("organization") or resolved_handle),
                "description": overview_item.get("description"),
                "avatar_url": overview_item.get("avatarUrl"),
                "website_url": overview_item.get("websiteUrl"),
                "followers_count": _overview_count(overview_item, "followers"),
                "members_count": _overview_count(overview_item, "members"),
                "models_count": _overview_count(overview_item, "models"),
                "datasets_count": _overview_count(overview_item, "datasets"),
                "spaces_count": _overview_count(overview_item, "spaces"),
            }
            return _helper_success(
                start_calls=start_calls,
                source=f"/api/organizations/{resolved_handle}/overview",
                items=[item],
                scanned=1,
                matched=1,
                returned=1,
                truncated=False,
                handle=resolved_handle,
                entity_type="organization",
                include=[],
                ignored_includes=sorted(requested_sections) or None,
            )

        # Neither lookup succeeded; surface the first available error.
        error = user_overview.get("error") or org_overview.get("error") or "profile fetch failed"
        return _helper_error(
            start_calls=start_calls,
            source=f"/api/profiles/{resolved_handle}",
            error=error,
            handle=resolved_handle,
        )
overview_item.get("displayName") or str(overview_item.get("organization") or resolved_handle), + "description": overview_item.get("description"), + "avatar_url": overview_item.get("avatarUrl"), + "website_url": overview_item.get("websiteUrl"), + "followers_count": _overview_count(overview_item, "followers"), + "members_count": _overview_count(overview_item, "members"), + "models_count": _overview_count(overview_item, "models"), + "datasets_count": _overview_count(overview_item, "datasets"), + "spaces_count": _overview_count(overview_item, "spaces"), + } + return _helper_success( + start_calls=start_calls, + source=f"/api/organizations/{resolved_handle}/overview", + items=[item], + scanned=1, + matched=1, + returned=1, + truncated=False, + handle=resolved_handle, + entity_type="organization", + include=[], + ignored_includes=sorted(requested_sections) or None, + ) + + error = user_overview.get("error") or org_overview.get("error") or "profile fetch failed" + return _helper_error( + start_calls=start_calls, + source=f"/api/profiles/{resolved_handle}", + error=error, + handle=resolved_handle, + ) + + async def hf_user_graph( + username: str | None = None, + relation: str = "followers", + return_limit: int | None = None, + scan_limit: int | None = None, + count_only: bool = False, + pro_only: bool | None = None, + where: dict[str, Any] | None = None, + fields: list[str] | None = None, + ) -> dict[str, Any]: + start_calls = call_count["n"] + rel = str(relation or "").strip().lower() or "followers" + if rel not in {"followers", "following"}: + return _helper_error( + start_calls=start_calls, + source="/api/users//followers", + error="relation must be 'followers' or 'following'", + ) + + resolved_username, resolve_error = await _resolve_username_or_current(username) + if resolve_error: + return _helper_error(start_calls=start_calls, source=f"/api/users//{rel}", error=resolve_error, relation=rel) + if not isinstance(resolved_username, str): + return 
    async def hf_user_likes(
        username: str | None = None,
        repo_types: list[str] | None = None,
        return_limit: int | None = None,
        scan_limit: int | None = None,
        count_only: bool = False,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
        sort: str | None = None,
        ranking_window: int | None = None,
    ) -> dict[str, Any]:
        """List a user's liked repos via the raw /api/users/{u}/likes endpoint.

        Likes arrive in likedAt order. For popularity sorts (repoLikes /
        repoDownloads) only a shortlist of the first ``ranking_window`` matched
        rows is considered; rows missing the metric are enriched with per-repo
        detail calls (bounded by the enrich cap and remaining call budget) before
        ranking. Metadata records whether the ranking and scan were complete.
        """
        start_calls = call_count["n"]
        default_return = _policy_int("hf_user_likes", "default_return", 100)
        scan_cap = _policy_int("hf_user_likes", "scan_max", LIKES_SCAN_LIMIT_CAP)
        ranking_default = _policy_int("hf_user_likes", "ranking_default", LIKES_RANKING_WINDOW_DEFAULT)
        enrich_cap = _policy_int("hf_user_likes", "enrich_max", LIKES_ENRICHMENT_MAX_REPOS)

        resolved_username, resolve_error = await _resolve_username_or_current(username)
        if resolve_error:
            return _helper_error(start_calls=start_calls, source="/api/users//likes", error=resolve_error)
        if not isinstance(resolved_username, str):
            return _helper_error(start_calls=start_calls, source="/api/users//likes", error="username is required")

        sort_key, sort_error = _normalize_user_likes_sort(sort)
        if sort_error:
            return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=sort_error)
        if sort_key is None:
            return _helper_error(
                start_calls=start_calls,
                source=f"/api/users/{resolved_username}/likes",
                error="sort must be one of likedAt, repoLikes, repoDownloads",
            )

        limit_plan = _resolve_exhaustive_limits(
            return_limit=return_limit,
            count_only=count_only,
            default_return=default_return,
            max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
            scan_limit=scan_limit,
            scan_cap=scan_cap,
        )
        ret_lim = int(limit_plan["applied_return_limit"])
        scan_lim = int(limit_plan["applied_scan_limit"])

        # Optional repo-type allowlist (canonicalized to model/dataset/space).
        allowed_repo_types: set[str] | None = None
        try:
            raw_repo_types: list[str] = _coerce_str_list(repo_types) if repo_types is not None else []
        except ValueError as e:
            return _helper_error(start_calls=start_calls, source=f"/api/users/{resolved_username}/likes", error=e)
        if raw_repo_types:
            allowed_repo_types = set()
            for raw in raw_repo_types:
                canonical = _canonical_repo_type(raw, default="")
                if canonical not in {"model", "dataset", "space"}:
                    return _helper_error(
                        start_calls=start_calls,
                        source=f"/api/users/{resolved_username}/likes",
                        error=f"Unsupported repo_type '{raw}'",
                    )
                allowed_repo_types.add(canonical)

        endpoint = f"/api/users/{resolved_username}/likes"
        resp = _host_raw_call(endpoint, params={"limit": scan_lim})
        if not resp.get("ok"):
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=resp.get("error") or "likes fetch failed",
            )

        payload = resp.get("data") if isinstance(resp.get("data"), list) else []
        scanned_rows = payload[:scan_lim]
        # Pairs of (arrival index, item); the index preserves likedAt order as a
        # stable tie-breaker during popularity ranking.
        matched_rows: list[tuple[int, dict[str, Any]]] = []

        for row in scanned_rows:
            if not isinstance(row, dict):
                continue
            # The API exposes repo info under both "repo" and "repoData" keys;
            # prefer repoData, fall back to repo.
            repo = row.get("repo") if isinstance(row.get("repo"), dict) else {}
            repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else {}

            repo_id = repo_data.get("id") or repo_data.get("name") or repo.get("name")
            if not isinstance(repo_id, str) or not repo_id:
                continue

            repo_type = _canonical_repo_type(repo_data.get("type") or repo.get("type"), default="")
            if not repo_type:
                repo_type = _canonical_repo_type(repo.get("type"), default="model")
            if allowed_repo_types is not None and repo_type not in allowed_repo_types:
                continue

            repo_author = repo_data.get("author")
            if not isinstance(repo_author, str) and "/" in repo_id:
                repo_author = repo_id.split("/", 1)[0]

            # Every field is exposed under both camelCase and snake_case aliases.
            item = {
                "likedAt": row.get("likedAt") or row.get("createdAt"),
                "liked_at": row.get("likedAt") or row.get("createdAt"),
                "repoId": repo_id,
                "repo_id": repo_id,
                "repoType": repo_type,
                "repo_type": repo_type,
                "repoAuthor": repo_author,
                "repo_author": repo_author,
                "repoLikes": _as_int(repo_data.get("likes")),
                "repo_likes": _as_int(repo_data.get("likes")),
                "repoDownloads": _as_int(repo_data.get("downloads")),
                "repo_downloads": _as_int(repo_data.get("downloads")),
                "likes": _as_int(repo_data.get("likes")),
                "downloads": _as_int(repo_data.get("downloads")),
                "repo_url": _repo_web_url(repo_type, repo_id),
            }
            if not _item_matches_where(item, where):
                continue
            matched_rows.append((len(matched_rows), item))

        matched = len(matched_rows)
        # A short page means the likes list was fully consumed.
        scan_exhaustive = len(payload) < scan_lim
        exact_count = scan_exhaustive
        total_matched = matched
        total = total_matched
        effective_ranking_window: int | None = None
        ranking_complete = sort_key == "likedAt" and exact_count
        enriched = 0

        selected_pairs: list[tuple[int, dict[str, Any]]]
        if count_only:
            selected_pairs = []
            ranking_complete = False if matched > 0 else exact_count
        elif sort_key == "likedAt":
            # Rows already arrive in likedAt order; just truncate.
            selected_pairs = matched_rows[:ret_lim]
        else:
            # Popularity ranking over a bounded shortlist of the earliest likes.
            metric = str(sort_key)
            requested_window = ranking_window if ranking_window is not None else ranking_default
            effective_ranking_window = _clamp_int(
                requested_window,
                default=ranking_default,
                minimum=1,
                maximum=enrich_cap,
            )
            shortlist_size = min(effective_ranking_window, matched, scan_lim)
            shortlist = matched_rows[:shortlist_size]
            # Only rows missing the metric need an extra repo-detail call.
            candidates = [
                pair
                for pair in shortlist
                if pair[1].get(metric) is None
                and isinstance(pair[1].get("repoId"), str)
                and pair[1].get("repoType") in {"model", "dataset", "space"}
            ]
            enrich_budget = min(len(candidates), _budget_remaining(), shortlist_size)
            for _, item in candidates[:enrich_budget]:
                repo_type = str(item.get("repoType"))
                repo_id = str(item.get("repoId"))
                detail_endpoint = f"/api/{_canonical_repo_type(repo_type)}s/{repo_id}"
                try:
                    # Defaults bind rt/rid per iteration so the lambda is safe.
                    detail = _host_hf_call(
                        detail_endpoint,
                        lambda rt=repo_type, rid=repo_id: (
                            _get_hf_api_client().model_info(rid)
                            if _canonical_repo_type(rt) == "model"
                            else _get_hf_api_client().dataset_info(rid)
                            if _canonical_repo_type(rt) == "dataset"
                            else _get_hf_api_client().space_info(rid)
                        ),
                    )
                except Exception:
                    # Best-effort enrichment: skip repos whose detail fetch fails.
                    continue

                likes = _as_int(getattr(detail, "likes", None))
                downloads = _as_int(getattr(detail, "downloads", None))
                if likes is not None:
                    item["repoLikes"] = likes
                    item["repo_likes"] = likes
                    item["likes"] = likes
                if downloads is not None:
                    item["repoDownloads"] = downloads
                    item["repo_downloads"] = downloads
                    item["downloads"] = downloads
                enriched += 1

            def _ranking_key(pair: tuple[int, dict[str, Any]]) -> tuple[int, int, int]:
                # Sort rows with a known metric first (descending), then rows
                # without one; arrival index keeps the sort stable.
                idx, row = pair
                metric_value = _as_int(row.get(metric))
                if metric_value is None:
                    return (1, 0, idx)
                return (0, -metric_value, idx)

            ranked_shortlist = sorted(shortlist, key=_ranking_key)
            selected_pairs = ranked_shortlist[:ret_lim]
            ranking_complete = exact_count and shortlist_size >= matched and len(candidates) <= enrich_budget

        items = _project_items([row for _, row in selected_pairs], fields)
        popularity_present = sum(1 for _, row in selected_pairs if row.get("repoLikes") is not None)
        sample_complete = (
            exact_count
            and ret_lim >= matched
            and (sort_key == "likedAt" or ranking_complete)
            and (not count_only or matched == 0)
        )
        scan_limit_hit = not scan_exhaustive and len(payload) >= scan_lim
        more_available = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=total)
        if scan_limit_hit:
            # With filters active the remainder is unknowable; otherwise the
            # truncated list definitely has more rows.
            more_available = "unknown" if (allowed_repo_types is not None or where) else True

        meta = _build_exhaustive_result_meta(
            base_meta={
                "scanned": len(scanned_rows),
                "total": total,
                "total_available": len(payload),
                "total_matched": total_matched,
                "count_source": "scan",
                "lower_bound": not exact_count,
                "enriched": enriched,
                "popularity_present": popularity_present,
                "sort_applied": sort_key,
                "ranking_window": effective_ranking_window,
                "ranking_complete": ranking_complete,
                "username": resolved_username,
            },
            limit_plan=limit_plan,
            matched_count=matched,
            returned_count=len(items),
            exact_count=exact_count,
            count_only=count_only,
            sample_complete=sample_complete,
            more_available=more_available,
            scan_limit_hit=scan_limit_hit,
            truncated_extra=sort_key != "likedAt" and not ranking_complete,
        )
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=items,
            meta=meta,
        )
    async def hf_repo_likers(
        repo_id: str,
        repo_type: str,
        return_limit: int | None = None,
        count_only: bool = False,
        pro_only: bool | None = None,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        """List the accounts that liked one repo via /api/{type}s/{id}/likers.

        The Hub returns the complete liker list in a single response, so counts
        here are always exact; ``return_limit`` only trims the rows echoed back
        to the caller.
        """
        start_calls = call_count["n"]
        rid = str(repo_id or "").strip()
        if not rid:
            return _helper_error(start_calls=start_calls, source="/api/repos//likers", error="repo_id is required")

        rt = _canonical_repo_type(repo_type, default="")
        if rt not in {"model", "dataset", "space"}:
            return _helper_error(
                start_calls=start_calls,
                source=f"/api/repos/{rid}/likers",
                error=f"Unsupported repo_type '{repo_type}'",
                repo_id=rid,
            )

        default_return = _policy_int("hf_repo_likers", "default_return", 1_000)
        requested_return_limit = return_limit
        default_limit_used = requested_return_limit is None and not count_only
        has_where = isinstance(where, dict) and bool(where)

        endpoint = f"/api/{rt}s/{rid}/likers"
        resp = _host_raw_call(endpoint)
        if not resp.get("ok"):
            return _helper_error(
                start_calls=start_calls,
                source=endpoint,
                error=resp.get("error") or "repo likers fetch failed",
                repo_id=rid,
                repo_type=rt,
            )

        payload = resp.get("data") if isinstance(resp.get("data"), list) else []
        normalized: list[dict[str, Any]] = []
        for row in payload:
            if not isinstance(row, dict):
                continue
            username = row.get("user") or row.get("username")
            if not isinstance(username, str) or not username:
                continue
            item = {
                "username": username,
                "fullname": row.get("fullname"),
                "type": row.get("type") if isinstance(row.get("type"), str) and row.get("type") else "user",
                "isPro": row.get("isPro"),
            }
            # pro_only=True keeps only confirmed-Pro rows; pro_only=False drops
            # confirmed-Pro rows (unknown/None is kept).
            if pro_only is True and item.get("isPro") is not True:
                continue
            if pro_only is False and item.get("isPro") is True:
                continue
            if not _item_matches_where(item, where):
                continue
            normalized.append(item)

        # /likers is a one-shot full-list endpoint: the Hub returns the liker rows in a
        # single response with no cursor/scan continuation. Keep the default output compact,
        # but do not apply the generic exhaustive hard cap here because it does not improve
        # upstream coverage or cost; the full liker set has already been fetched.
        if count_only:
            ret_lim = 0
        elif requested_return_limit is None:
            ret_lim = default_return
        else:
            try:
                ret_lim = max(0, int(requested_return_limit))
            except Exception:
                # Non-integer limits silently fall back to the policy default.
                ret_lim = default_return
        # Hand-built limit plan mirroring _resolve_exhaustive_limits output shape.
        limit_plan = {
            "requested_return_limit": requested_return_limit,
            "applied_return_limit": ret_lim,
            "default_limit_used": default_limit_used,
            "hard_cap_applied": False,
        }

        matched = len(normalized)
        items = [] if count_only else normalized[:ret_lim]
        return_limit_hit = ret_lim > 0 and matched > ret_lim
        truncated_by = _derive_truncated_by(
            hard_cap=False,
            return_limit_hit=return_limit_hit,
        )
        sample_complete = matched <= ret_lim and (not count_only or matched == 0)
        truncated = truncated_by != "none"
        more_available = _derive_more_available(
            sample_complete=sample_complete,
            exact_count=True,
            returned=len(items),
            total=matched,
        )

        items = _project_actor_items(items, fields)

        meta = _build_exhaustive_meta(
            base_meta={
                "scanned": len(payload),
                "matched": matched,
                "returned": len(items),
                "total": matched,
                "total_available": len(payload),
                "total_matched": matched,
                "truncated": truncated,
                "count_source": "likers_list",
                "lower_bound": False,
                "repo_id": rid,
                "repo_type": rt,
                "pro_only": pro_only,
                "where_applied": has_where,
                "upstream_pagination": "none",
            },
            limit_plan=limit_plan,
            sample_complete=sample_complete,
            exact_count=True,
            truncated_by=truncated_by,
            more_available=more_available,
        )
        meta["hard_cap_applied"] = False
        return _helper_success(
            start_calls=start_calls,
            source=endpoint,
            items=items,
            meta=meta,
        )
    async def hf_recent_activity(
        feed_type: str | None = None,
        entity: str | None = None,
        activity_types: list[str] | None = None,
        repo_types: list[str] | None = None,
        return_limit: int | None = None,
        max_pages: int | None = None,
        start_cursor: str | None = None,
        count_only: bool = False,
        where: dict[str, Any] | None = None,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        """Page through the raw /api/recent-activity feed for one user or org.

        Fetches cursor-paged activity events (bounded by ``max_pages`` and the
        external call budget), filters by activity type / repo type / ``where``,
        and returns normalized event rows plus exhaustive-count metadata and the
        next cursor for continuation.
        """
        start_calls = call_count["n"]
        default_return = _policy_int("hf_recent_activity", "default_return", 100)
        page_cap = _policy_int("hf_recent_activity", "page_limit", RECENT_ACTIVITY_PAGE_SIZE)
        pages_cap = _policy_int("hf_recent_activity", "max_pages", RECENT_ACTIVITY_SCAN_MAX_PAGES)

        requested_max_pages = max_pages

        # Convenience: a bare handle in feed_type (or entity alone) is treated
        # as feed_type="user".
        ft = str(feed_type or "").strip().lower()
        ent = str(entity or "").strip()
        if ft not in {"user", "org"}:
            if ft and not ent:
                ent = ft
                ft = "user"
            elif not ft and ent:
                ft = "user"

        if ft not in {"user", "org"}:
            return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="feed_type must be 'user' or 'org'")
        if not ent:
            return _helper_error(start_calls=start_calls, source="/api/recent-activity", error="entity is required")

        limit_plan = _resolve_exhaustive_limits(
            return_limit=return_limit,
            count_only=count_only,
            default_return=default_return,
            max_return=EXHAUSTIVE_HELPER_RETURN_HARD_CAP,
        )
        ret_lim = int(limit_plan["applied_return_limit"])
        page_lim = page_cap
        pages_lim = _clamp_int(requested_max_pages, default=pages_cap, minimum=1, maximum=pages_cap)

        type_filter = {str(t).strip().lower() for t in (activity_types or []) if str(t).strip()}
        repo_filter = {_canonical_repo_type(t, default="") for t in (repo_types or []) if str(t).strip()}

        next_cursor = str(start_cursor).strip() if isinstance(start_cursor, str) and start_cursor.strip() else None
        items: list[dict[str, Any]] = []
        scanned = 0
        matched = 0
        pages = 0
        exhausted_feed = False
        stopped_for_budget = False

        # ret_lim == 0 (count_only) keeps paging to count matches; otherwise stop
        # once the requested number of rows has been collected.
        while pages < pages_lim and (ret_lim == 0 or len(items) < ret_lim):
            if _budget_remaining() <= 0:
                stopped_for_budget = True
                break

            params: dict[str, Any] = {"feedType": ft, "entity": ent, "limit": page_lim}
            if next_cursor:
                params["cursor"] = next_cursor

            resp = _host_raw_call("/api/recent-activity", params=params)
            if not resp.get("ok"):
                # A failure on the first page is a hard error; mid-scan failures
                # return the partial result instead.
                if pages == 0:
                    return _helper_error(
                        start_calls=start_calls,
                        source="/api/recent-activity",
                        error=resp.get("error") or "recent-activity fetch failed",
                    )
                break

            payload = resp.get("data") if isinstance(resp.get("data"), dict) else {}
            rows = payload.get("recentActivity") if isinstance(payload.get("recentActivity"), list) else []
            cursor_raw = payload.get("cursor")
            next_cursor = cursor_raw if isinstance(cursor_raw, str) and cursor_raw else None
            pages += 1

            if not rows:
                exhausted_feed = True
                break

            for row in rows:
                if not isinstance(row, dict):
                    continue
                scanned += 1

                typ = str(row.get("type") or "").strip().lower()
                # repoId/repoType can appear top-level or nested under
                # repoData/repo; check each in turn.
                repo_id = row.get("repoId")
                repo_type = row.get("repoType")
                repo_data = row.get("repoData") if isinstance(row.get("repoData"), dict) else None
                repo_obj = row.get("repo") if isinstance(row.get("repo"), dict) else None
                if repo_id is None and repo_data is not None:
                    repo_id = repo_data.get("id") or repo_data.get("name")
                if repo_id is None and repo_obj is not None:
                    repo_id = repo_obj.get("id") or repo_obj.get("name")
                if repo_type is None and repo_data is not None:
                    repo_type = repo_data.get("type")
                if repo_type is None and repo_obj is not None:
                    repo_type = repo_obj.get("type")

                rt = _canonical_repo_type(repo_type, default="") if repo_type else ""
                if type_filter and typ not in type_filter:
                    continue
                if repo_filter and rt not in repo_filter:
                    continue

                # Fields are exposed under both camelCase and snake_case aliases.
                item = {
                    "time": row.get("time"),
                    "timestamp": row.get("time"),
                    "type": row.get("type"),
                    "event_type": row.get("type"),
                    "repoType": rt or repo_type,
                    "repo_type": rt or repo_type,
                    "repoId": repo_id,
                    "repo_id": repo_id,
                }
                if not _item_matches_where(item, where):
                    continue

                matched += 1
                if len(items) < ret_lim:
                    items.append(item)

            if not next_cursor:
                exhausted_feed = True
                break

        items = _project_items(items, fields)
        # Counts are exact only when the feed itself ran out (not when we stopped
        # for pages/budget/return-limit reasons).
        exact_count = exhausted_feed and not stopped_for_budget
        sample_complete = exact_count and ret_lim >= matched and (not count_only or matched == 0)
        page_limit_hit = next_cursor is not None and pages >= pages_lim and not exhausted_feed
        more_available: bool | str = _derive_more_available(sample_complete=sample_complete, exact_count=exact_count, returned=len(items), total=matched if exact_count else None)
        if next_cursor is not None:
            more_available = True
        elif stopped_for_budget and not exact_count:
            more_available = "unknown"

        meta = _build_exhaustive_result_meta(
            base_meta={
                "scanned": scanned,
                "total": matched,
                "total_matched": matched,
                "pages": pages,
                "count_source": "scan" if exact_count else "none",
                "lower_bound": not exact_count,
                "page_limit": page_lim,
                "stopped_for_budget": stopped_for_budget,
                "feed_type": ft,
                "entity": ent,
            },
            limit_plan=limit_plan,
            matched_count=matched,
            returned_count=len(items),
            exact_count=exact_count,
            count_only=count_only,
            sample_complete=sample_complete,
            more_available=more_available,
            page_limit_hit=page_limit_hit,
            truncated_extra=stopped_for_budget,
            requested_max_pages=requested_max_pages,
            applied_max_pages=pages_lim,
        )
        return _helper_success(
            start_calls=start_calls,
            source="/api/recent-activity",
            items=items,
            meta=meta,
            cursor=next_cursor,
        )
requested_max_pages=requested_max_pages, + applied_max_pages=pages_lim, + ) + return _helper_success( + start_calls=start_calls, + source="/api/recent-activity", + items=items, + meta=meta, + cursor=next_cursor, + ) + + async def hf_repo_discussions(repo_type: str, repo_id: str, limit: int = 20) -> dict[str, Any]: + start_calls = call_count["n"] + rt = _canonical_repo_type(repo_type) + rid = str(repo_id or "").strip() + if "/" not in rid: + return _helper_error(start_calls=start_calls, source="/api/.../discussions", error="repo_id must be owner/name") + + lim = _clamp_int(limit, default=20, minimum=1, maximum=SELECTIVE_ENDPOINT_RETURN_HARD_CAP) + endpoint = f"/api/{rt}s/{rid}/discussions" + try: + discussions = _host_hf_call( + endpoint, + lambda: list(islice(_get_hf_api_client().get_repo_discussions(repo_id=rid, repo_type=rt), lim)), + ) + except Exception as e: + return _helper_error(start_calls=start_calls, source=endpoint, error=e) + + items: list[dict[str, Any]] = [] + for d in discussions: + num = _as_int(getattr(d, "num", None)) + items.append( + { + "num": num, + "number": num, + "discussionNum": num, + "id": num, + "title": getattr(d, "title", None), + "author": getattr(d, "author", None), + "createdAt": str(getattr(d, "created_at", None)) if getattr(d, "created_at", None) is not None else None, + "status": getattr(d, "status", None), + } + ) + + return _helper_success( + start_calls=start_calls, + source=endpoint, + items=items, + scanned=len(items), + matched=len(items), + returned=len(items), + truncated=False, + total_count=None, + ) + + async def hf_repo_discussion_details(repo_type: str, repo_id: str, discussion_num: int) -> dict[str, Any]: + start_calls = call_count["n"] + rt = _canonical_repo_type(repo_type) + rid = str(repo_id or "").strip() + if "/" not in rid: + return _helper_error(start_calls=start_calls, source="/api/.../discussions/", error="repo_id must be owner/name") + + num = _as_int(discussion_num) + if num is None: + return 
_helper_error( + start_calls=start_calls, + source=f"/api/{rt}s/{rid}/discussions/", + error="discussion_num must be an integer", + ) + + endpoint = f"/api/{rt}s/{rid}/discussions/{num}" + try: + detail = _host_hf_call( + endpoint, + lambda: _get_hf_api_client().get_discussion_details( + repo_id=rid, + discussion_num=int(num), + repo_type=rt, + ), + ) + except Exception as e: + return _helper_error(start_calls=start_calls, source=endpoint, error=e) + + comment_events: list[dict[str, Any]] = [] + raw_events = getattr(detail, "events", None) + if isinstance(raw_events, list): + for event in raw_events: + if str(getattr(event, "type", "")).strip().lower() != "comment": + continue + comment_events.append( + { + "author": getattr(event, "author", None), + "createdAt": _dt_to_str(getattr(event, "created_at", None)), + "text": getattr(event, "content", None), + "rendered": getattr(event, "rendered", None), + } + ) + + latest_comment: dict[str, Any] | None = None + if comment_events: + latest_comment = max(comment_events, key=lambda row: str(row.get("createdAt") or "")) + + item: dict[str, Any] = { + "num": num, + "number": num, + "discussionNum": num, + "id": num, + "repo_id": rid, + "repo_type": rt, + "title": getattr(detail, "title", None), + "author": getattr(detail, "author", None), + "createdAt": _dt_to_str(getattr(detail, "created_at", None)), + "status": getattr(detail, "status", None), + "url": getattr(detail, "url", None), + "commentCount": len(comment_events), + "latestCommentAuthor": latest_comment.get("author") if latest_comment else None, + "latestCommentCreatedAt": latest_comment.get("createdAt") if latest_comment else None, + "latestCommentText": latest_comment.get("text") if latest_comment else None, + "latestCommentHtml": latest_comment.get("rendered") if latest_comment else None, + "latest_comment_author": latest_comment.get("author") if latest_comment else None, + "latest_comment_created_at": latest_comment.get("createdAt") if latest_comment else None, 
+ "latest_comment_text": latest_comment.get("text") if latest_comment else None, + "latest_comment_html": latest_comment.get("rendered") if latest_comment else None, + } + + return _helper_success( + start_calls=start_calls, + source=endpoint, + items=[item], + scanned=len(comment_events), + matched=1, + returned=1, + truncated=False, + total_comments=len(comment_events), + ) + + def _resolve_repo_detail_row( + api: HfApi, + repo_id: str, + attempt_types: list[str], + ) -> tuple[dict[str, Any] | None, dict[str, Any] | None]: + rid = str(repo_id or "").strip() + if "/" not in rid: + return None, {"repo_id": rid, "error": "repo_id must be owner/name"} + + resolved_type: str | None = None + detail: Any = None + last_endpoint = "/api/repos" + errors: list[str] = [] + + for rt in attempt_types: + endpoint = f"/api/{rt}s/{rid}" + last_endpoint = endpoint + try: + detail = _host_hf_call( + endpoint, + lambda rt=rt, rid=rid: api.model_info(rid) + if rt == "model" + else api.dataset_info(rid) + if rt == "dataset" + else api.space_info(rid), + ) + resolved_type = rt + break + except Exception as e: + errors.append(f"{rt}: {str(e)}") + + if resolved_type is None or detail is None: + return None, { + "repo_id": rid, + "error": "; ".join(errors[:3]) if errors else "repo lookup failed", + "attempted_repo_types": list(attempt_types), + "source": last_endpoint, + } + + return _normalize_repo_detail_row(detail, resolved_type, rid), None + + async def hf_repo_details( + repo_id: str | None = None, + repo_ids: list[str] | None = None, + repo_type: str = "auto", + fields: list[str] | None = None, + ) -> dict[str, Any]: + start_calls = call_count["n"] + + if repo_id is not None and repo_ids is not None: + return _helper_error( + start_calls=start_calls, + source="/api/repos", + error="Pass either repo_id or repo_ids, not both", + ) + + requested_ids = [str(repo_id).strip()] if isinstance(repo_id, str) and str(repo_id).strip() else [] + if repo_ids is not None: + requested_ids = 
_coerce_str_list(repo_ids) + + if not requested_ids: + return _helper_error(start_calls=start_calls, source="/api/repos", error="repo_id or repo_ids is required") + + raw_type = str(repo_type or "auto").strip().lower() + if raw_type in {"", "auto"}: + base_attempt_types = ["model", "dataset", "space"] + else: + canonical_type = _canonical_repo_type(raw_type, default="") + if canonical_type not in {"model", "dataset", "space"}: + return _helper_error( + start_calls=start_calls, + source="/api/repos", + error=f"Unsupported repo_type '{repo_type}'", + ) + base_attempt_types = [canonical_type] + + api = _get_hf_api_client() + items: list[dict[str, Any]] = [] + failures: list[dict[str, Any]] = [] + + for rid in requested_ids: + row, failure = _resolve_repo_detail_row(api, rid, base_attempt_types) + if row is None: + if failure is not None: + failures.append(failure) + continue + items.append(row) + + if not items: + summary = failures[0]["error"] if failures else "repo lookup failed" + return _helper_error( + start_calls=start_calls, + source="/api/repos", + error=summary, + failures=failures, + repo_type=repo_type, + ) + + items = _project_repo_items(items, fields) + return _helper_success( + start_calls=start_calls, + source="/api/repos", + items=items, + repo_type=repo_type, + requested_repo_ids=requested_ids, + failures=failures or None, + matched=len(items), + returned=len(items), + ) + + async def hf_trending( + repo_type: str = "model", + limit: int = 20, + where: dict[str, Any] | None = None, + fields: list[str] | None = None, + ) -> dict[str, Any]: + start_calls = call_count["n"] + default_return = _policy_int("hf_trending", "default_return", 20) + max_return = _policy_int("hf_trending", "max_return", TRENDING_ENDPOINT_MAX_LIMIT) + + raw_type = str(repo_type or "model").strip().lower() + if raw_type == "all": + requested_type = "all" + else: + requested_type = _canonical_repo_type(raw_type, default="") + if requested_type not in {"model", "dataset", "space"}: + 
return _helper_error( + start_calls=start_calls, + source="/api/trending", + error=f"Unsupported repo_type '{repo_type}'", + ) + lim = _clamp_int(limit, default=default_return, minimum=1, maximum=max_return) + + resp = _host_raw_call("/api/trending", params={"type": requested_type, "limit": lim}) + if not resp.get("ok"): + return _helper_error(start_calls=start_calls, source="/api/trending", error=resp.get("error") or "trending fetch failed") + + payload = resp.get("data") if isinstance(resp.get("data"), dict) else {} + rows = payload.get("recentlyTrending") if isinstance(payload.get("recentlyTrending"), list) else [] + + items: list[dict[str, Any]] = [] + default_row_type = requested_type if requested_type != "all" else "model" + for idx, row in enumerate(rows[:lim], start=1): + if not isinstance(row, dict): + continue + repo = row.get("repoData") if isinstance(row.get("repoData"), dict) else {} + items.append(_normalize_trending_row(repo, default_row_type, rank=idx)) + + api = _get_hf_api_client() + enriched_items: list[dict[str, Any]] = [] + enrichment_failures: list[dict[str, Any]] = [] + for item in items: + repo_id = item.get("repo_id") + if not isinstance(repo_id, str) or not repo_id: + enriched_items.append(item) + continue + + item_repo_type = item.get("repo_type") + if isinstance(item_repo_type, str) and item_repo_type in {"model", "dataset", "space"}: + attempt_types = [item_repo_type] + else: + attempt_types = ["model", "dataset", "space"] + + detail_row, failure = _resolve_repo_detail_row(api, repo_id, attempt_types) + if detail_row is None: + enriched_items.append(item) + if failure is not None: + enrichment_failures.append(failure) + continue + + merged = dict(detail_row) + trending_score = item.get("trending_score") + if trending_score is not None: + merged["trending_score"] = trending_score + if item.get("trending_rank") is not None: + merged["trending_rank"] = item.get("trending_rank") + enriched_items.append(merged) + + items = enriched_items + 
items = _apply_where(items, where) + matched = len(items) + items = _project_repo_items(items[:lim], fields) + + return _helper_success( + start_calls=start_calls, + source="/api/trending", + items=items, + repo_type=requested_type, + limit=lim, + scanned=len(rows), + matched=matched, + returned=len(items), + trending_score_available=any(item.get("trending_score") is not None for item in items), + ordered_ranking=True, + failures=enrichment_failures or None, + ) + + async def hf_collections_search( + query: str | None = None, + owner: str | None = None, + return_limit: int = 20, + count_only: bool = False, + where: dict[str, Any] | None = None, + fields: list[str] | None = None, + ) -> dict[str, Any]: + start_calls = call_count["n"] + default_return = _policy_int("hf_collections_search", "default_return", 20) + max_return = _policy_int("hf_collections_search", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT) + + if count_only: + return_limit = 0 + + lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return) + owner_clean = str(owner or "").strip() or None + fetch_lim = max_return if lim == 0 or owner_clean else lim + if owner_clean: + fetch_lim = min(fetch_lim, 100) + + term = str(query or "").strip() + if not term and owner_clean: + term = owner_clean + if not term: + return _helper_error(start_calls=start_calls, source="/api/collections", error="query or owner is required") + + params: dict[str, Any] = {"limit": fetch_lim} + if term: + params["q"] = term + if owner_clean: + params["owner"] = owner_clean + + resp = _host_raw_call("/api/collections", params=params) + if not resp.get("ok"): + return _helper_error( + start_calls=start_calls, + source="/api/collections", + error=resp.get("error") or "collections fetch failed", + ) + + payload = resp.get("data") if isinstance(resp.get("data"), list) else [] + items: list[dict[str, Any]] = [] + for row in payload[:fetch_lim]: + if not isinstance(row, dict): + continue + owner = 
_author_from_any(row.get("owner")) or _author_from_any(row.get("ownerData")) + if not owner and isinstance(row.get("slug"), str) and "/" in str(row.get("slug")): + owner = str(row.get("slug")).split("/", 1)[0] + if owner_clean is not None and owner != owner_clean: + continue + + owner_payload = row.get("owner") if isinstance(row.get("owner"), dict) else {} + collection_items = row.get("items") if isinstance(row.get("items"), list) else [] + slug = row.get("slug") + items.append( + { + "collection_id": slug, + "slug": slug, + "title": row.get("title"), + "owner": owner, + "owner_type": owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None, + "description": row.get("description"), + "gating": row.get("gating"), + "last_updated": row.get("lastUpdated"), + "item_count": len(collection_items), + } + ) + + items = _apply_where(items, where) + total_matched = len(items) + items = items[:lim] + items = _project_collection_items(items, fields) + truncated = (lim > 0 and total_matched > lim) or (lim == 0 and len(payload) >= fetch_lim) + + return _helper_success( + start_calls=start_calls, + source="/api/collections", + items=items, + scanned=len(payload), + matched=total_matched, + returned=len(items), + total=len(payload), + total_matched=total_matched, + total_population=len(payload), + truncated=truncated, + complete=not truncated, + query=term, + owner=owner_clean, + ) + + async def hf_collection_items( + collection_id: str, + repo_types: list[str] | None = None, + return_limit: int = 100, + count_only: bool = False, + where: dict[str, Any] | None = None, + fields: list[str] | None = None, + ) -> dict[str, Any]: + start_calls = call_count["n"] + default_return = _policy_int("hf_collection_items", "default_return", 100) + max_return = _policy_int("hf_collection_items", "max_return", OUTPUT_ITEMS_TRUNCATION_LIMIT) + + cid = str(collection_id or "").strip() + if not cid: + return _helper_error( + start_calls=start_calls, + 
source="/api/collections/", + error="collection_id is required", + ) + + if count_only: + return_limit = 0 + + lim = _clamp_int(return_limit, default=default_return, minimum=0, maximum=max_return) + + allowed_repo_types: set[str] | None = None + try: + raw_repo_types = _coerce_str_list(repo_types) if repo_types is not None else [] + except ValueError as e: + return _helper_error(start_calls=start_calls, source=f"/api/collections/{cid}", error=e, collection_id=cid) + if raw_repo_types: + allowed_repo_types = set() + for raw in raw_repo_types: + canonical = _canonical_repo_type(raw, default="") + if canonical not in {"model", "dataset", "space"}: + return _helper_error( + start_calls=start_calls, + source=f"/api/collections/{cid}", + error=f"Unsupported repo_type '{raw}'", + collection_id=cid, + ) + allowed_repo_types.add(canonical) + + endpoint = f"/api/collections/{cid}" + resp = _host_raw_call(endpoint) + if not resp.get("ok"): + return _helper_error( + start_calls=start_calls, + source=endpoint, + error=resp.get("error") or "collection fetch failed", + collection_id=cid, + ) + + payload = resp.get("data") if isinstance(resp.get("data"), dict) else {} + raw_items = payload.get("items") if isinstance(payload.get("items"), list) else [] + owner = _author_from_any(payload.get("owner")) + owner_payload = payload.get("owner") if isinstance(payload.get("owner"), dict) else {} + if owner is None and "/" in cid: + owner = cid.split("/", 1)[0] + + normalized: list[dict[str, Any]] = [] + for row in raw_items: + if not isinstance(row, dict): + continue + item = _normalize_collection_repo_item(row) + if item is None: + continue + repo_type = item.get("repo_type") + if allowed_repo_types is not None and repo_type not in allowed_repo_types: + continue + if not _item_matches_where(item, where): + continue + normalized.append(item) + + total_matched = len(normalized) + items = [] if count_only else normalized[:lim] + items = _project_repo_items(items, fields) + truncated = lim > 
0 and total_matched > lim + + return _helper_success( + start_calls=start_calls, + source=endpoint, + items=items, + scanned=len(raw_items), + matched=total_matched, + returned=len(items), + total=len(raw_items), + total_matched=total_matched, + total_population=len(raw_items), + truncated=truncated, + complete=not truncated, + collection_id=cid, + title=payload.get("title"), + owner=owner, + owner_type=owner_payload.get("type") if isinstance(owner_payload.get("type"), str) else None, + repo_types=sorted(allowed_repo_types) if allowed_repo_types is not None else None, + ) + + async def hf_runtime_capabilities(section: str | None = None) -> dict[str, Any]: + start_calls = call_count["n"] + internal_helper_used["used"] = True + + def _render_annotation(annotation: Any) -> str: + if annotation is inspect.Signature.empty: + return "Any" + return str(annotation) + + def _render_default(default: Any) -> str | None: + if default is inspect.Signature.empty: + return None + return repr(default) + + def _signature_payload(fn: Callable[..., Any]) -> dict[str, Any]: + signature = inspect.signature(fn) + parameters: list[dict[str, Any]] = [] + for parameter in signature.parameters.values(): + item: dict[str, Any] = { + "name": parameter.name, + "kind": str(parameter.kind).replace("Parameter.", "").lower(), + "annotation": _render_annotation(parameter.annotation), + "required": parameter.default is inspect.Signature.empty, + } + default = _render_default(parameter.default) + if default is not None: + item["default"] = default + parameters.append(item) + return { + "parameters": parameters, + "returns": _render_annotation(signature.return_annotation), + } + + helper_payload = { + name: _signature_payload(fn) + for name, fn in sorted(helper_functions.items()) + } + + manifest: dict[str, Any] = { + "overview": { + "helper_count": len(helper_functions), + "supports_current_user": True, + "supports_raw_api_fallback": True, + "helper_result_envelope": { + "ok": "bool", + "item": "dict 
| None", + "items": "list[dict]", + "meta": "dict", + "error": "str | None", + }, + "raw_result_envelope": { + "result": "Any", + "meta": { + "ok": "bool", + "api_calls": "int", + "elapsed_ms": "int", + "limits_reached": "bool", + "limit_summary": "list[dict]", + }, + }, + }, + "helpers": helper_payload, + "fields": { + "profile": list(PROFILE_CANONICAL_FIELDS), + "repo": list(REPO_CANONICAL_FIELDS), + "user": list(USER_CANONICAL_FIELDS), + "actor": list(ACTOR_CANONICAL_FIELDS), + "activity": list(ACTIVITY_CANONICAL_FIELDS), + "collection": list(COLLECTION_CANONICAL_FIELDS), + }, + "aliases": { + "repo": dict(sorted(_REPO_FIELD_ALIASES.items())), + "user": dict(sorted(_USER_FIELD_ALIASES.items())), + "actor": dict(sorted(_ACTOR_FIELD_ALIASES.items())), + "collection": dict(sorted(_COLLECTION_FIELD_ALIASES.items())), + "sort_keys": dict(sorted(_SORT_KEY_ALIASES.items())), + }, + "limits": { + "default_timeout_sec": DEFAULT_TIMEOUT_SEC, + "default_max_calls": DEFAULT_MAX_CALLS, + "max_calls_limit": MAX_CALLS_LIMIT, + "output_items_truncation_limit": OUTPUT_ITEMS_TRUNCATION_LIMIT, + "graph_scan_limit_cap": GRAPH_SCAN_LIMIT_CAP, + "likes_scan_limit_cap": LIKES_SCAN_LIMIT_CAP, + "recent_activity_scan_max_pages": RECENT_ACTIVITY_SCAN_MAX_PAGES, + "trending_endpoint_max_limit": TRENDING_ENDPOINT_MAX_LIMIT, + "pagination_policy": { + helper_name: dict(sorted(policy.items())) + for helper_name, policy in sorted(PAGINATION_POLICY.items()) + }, + }, + "raw_api": { + "call_api": _signature_payload(call_api), + "allowed_methods": ["GET", "POST"], + "allowed_endpoint_patterns": list(ALLOWLIST_PATTERNS), + "helper_covered_endpoint_patterns": [ + {"pattern": pattern, "helper": helper_name} + for pattern, helper_name in HELPER_COVERED_ENDPOINT_PATTERNS + ], + }, + "repo_search": { + "sort_keys": { + repo_type: sorted(keys) + for repo_type, keys in sorted(_REPO_SORT_KEYS.items()) + }, + "extra_args": { + repo_type: sorted(args) + for repo_type, args in 
sorted(_REPO_SEARCH_EXTRA_ARGS.items()) + }, + }, + } + + allowed_sections = sorted(manifest) + requested = str(section or "").strip().lower() + if requested: + if requested not in manifest: + return _helper_error( + start_calls=start_calls, + source="internal://runtime-capabilities", + error=f"Unsupported section {section!r}. Allowed sections: {allowed_sections}", + section=section, + allowed_sections=allowed_sections, + ) + payload = { + "section": requested, + "content": manifest[requested], + "allowed_sections": allowed_sections, + } + else: + payload = { + "allowed_sections": allowed_sections, + **manifest, + } + + return _helper_success( + start_calls=start_calls, + source="internal://runtime-capabilities", + items=[payload], + section=requested or None, + ) + + m = pydantic_monty.Monty( + code, + inputs=["query", "max_calls"], + script_name="monty_agent.py", + type_check=False, + ) + + def _collecting_wrapper(helper_name: str, fn: Callable[..., Any]) -> Callable[..., Any]: + async def wrapped(*args: Any, **kwargs: Any) -> Any: + result = await fn(*args, **kwargs) + summary = _summarize_limit_hit(helper_name, result) + if summary is not None and len(limit_summaries) < 20: + limit_summaries.append(summary) + return result + + return wrapped + + limits: pydantic_monty.ResourceLimits = { + "max_duration_secs": float(timeout_sec), + "max_memory": DEFAULT_MONTY_MAX_MEMORY, + "max_allocations": DEFAULT_MONTY_MAX_ALLOCATIONS, + "max_recursion_depth": DEFAULT_MONTY_MAX_RECURSION_DEPTH, + } + + helper_functions = _resolve_helper_functions(locals()) + + try: + result = await pydantic_monty.run_monty_async( + m, + inputs={"query": query, "max_calls": max_calls}, + external_functions={ + "call_api": call_api, + **{name: _collecting_wrapper(name, fn) for name, fn in helper_functions.items()}, + }, + limits=limits, + ) + except Exception as e: + raise MontyExecutionError(str(e), call_count["n"], trace) from e + + if call_count["n"] == 0: + # Some current-user helpers can 
fail before any live API call is made + # (for example when request-scoped auth is unavailable). If generated + # code either returns that explicit helper error envelope or flattens it + # into an empty fallback shape, preserve the helper-owned error instead + # of replacing it with a generic zero-call runtime failure. + if internal_helper_used["used"]: + return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} + if isinstance(result, dict) and result.get("ok") is True: + meta = result.get("meta") if isinstance(result.get("meta"), dict) else {} + source = meta.get("source") + if isinstance(source, str) and source.startswith("internal://"): + return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} + if latest_helper_error is not None: + return {"output": _truncate_result_payload(latest_helper_error), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} + if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str): + return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} + raise MontyExecutionError("Code completed without calling any external API function", call_count["n"], trace) + + if not any(step.get("ok") is True for step in trace): + # Allow explicit helper/live failure envelopes to be returned as-is. + # This preserves concrete API error context (e.g. repo not found) while + # still blocking fabricated successful fallback outputs. 
+ if isinstance(result, dict) and result.get("ok") is False and isinstance(result.get("error"), str): + return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} + raise MontyExecutionError( + "Code completed without a successful API call; refusing non-live fallback result", + call_count["n"], + trace, + ) + + return {"output": _truncate_result_payload(result), "api_calls": call_count["n"], "trace": trace, "limit_summaries": limit_summaries} + + +async def hf_hub_query( + query: str, + code: str, + max_calls: int = DEFAULT_MAX_CALLS, + timeout_sec: int = DEFAULT_TIMEOUT_SEC, +) -> dict[str, Any]: + """Use natural-language queries to explore the Hugging Face Hub. + + Best for read-only Hub discovery, lookup, ranking, and relationship questions + across users, organizations, repositories, activity, followers, likes, + discussions, and collections. + """ + if not query or not query.strip(): + raise ValueError("query is required") + if not code or not code.strip(): + raise ValueError("code is required") + + max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT)) + code = code.strip() + try: + _validate_generated_code(code) + + run = await _run_with_monty( + code=code, + query=query, + max_calls=max_calls, + strict_mode=INTERNAL_STRICT_MODE, + timeout_sec=timeout_sec, + ) + return { + "ok": True, + "data": run["output"], + "error": None, + "api_calls": run["api_calls"], + } + except MontyExecutionError as e: + return { + "ok": False, + "data": None, + "error": str(e), + "api_calls": e.api_calls, + } + except Exception as e: + return { + "ok": False, + "data": None, + "error": str(e), + "api_calls": 0, + } + + +async def hf_hub_query_raw( + query: str, + code: str, + max_calls: int = DEFAULT_MAX_CALLS, + timeout_sec: int = DEFAULT_TIMEOUT_SEC, +) -> Any: + """Use natural-language queries to explore the Hugging Face Hub in raw mode. 
+ + Best for read-only Hub discovery, lookup, ranking, and relationship + questions when the caller wants a runtime-owned raw envelope: + ``result`` contains the direct ``solve(...)`` output and ``meta`` contains + execution details such as timing, call counts, and limit summaries. + """ + if not query or not query.strip(): + raise ValueError("query is required") + if not code or not code.strip(): + raise ValueError("code is required") + + max_calls = max(1, min(int(max_calls), MAX_CALLS_LIMIT)) + code = code.strip() + started = time.perf_counter() + try: + _validate_generated_code(code) + + run = await _run_with_monty( + code=code, + query=query, + max_calls=max_calls, + strict_mode=INTERNAL_STRICT_MODE, + timeout_sec=timeout_sec, + ) + elapsed_ms = int((time.perf_counter() - started) * 1000) + return _wrap_raw_result( + run["output"], + ok=True, + api_calls=run["api_calls"], + elapsed_ms=elapsed_ms, + limit_summaries=run.get("limit_summaries"), + ) + except MontyExecutionError as e: + elapsed_ms = int((time.perf_counter() - started) * 1000) + return _wrap_raw_result( + None, + ok=False, + api_calls=e.api_calls, + elapsed_ms=elapsed_ms, + error=str(e), + ) + except Exception as e: + elapsed_ms = int((time.perf_counter() - started) * 1000) + return _wrap_raw_result( + None, + ok=False, + api_calls=0, + elapsed_ms=elapsed_ms, + error=str(e), + ) + +def _arg_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Monty-backed API chaining tool (v2)") + p.add_argument("--query", required=True, help="Natural language query") + p.add_argument("--code", default=None, help="Inline Monty code to execute") + p.add_argument("--code-file", default=None, help="Path to .py file with Monty code to execute") + p.add_argument("--max-calls", type=int, default=DEFAULT_MAX_CALLS, help="Max external API/helper calls") + p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SEC) + return p + + +def main() -> int: + args = _arg_parser().parse_args() + 
code = args.code + if args.code_file: + with open(args.code_file, "r", encoding="utf-8") as f: + code = f.read() + + if not code: + print(json.dumps({"ok": False, "error": "Either --code or --code-file is required"}, ensure_ascii=False)) + return 1 + + try: + out = asyncio.run( + hf_hub_query( + query=args.query, + code=code, + max_calls=args.max_calls, + timeout_sec=args.timeout, + ) + ) + print(json.dumps(out, ensure_ascii=False)) + return 0 if out.get("ok") else 1 + except Exception as e: + print(json.dumps({"ok": False, "error": str(e)}, ensure_ascii=False)) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main())