| """Flow API Client for VideoFX (Veo)""" |
| import time |
| import uuid |
| import random |
| import base64 |
| from typing import Dict, Any, Optional, List |
| from curl_cffi.requests import AsyncSession |
| from ..core.logger import debug_logger |
| from ..core.config import config |
|
|
|
|
| class FlowClient: |
| """VideoFX API客户端""" |
|
|
| def __init__(self, proxy_manager, db=None): |
| self.proxy_manager = proxy_manager |
| self.db = db |
| self.labs_base_url = config.flow_labs_base_url |
| self.api_base_url = config.flow_api_base_url |
| self.timeout = config.flow_timeout |
| |
| self._user_agent_cache = {} |
|
|
| def _generate_user_agent(self, account_id: str = None) -> str: |
| """基于账号ID生成固定的 User-Agent |
| |
| Args: |
| account_id: 账号标识(如 email 或 token_id),相同账号返回相同 UA |
| |
| Returns: |
| User-Agent 字符串 |
| """ |
| |
| if not account_id: |
| account_id = f"random_{random.randint(1, 999999)}" |
| |
| |
| if account_id in self._user_agent_cache: |
| return self._user_agent_cache[account_id] |
| |
| |
| import hashlib |
| seed = int(hashlib.md5(account_id.encode()).hexdigest()[:8], 16) |
| rng = random.Random(seed) |
| |
| |
| chrome_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0", "129.0.0.0"] |
| |
| firefox_versions = ["133.0", "132.0", "131.0", "134.0"] |
| |
| safari_versions = ["18.2", "18.1", "18.0", "17.6"] |
| |
| edge_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0"] |
|
|
| |
| os_configs = [ |
| |
| { |
| "platform": "Windows NT 10.0; Win64; x64", |
| "browsers": [ |
| lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36", |
| lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", |
| lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36 Edg/{r.choice(edge_versions)}", |
| ] |
| }, |
| |
| { |
| "platform": "Macintosh; Intel Mac OS X 10_15_7", |
| "browsers": [ |
| lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36", |
| lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/{r.choice(safari_versions)} Safari/605.1.15", |
| lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 14.{r.randint(0, 7)}; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", |
| ] |
| }, |
| |
| { |
| "platform": "X11; Linux x86_64", |
| "browsers": [ |
| lambda r: f"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36", |
| lambda r: f"Mozilla/5.0 (X11; Linux x86_64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", |
| lambda r: f"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}", |
| ] |
| } |
| ] |
|
|
| |
| os_config = rng.choice(os_configs) |
| browser_generator = rng.choice(os_config["browsers"]) |
| user_agent = browser_generator(rng) |
| |
| |
| self._user_agent_cache[account_id] = user_agent |
| |
| return user_agent |
|
|
| async def _make_request( |
| self, |
| method: str, |
| url: str, |
| headers: Optional[Dict] = None, |
| json_data: Optional[Dict] = None, |
| use_st: bool = False, |
| st_token: Optional[str] = None, |
| use_at: bool = False, |
| at_token: Optional[str] = None |
| ) -> Dict[str, Any]: |
| """统一HTTP请求处理 |
| |
| Args: |
| method: HTTP方法 (GET/POST) |
| url: 完整URL |
| headers: 请求头 |
| json_data: JSON请求体 |
| use_st: 是否使用ST认证 (Cookie方式) |
| st_token: Session Token |
| use_at: 是否使用AT认证 (Bearer方式) |
| at_token: Access Token |
| """ |
| proxy_url = await self.proxy_manager.get_proxy_url() |
|
|
| if headers is None: |
| headers = {} |
|
|
| |
| if use_st and st_token: |
| headers["Cookie"] = f"__Secure-next-auth.session-token={st_token}" |
|
|
| |
| if use_at and at_token: |
| headers["authorization"] = f"Bearer {at_token}" |
|
|
| |
| account_id = None |
| if st_token: |
| account_id = st_token[:16] |
| elif at_token: |
| account_id = at_token[:16] |
|
|
| |
| headers.update({ |
| "Content-Type": "application/json", |
| "User-Agent": self._generate_user_agent(account_id) |
| }) |
|
|
| |
| if config.debug_enabled: |
| debug_logger.log_request( |
| method=method, |
| url=url, |
| headers=headers, |
| body=json_data, |
| proxy=proxy_url |
| ) |
|
|
| start_time = time.time() |
|
|
| try: |
| async with AsyncSession() as session: |
| if method.upper() == "GET": |
| response = await session.get( |
| url, |
| headers=headers, |
| proxy=proxy_url, |
| timeout=self.timeout, |
| impersonate="chrome110" |
| ) |
| else: |
| response = await session.post( |
| url, |
| headers=headers, |
| json=json_data, |
| proxy=proxy_url, |
| timeout=self.timeout, |
| impersonate="chrome110" |
| ) |
|
|
| duration_ms = (time.time() - start_time) * 1000 |
|
|
| |
| if config.debug_enabled: |
| debug_logger.log_response( |
| status_code=response.status_code, |
| headers=dict(response.headers), |
| body=response.text, |
| duration_ms=duration_ms |
| ) |
|
|
| response.raise_for_status() |
| return response.json() |
|
|
| except Exception as e: |
| duration_ms = (time.time() - start_time) * 1000 |
| error_msg = str(e) |
|
|
| if config.debug_enabled: |
| debug_logger.log_error( |
| error_message=error_msg, |
| status_code=getattr(e, 'status_code', None), |
| response_text=getattr(e, 'response_text', None) |
| ) |
|
|
| raise Exception(f"Flow API request failed: {error_msg}") |
|
|
| |
|
|
| async def st_to_at(self, st: str) -> dict: |
| """ST转AT |
| |
| Args: |
| st: Session Token |
| |
| Returns: |
| { |
| "access_token": "AT", |
| "expires": "2025-11-15T04:46:04.000Z", |
| "user": {...} |
| } |
| """ |
| url = f"{self.labs_base_url}/auth/session" |
| result = await self._make_request( |
| method="GET", |
| url=url, |
| use_st=True, |
| st_token=st |
| ) |
| return result |
|
|
| |
|
|
| async def create_project(self, st: str, title: str) -> str: |
| """创建项目,返回project_id |
| |
| Args: |
| st: Session Token |
| title: 项目标题 |
| |
| Returns: |
| project_id (UUID) |
| """ |
| url = f"{self.labs_base_url}/trpc/project.createProject" |
| json_data = { |
| "json": { |
| "projectTitle": title, |
| "toolName": "PINHOLE" |
| } |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_st=True, |
| st_token=st |
| ) |
|
|
| |
| project_id = result["result"]["data"]["json"]["result"]["projectId"] |
| return project_id |
|
|
| async def delete_project(self, st: str, project_id: str): |
| """删除项目 |
| |
| Args: |
| st: Session Token |
| project_id: 项目ID |
| """ |
| url = f"{self.labs_base_url}/trpc/project.deleteProject" |
| json_data = { |
| "json": { |
| "projectToDeleteId": project_id |
| } |
| } |
|
|
| await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_st=True, |
| st_token=st |
| ) |
|
|
| |
|
|
| async def get_credits(self, at: str) -> dict: |
| """查询余额 |
| |
| Args: |
| at: Access Token |
| |
| Returns: |
| { |
| "credits": 920, |
| "userPaygateTier": "PAYGATE_TIER_ONE" |
| } |
| """ |
| url = f"{self.api_base_url}/credits" |
| result = await self._make_request( |
| method="GET", |
| url=url, |
| use_at=True, |
| at_token=at |
| ) |
| return result |
|
|
| |
|
|
| async def upload_image( |
| self, |
| at: str, |
| image_bytes: bytes, |
| aspect_ratio: str = "IMAGE_ASPECT_RATIO_LANDSCAPE" |
| ) -> str: |
| """上传图片,返回mediaGenerationId |
| |
| Args: |
| at: Access Token |
| image_bytes: 图片字节数据 |
| aspect_ratio: 图片或视频宽高比(会自动转换为图片格式) |
| |
| Returns: |
| mediaGenerationId (CAM...) |
| """ |
| |
| |
| |
| if aspect_ratio.startswith("VIDEO_"): |
| aspect_ratio = aspect_ratio.replace("VIDEO_", "IMAGE_") |
|
|
| |
| image_base64 = base64.b64encode(image_bytes).decode('utf-8') |
|
|
| url = f"{self.api_base_url}:uploadUserImage" |
| json_data = { |
| "imageInput": { |
| "rawImageBytes": image_base64, |
| "mimeType": "image/jpeg", |
| "isUserUploaded": True, |
| "aspectRatio": aspect_ratio |
| }, |
| "clientContext": { |
| "sessionId": self._generate_session_id(), |
| "tool": "ASSET_MANAGER" |
| } |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| |
| media_id = result["mediaGenerationId"]["mediaGenerationId"] |
| return media_id |
|
|
| |
|
|
| async def generate_image( |
| self, |
| at: str, |
| project_id: str, |
| prompt: str, |
| model_name: str, |
| aspect_ratio: str, |
| image_inputs: Optional[List[Dict]] = None |
| ) -> dict: |
| """生成图片(同步返回) |
| |
| Args: |
| at: Access Token |
| project_id: 项目ID |
| prompt: 提示词 |
| model_name: GEM_PIX, GEM_PIX_2 或 IMAGEN_3_5 |
| aspect_ratio: 图片宽高比 |
| image_inputs: 参考图片列表(图生图时使用) |
| |
| Returns: |
| { |
| "media": [{ |
| "image": { |
| "generatedImage": { |
| "fifeUrl": "图片URL", |
| ... |
| } |
| } |
| }] |
| } |
| """ |
| url = f"{self.api_base_url}/projects/{project_id}/flowMedia:batchGenerateImages" |
|
|
| |
| recaptcha_token = await self._get_recaptcha_token(project_id) or "" |
| session_id = self._generate_session_id() |
|
|
| |
| request_data = { |
| "clientContext": { |
| "recaptchaToken": recaptcha_token, |
| "projectId": project_id, |
| "sessionId": session_id, |
| "tool": "PINHOLE" |
| }, |
| "seed": random.randint(1, 99999), |
| "imageModelName": model_name, |
| "imageAspectRatio": aspect_ratio, |
| "prompt": prompt, |
| "imageInputs": image_inputs or [] |
| } |
|
|
| json_data = { |
| "clientContext": { |
| "recaptchaToken": recaptcha_token, |
| "sessionId": session_id |
| }, |
| "requests": [request_data] |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| return result |
|
|
| |
|
|
| async def generate_video_text( |
| self, |
| at: str, |
| project_id: str, |
| prompt: str, |
| model_key: str, |
| aspect_ratio: str, |
| user_paygate_tier: str = "PAYGATE_TIER_ONE" |
| ) -> dict: |
| """文生视频,返回task_id |
| |
| Args: |
| at: Access Token |
| project_id: 项目ID |
| prompt: 提示词 |
| model_key: veo_3_1_t2v_fast 等 |
| aspect_ratio: 视频宽高比 |
| user_paygate_tier: 用户等级 |
| |
| Returns: |
| { |
| "operations": [{ |
| "operation": {"name": "task_id"}, |
| "sceneId": "uuid", |
| "status": "MEDIA_GENERATION_STATUS_PENDING" |
| }], |
| "remainingCredits": 900 |
| } |
| """ |
| url = f"{self.api_base_url}/video:batchAsyncGenerateVideoText" |
|
|
| |
| recaptcha_token = await self._get_recaptcha_token(project_id) or "" |
| session_id = self._generate_session_id() |
| scene_id = str(uuid.uuid4()) |
|
|
| json_data = { |
| "clientContext": { |
| "recaptchaToken": recaptcha_token, |
| "sessionId": session_id, |
| "projectId": project_id, |
| "tool": "PINHOLE", |
| "userPaygateTier": user_paygate_tier |
| }, |
| "requests": [{ |
| "aspectRatio": aspect_ratio, |
| "seed": random.randint(1, 99999), |
| "textInput": { |
| "prompt": prompt |
| }, |
| "videoModelKey": model_key, |
| "metadata": { |
| "sceneId": scene_id |
| } |
| }] |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| return result |
|
|
| async def generate_video_reference_images( |
| self, |
| at: str, |
| project_id: str, |
| prompt: str, |
| model_key: str, |
| aspect_ratio: str, |
| reference_images: List[Dict], |
| user_paygate_tier: str = "PAYGATE_TIER_ONE" |
| ) -> dict: |
| """图生视频,返回task_id |
| |
| Args: |
| at: Access Token |
| project_id: 项目ID |
| prompt: 提示词 |
| model_key: veo_3_0_r2v_fast |
| aspect_ratio: 视频宽高比 |
| reference_images: 参考图片列表 [{"imageUsageType": "IMAGE_USAGE_TYPE_ASSET", "mediaId": "..."}] |
| user_paygate_tier: 用户等级 |
| |
| Returns: |
| 同 generate_video_text |
| """ |
| url = f"{self.api_base_url}/video:batchAsyncGenerateVideoReferenceImages" |
|
|
| |
| recaptcha_token = await self._get_recaptcha_token(project_id) or "" |
| session_id = self._generate_session_id() |
| scene_id = str(uuid.uuid4()) |
|
|
| json_data = { |
| "clientContext": { |
| "recaptchaToken": recaptcha_token, |
| "sessionId": session_id, |
| "projectId": project_id, |
| "tool": "PINHOLE", |
| "userPaygateTier": user_paygate_tier |
| }, |
| "requests": [{ |
| "aspectRatio": aspect_ratio, |
| "seed": random.randint(1, 99999), |
| "textInput": { |
| "prompt": prompt |
| }, |
| "videoModelKey": model_key, |
| "referenceImages": reference_images, |
| "metadata": { |
| "sceneId": scene_id |
| } |
| }] |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| return result |
|
|
| async def generate_video_start_end( |
| self, |
| at: str, |
| project_id: str, |
| prompt: str, |
| model_key: str, |
| aspect_ratio: str, |
| start_media_id: str, |
| end_media_id: str, |
| user_paygate_tier: str = "PAYGATE_TIER_ONE" |
| ) -> dict: |
| """收尾帧生成视频,返回task_id |
| |
| Args: |
| at: Access Token |
| project_id: 项目ID |
| prompt: 提示词 |
| model_key: veo_3_1_i2v_s_fast_fl |
| aspect_ratio: 视频宽高比 |
| start_media_id: 起始帧mediaId |
| end_media_id: 结束帧mediaId |
| user_paygate_tier: 用户等级 |
| |
| Returns: |
| 同 generate_video_text |
| """ |
| url = f"{self.api_base_url}/video:batchAsyncGenerateVideoStartAndEndImage" |
|
|
| |
| recaptcha_token = await self._get_recaptcha_token(project_id) or "" |
| session_id = self._generate_session_id() |
| scene_id = str(uuid.uuid4()) |
|
|
| json_data = { |
| "clientContext": { |
| "recaptchaToken": recaptcha_token, |
| "sessionId": session_id, |
| "projectId": project_id, |
| "tool": "PINHOLE", |
| "userPaygateTier": user_paygate_tier |
| }, |
| "requests": [{ |
| "aspectRatio": aspect_ratio, |
| "seed": random.randint(1, 99999), |
| "textInput": { |
| "prompt": prompt |
| }, |
| "videoModelKey": model_key, |
| "startImage": { |
| "mediaId": start_media_id |
| }, |
| "endImage": { |
| "mediaId": end_media_id |
| }, |
| "metadata": { |
| "sceneId": scene_id |
| } |
| }] |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| return result |
|
|
| async def generate_video_start_image( |
| self, |
| at: str, |
| project_id: str, |
| prompt: str, |
| model_key: str, |
| aspect_ratio: str, |
| start_media_id: str, |
| user_paygate_tier: str = "PAYGATE_TIER_ONE" |
| ) -> dict: |
| """仅首帧生成视频,返回task_id |
| |
| Args: |
| at: Access Token |
| project_id: 项目ID |
| prompt: 提示词 |
| model_key: veo_3_1_i2v_s_fast_fl等 |
| aspect_ratio: 视频宽高比 |
| start_media_id: 起始帧mediaId |
| user_paygate_tier: 用户等级 |
| |
| Returns: |
| 同 generate_video_text |
| """ |
| url = f"{self.api_base_url}/video:batchAsyncGenerateVideoStartImage" |
|
|
| |
| recaptcha_token = await self._get_recaptcha_token(project_id) or "" |
| session_id = self._generate_session_id() |
| scene_id = str(uuid.uuid4()) |
|
|
| json_data = { |
| "clientContext": { |
| "recaptchaToken": recaptcha_token, |
| "sessionId": session_id, |
| "projectId": project_id, |
| "tool": "PINHOLE", |
| "userPaygateTier": user_paygate_tier |
| }, |
| "requests": [{ |
| "aspectRatio": aspect_ratio, |
| "seed": random.randint(1, 99999), |
| "textInput": { |
| "prompt": prompt |
| }, |
| "videoModelKey": model_key, |
| "startImage": { |
| "mediaId": start_media_id |
| }, |
| |
| "metadata": { |
| "sceneId": scene_id |
| } |
| }] |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| return result |
|
|
| |
|
|
| async def check_video_status(self, at: str, operations: List[Dict]) -> dict: |
| """查询视频生成状态 |
| |
| Args: |
| at: Access Token |
| operations: 操作列表 [{"operation": {"name": "task_id"}, "sceneId": "...", "status": "..."}] |
| |
| Returns: |
| { |
| "operations": [{ |
| "operation": { |
| "name": "task_id", |
| "metadata": {...} # 完成时包含视频信息 |
| }, |
| "status": "MEDIA_GENERATION_STATUS_SUCCESSFUL" |
| }] |
| } |
| """ |
| url = f"{self.api_base_url}/video:batchCheckAsyncVideoGenerationStatus" |
|
|
| json_data = { |
| "operations": operations |
| } |
|
|
| result = await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_at=True, |
| at_token=at |
| ) |
|
|
| return result |
|
|
| |
|
|
| async def delete_media(self, st: str, media_names: List[str]): |
| """删除媒体 |
| |
| Args: |
| st: Session Token |
| media_names: 媒体ID列表 |
| """ |
| url = f"{self.labs_base_url}/trpc/media.deleteMedia" |
| json_data = { |
| "json": { |
| "names": media_names |
| } |
| } |
|
|
| await self._make_request( |
| method="POST", |
| url=url, |
| json_data=json_data, |
| use_st=True, |
| st_token=st |
| ) |
|
|
| |
|
|
| def _generate_session_id(self) -> str: |
| """生成sessionId: ;timestamp""" |
| return f";{int(time.time() * 1000)}" |
|
|
| def _generate_scene_id(self) -> str: |
| """生成sceneId: UUID""" |
| return str(uuid.uuid4()) |
|
|
| async def _get_recaptcha_token(self, project_id: str) -> Optional[str]: |
| """获取reCAPTCHA token - 支持多种打码方式""" |
| captcha_method = config.captcha_method |
|
|
| |
| if captcha_method == "personal": |
| try: |
| from .browser_captcha_personal import BrowserCaptchaService |
| service = await BrowserCaptchaService.get_instance(self.db) |
| return await service.get_token(project_id) |
| except Exception as e: |
| debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}") |
| return None |
| |
| elif captcha_method == "browser": |
| try: |
| from .browser_captcha import BrowserCaptchaService |
| service = await BrowserCaptchaService.get_instance(self.db) |
| return await service.get_token(project_id) |
| except Exception as e: |
| debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}") |
| return None |
| |
| elif captcha_method in ["yescaptcha", "capmonster", "ezcaptcha", "capsolver"]: |
| return await self._get_api_captcha_token(captcha_method, project_id) |
| else: |
| debug_logger.log_error(f"[reCAPTCHA] Unknown captcha method: {captcha_method}") |
| return None |
|
|
| async def _get_api_captcha_token(self, method: str, project_id: str) -> Optional[str]: |
| """通用API打码服务""" |
| |
| if method == "yescaptcha": |
| client_key = config.yescaptcha_api_key |
| base_url = config.yescaptcha_base_url |
| task_type = "RecaptchaV3TaskProxylessM1" |
| elif method == "capmonster": |
| client_key = config.capmonster_api_key |
| base_url = config.capmonster_base_url |
| task_type = "RecaptchaV3EnterpriseTask" |
| elif method == "ezcaptcha": |
| client_key = config.ezcaptcha_api_key |
| base_url = config.ezcaptcha_base_url |
| task_type = "ReCaptchaV3EnterpriseTaskProxyless" |
| elif method == "capsolver": |
| client_key = config.capsolver_api_key |
| base_url = config.capsolver_base_url |
| task_type = "ReCaptchaV3EnterpriseTaskProxyLess" |
| else: |
| debug_logger.log_error(f"[reCAPTCHA] Unknown API method: {method}") |
| return None |
|
|
| if not client_key: |
| debug_logger.log_info(f"[reCAPTCHA] {method} API key not configured, skipping") |
| return None |
|
|
| website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV" |
| website_url = f"https://labs.google/fx/tools/flow/project/{project_id}" |
| page_action = "FLOW_GENERATION" |
|
|
| try: |
| async with AsyncSession() as session: |
| create_url = f"{base_url}/createTask" |
| create_data = { |
| "clientKey": client_key, |
| "softID": "42780", |
| "task": { |
| "websiteURL": website_url, |
| "websiteKey": website_key, |
| "type": task_type, |
| "pageAction": page_action |
| } |
| } |
|
|
| result = await session.post(create_url, json=create_data, impersonate="chrome110") |
| result_json = result.json() |
| task_id = result_json.get('taskId') |
|
|
| debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}") |
|
|
| if not task_id: |
| error_desc = result_json.get('errorDescription', 'Unknown error') |
| debug_logger.log_error(f"[reCAPTCHA {method}] Failed to create task: {error_desc}") |
| return None |
|
|
| get_url = f"{base_url}/getTaskResult" |
| for i in range(40): |
| get_data = { |
| "clientKey": client_key, |
| "taskId": task_id |
| } |
| result = await session.post(get_url, json=get_data, impersonate="chrome110") |
| result_json = result.json() |
|
|
| debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}") |
|
|
| status = result_json.get('status') |
| if status == 'ready': |
| solution = result_json.get('solution', {}) |
| response = solution.get('gRecaptchaResponse') |
| if response: |
| debug_logger.log_info(f"[reCAPTCHA {method}] Token获取成功") |
| return response |
|
|
| time.sleep(3) |
|
|
| debug_logger.log_error(f"[reCAPTCHA {method}] Timeout waiting for token") |
| return None |
|
|
| except Exception as e: |
| debug_logger.log_error(f"[reCAPTCHA {method}] error: {str(e)}") |
| return None |
|
|