| |
| import socket |
| import asyncio |
| import aiofiles |
| import psutil |
| import requests |
|
|
| from os.path import join |
| from tld import get_tld |
| from loguru import logger |
| from playwright.async_api import async_playwright, Page |
| from playwright_stealth import stealth_async |
|
|
| from option_mysql import get_aiomysql_instance |
| from __init__ import ( |
| gotify_host, |
| gotify_port, |
| gotify_token, |
| gotify_img, |
| priority, |
| DIR_PATH, |
| DEBUG, |
| browser_headless, |
| browser_proxy, |
| ignore_https_errors, |
| user_agent, |
| ) |
|
|
|
|
| |
async def init_page(storage_state=None, browser_type: str = "chromium") -> None:
    """Launch a browser, visit GitHub, run the cookie-dump flow, then close.

    Args:
        storage_state: Optional Playwright storage state used to seed the new
            browser context (passed straight to ``browser.new_context``).
        browser_type: One of ``"chromium"``, ``"firefox"`` or ``"webkit"``.

    Raises:
        TypeError: If ``browser_type`` is not one of the supported names.

    Note:
        Nothing is returned: the browser is closed before this coroutine
        finishes, so no usable page outlives the call.
    """
    if browser_type not in ("chromium", "firefox", "webkit"):
        raise TypeError("unspported browser")

    async with async_playwright() as p:
        # Playwright exposes each engine under an attribute of the same name,
        # so every validated name (including "webkit") can be launched.
        browser = await getattr(p, browser_type).launch(headless=browser_headless)
        try:
            # A single context carries both the configured proxy / HTTPS / UA
            # options and the saved storage state, so the page that is
            # actually used honours all of them.
            context = await browser.new_context(
                proxy=browser_proxy,
                ignore_https_errors=ignore_https_errors,
                user_agent=user_agent,
                storage_state=storage_state,
            )
            page = await context.new_page()
            await stealth_async(page)
            await goto_github(page)
            await dump_cookie(page)
        finally:
            # Always release the browser process, even if a step above fails.
            await browser.close()
|
|
|
|
async def goto_github(page):
    """Navigate ``page`` to the GitHub home page."""
    github_url = "https://github.com"
    await page.goto(github_url)
|
|
|
|
async def dump_cookie(page):
    """Open the Hugging Face login page and save the context's storage state.

    Navigates ``page`` to the login URL, stores a full-page screenshot as
    ``screenshot.png`` under ``DIR_PATH``, waits until ``c`` is entered on
    stdin (presumably so the operator can log in by hand first), and then
    writes the browser context's storage state to
    ``<DIR_PATH>/<domain>.json``.

    Args:
        page: The Playwright page to drive.
    """
    url = "https://huggingface.co/login"
    save_cookie = join(DIR_PATH, f"{get_domain(url)}.json")
    await page.goto(url)
    logger.debug(DIR_PATH)
    await page.screenshot(path=join(DIR_PATH, "screenshot.png"), full_page=True)

    # input() blocks the calling thread; running it in the default executor
    # keeps the event loop responsive while waiting for the operator.
    loop = asyncio.get_running_loop()
    while await loop.run_in_executor(None, input, "input c to continue: ") != "c":
        pass

    await page.context.storage_state(path=save_cookie)
|
|
|
|
| |
def get_local_ip():
    """Return the first non-loopback IPv4 address reported by psutil.

    Interfaces and their addresses are scanned in the order returned by
    ``psutil.net_if_addrs()``.

    Returns:
        The address string of the first IPv4 entry that is not
        ``127.0.0.1``.

    Raises:
        IndexError: If no interface carries such an address.
    """
    for addresses in psutil.net_if_addrs().values():
        for addr in addresses:
            if addr.family == socket.AF_INET and addr.address != "127.0.0.1":
                return addr.address
    # Same exception type as indexing an empty result list, but with a
    # message that names the actual problem.
    raise IndexError("no non-loopback IPv4 address found")
|
|
|
|
| |
def get_domain(url: str) -> str:
    """Return the ``domain`` attribute of the ``tld`` parse result for ``url``."""
    return get_tld(url, as_object=True).domain
|
|
|
|
| |
def get_pi_ip(host: str) -> str:
    """Resolve ``host`` to an IPv4 address string via ``socket.gethostbyname``."""
    return socket.gethostbyname(host)
|
|
|
|
| |
def push_msg(
    title: str = "无标题", message: str = "无内容", img_url: str = gotify_img
) -> dict:
    """Send a notification to the configured Gotify server.

    The Gotify host name is resolved to an IP address first, and the message
    is then POSTed as JSON to that server's ``/message`` endpoint.

    Args:
        title: Notification title.
        message: Notification body.
        img_url: Image URL sent as the notification's ``bigImageUrl`` extra.

    Returns:
        A dict with ``"code"`` (the HTTP status code) and ``"res"`` (the raw
        response body).
    """
    extras = {
        "client::display": {"contentType": "text/plaintext"},
        "client::notification": {"bigImageUrl": img_url},
    }
    payload = {
        "title": title,
        "message": message,
        "priority": priority,
        "extras": extras,
    }

    gotify_ip = get_pi_ip(gotify_host)
    response = requests.post(
        url=f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}",
        json=payload,
        timeout=20,
    )
    return {"code": response.status_code, "res": response.content}
|
|
|
|
async def get_cookie_from_pi() -> str:
    """Export every row of the ``cookie`` table to its own JSON file.

    Each row's ``cookie_value`` is written to
    ``<DIR_PATH>/cookie/<cookie_name>.json``.

    Returns:
        The ``cookie_name`` of the first row returned by the query
        (presumably a string; it is used as a file name here).

    Raises:
        IndexError: If the query returns no rows.
    """
    sql = "SELECT cookie_name, cookie_value FROM cookie"
    pool = await get_aiomysql_instance()
    rows = await pool.query(sql)
    for row in rows:
        cookie_file = join(DIR_PATH, f"cookie/{row['cookie_name']}.json")
        # Explicit UTF-8 keeps the output independent of the platform's
        # default encoding and matches how read_logfile opens files.
        async with aiofiles.open(cookie_file, mode="w", encoding="utf-8") as handle:
            await handle.write(row["cookie_value"])
    if not rows:
        raise IndexError("cookie table returned no rows")
    return rows[0]["cookie_name"]
|
|
|
|
async def read_logfile(log_file: str) -> str:
    """Read the UTF-8 text file at ``log_file`` and return its whole content."""
    async with aiofiles.open(log_file, mode="r", encoding="utf-8") as reader:
        return await reader.read()
|
|
|
|
| |
async def get_streevoice_today_song(page):
    """Read the song link text from the StreetVoice home page player.

    Loads the StreetVoice home page in ``page``, takes the text content of
    the link inside ``#player-collapse h4``, logs it, navigates back in the
    page history, and returns the text.
    """
    await page.goto("https://streetvoice.com/")
    player_heading = page.locator("#player-collapse h4")
    song = await player_heading.get_by_role("link").text_content()
    logger.info(f"街声今日歌曲:{song}")
    await page.go_back()
    return song
|
|
|
|
async def main():
    """Read ``log/bilibili.log`` and emit its content as a debug log line."""
    log_path = "log/bilibili.log"
    res = await read_logfile(log_path)
    logger.debug(f"utils test: {res}")
|
|
|
|
if __name__ == "__main__":
    # Running the module directly only reports the loaded configuration;
    # main() is not invoked here.
    for line in (f"DEBUG Mode: {DEBUG}", f"DIR_PATH: {DIR_PATH}"):
        logger.debug(line)
| |
|
|