from __future__ import annotations

import os
from pathlib import Path, PurePosixPath
from typing import Iterable

from huggingface_hub import CommitOperationAdd, HfApi

try:
    from .validator import DOMAIN_TOKEN_RE, PreparedSubmission, TASK_ID_RE, normalize_domain_token
except ImportError:
    from validator import DOMAIN_TOKEN_RE, PreparedSubmission, TASK_ID_RE, normalize_domain_token

DEFAULT_REPO_ID = 'InternScience/ResearchClawBench'
TOKEN_ENV_KEYS = (
    'RCB_SPACE_HF_TOKEN',
    'HF_TOKEN',
    'HUGGINGFACEHUB_API_TOKEN',
    'HUGGINGFACE_TOKEN',
)


def load_hf_token() -> str | None:
    """Return the first Hugging Face token found among the supported environment variables."""
    for key in TOKEN_ENV_KEYS:
        value = os.environ.get(key)
        if value:
            return value
    return None


def list_existing_task_ids(repo_id: str = DEFAULT_REPO_ID, token: str | None = None) -> set[str]:
    """Collect the task IDs already present under ``tasks/`` in the dataset repo."""
    api = HfApi(token=token)
    task_ids: set[str] = set()
    for remote_path in api.list_repo_files(repo_id=repo_id, repo_type='dataset', token=token):
        parts = PurePosixPath(remote_path).parts
        if len(parts) >= 2 and parts[0] == 'tasks':
            task_ids.add(parts[1])
    return task_ids


def get_repo_head_sha(repo_id: str = DEFAULT_REPO_ID, token: str | None = None) -> str:
    """Return the commit SHA at the tip of ``main`` for the dataset repo."""
    api = HfApi(token=token)
    info = api.repo_info(repo_id=repo_id, repo_type='dataset', revision='main', token=token)
    if not getattr(info, 'sha', None):
        raise RuntimeError(f'Failed to fetch HEAD SHA for dataset repo {repo_id}.')
    return info.sha


def allocate_next_task_id(domain: str, existing_task_ids: Iterable[str]) -> str:
    """Allocate the next unused ``<domain>_NNN`` identifier for the given domain."""
    domain = normalize_domain_token(domain)
    if not DOMAIN_TOKEN_RE.fullmatch(domain):
        raise ValueError(
            'Domain must start with a letter and contain only letters, numbers, or hyphens '
            f'after normalization. Got: {domain!r}'
        )
    used_numbers = []
    for task_id in existing_task_ids:
        match = TASK_ID_RE.match(task_id)
        if match and match.group(1) == domain:
            used_numbers.append(int(match.group(2)))
    next_number = (max(used_numbers) + 1) if used_numbers else 0
    if next_number > 999:
        raise ValueError(f'No task IDs left for domain {domain}.')
    return f'{domain}_{next_number:03d}'


def build_commit_description(prepared: PreparedSubmission) -> str:
    """Build the pull request description from the submission metadata."""
    metadata = prepared.metadata
    lines = [
        f'Submitter: {metadata.submitter}',
        f'Contact email: {metadata.email}',
        f'Domain: {metadata.domain}',
        f'Assigned task id: {prepared.assigned_task_id}',
        f'Paper title: {metadata.paper_title}',
        f'Paper URL/DOI: {metadata.paper_url}',
        f'Archive files: {prepared.archive_stats.file_count}',
        f'Archive total bytes: {prepared.archive_stats.total_bytes}',
    ]
    if metadata.notes.strip():
        lines.extend(['', 'Submitter notes:', metadata.notes.strip()])
    lines.extend([
        '',
        'This PR was created automatically by the ResearchClawBench submission Space '
        'after passing format validation.',
    ])
    return '\n'.join(lines)


def create_dataset_pr(
    prepared: PreparedSubmission,
    *,
    repo_id: str = DEFAULT_REPO_ID,
    token: str | None = None,
    parent_commit: str | None = None,
):
    """Upload the staged task directory to ``tasks/<task_id>/`` and open a pull request."""
    token = token or load_hf_token()
    if not token:
        raise RuntimeError('No Hugging Face write token configured. Set RCB_SPACE_HF_TOKEN or HF_TOKEN.')
    staged_task_dir = Path(prepared.staged_task_dir)
    if not staged_task_dir.is_dir():
        raise RuntimeError(f'Staged task directory does not exist: {staged_task_dir}')

    # Stage every file in the local task directory, preserving its relative layout.
    operations = []
    for path in sorted(staged_task_dir.rglob('*')):
        if not path.is_file():
            continue
        rel_path = path.relative_to(staged_task_dir).as_posix()
        operations.append(
            CommitOperationAdd(
                path_in_repo=f'tasks/{prepared.assigned_task_id}/{rel_path}',
                path_or_fileobj=str(path),
            )
        )

    api = HfApi(token=token)
    return api.create_commit(
        repo_id=repo_id,
        repo_type='dataset',
        operations=operations,
        commit_message=f'Add task submission {prepared.assigned_task_id}',
        commit_description=build_commit_description(prepared),
        token=token,
        create_pr=True,
        revision='main',
        parent_commit=parent_commit,
    )
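

# Illustrative usage sketch (an assumption, not part of the Space's runtime flow): a
# hypothetical caller could pin the repo HEAD, list existing task IDs, and allocate the
# next free identifier before handing a PreparedSubmission to create_dataset_pr(). The
# 'astronomy' domain below is a made-up example value.
if __name__ == '__main__':
    token = load_hf_token()
    head_sha = get_repo_head_sha(token=token)
    existing = list_existing_task_ids(token=token)
    task_id = allocate_next_task_id('astronomy', existing)  # hypothetical domain
    print(f'Next free task id: {task_id} (repo HEAD {head_sha})')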