| import os |
| import json |
| import hashlib |
| import time |
| import re |
| import argparse |
| from pathlib import Path |
|
|
| import markdown |
| from markdown.extensions import tables, fenced_code, codehilite, toc |
| from googleapiclient.discovery import build |
| from googleapiclient.errors import HttpError |
| from google.auth.transport.requests import Request |
| from google.oauth2.credentials import Credentials |
|
|
| BLOG_ID = os.environ["BLOG_ID"] |
| TOKEN_FILE = os.environ["TOKEN_FILE"] |
| PUBLISHED_FILE = "published_posts.json" |
| GH_PAGES_BASE = "https://kagvi13.github.io/HMP/" |
|
|
|
|
| def convert_md_links(md_text: str) -> str: |
| """Конвертирует относительные ссылки (*.md) в абсолютные ссылки на GitHub Pages.""" |
| def replacer(match): |
| text = match.group(1) |
| link = match.group(2) |
| if link.startswith("http://") or link.startswith("https://") or not link.endswith(".md"): |
| return match.group(0) |
| abs_link = GH_PAGES_BASE + link.replace(".md", "").lstrip("./") |
| return f"[{text}]({abs_link})" |
| return re.sub(r"\[([^\]]+)\]\(([^)]+)\)", replacer, md_text) |
|
|
|
|
| def load_published(): |
| if Path(PUBLISHED_FILE).exists(): |
| with open(PUBLISHED_FILE, "r", encoding="utf-8") as f: |
| return json.load(f) |
| print("⚠ published_posts.json не найден — начинаем с нуля.") |
| return {} |
|
|
|
|
| def save_published(data): |
| with open(PUBLISHED_FILE, "w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=False, indent=2) |
|
|
|
|
| def file_hash(path): |
| return hashlib.md5(Path(path).read_bytes()).hexdigest() |
|
|
|
|
| def get_existing_posts(service): |
| """Возвращает словарь title → post_id для постов, которые можно редактировать.""" |
| existing = {} |
| nextPageToken = None |
| while True: |
| try: |
| response = service.posts().list(blogId=BLOG_ID, maxResults=500, pageToken=nextPageToken).execute() |
| for post in response.get("items", []): |
| post_id = post["id"] |
| title = post["title"] |
| existing[title] = post_id |
| nextPageToken = response.get("nextPageToken") |
| if not nextPageToken: |
| break |
| except HttpError as e: |
| print(f"❌ Ошибка при получении списка постов: {e}") |
| break |
| return existing |
|
|
|
|
| def main(force: bool = False): |
| creds = None |
| if os.path.exists(TOKEN_FILE): |
| creds = Credentials.from_authorized_user_file(TOKEN_FILE, ["https://www.googleapis.com/auth/blogger"]) |
|
|
| if creds and creds.expired and creds.refresh_token: |
| creds.refresh(Request()) |
| with open(TOKEN_FILE, "w") as token_file: |
| token_file.write(creds.to_json()) |
|
|
| service = build("blogger", "v3", credentials=creds) |
| published = load_published() |
| existing_posts = get_existing_posts(service) |
|
|
| md_files = list(Path("docs").rglob("*.md")) |
| for md_file in md_files: |
| name = md_file.stem |
| h = file_hash(md_file) |
|
|
| md_text = md_file.read_text(encoding="utf-8") |
| source_link = f"Источник: [ {md_file.name} ](https://github.com/kagvi13/HMP/blob/main/docs/{md_file.name})\n\n" |
| md_text = source_link + md_text |
| md_text = convert_md_links(md_text) |
|
|
| html_content = markdown.markdown( |
| md_text, |
| extensions=["tables", "fenced_code", "codehilite", "toc"] |
| ) |
|
|
| style = """ |
| <style> |
| table { display: block; max-width: 100%; overflow-x: auto; border-collapse: collapse; } |
| th, td { border: 1px solid #ccc; padding: 6px 12px; } |
| pre { display: block; max-width: 100%; overflow-x: auto; padding: 10px; background-color: #f8f8f8; border: 1px solid #ddd; border-radius: 6px; font-family: monospace; white-space: pre; } |
| </style> |
| """ |
| html_content = style + html_content |
|
|
| body = { |
| "kind": "blogger#post", |
| "title": name, |
| "content": html_content, |
| } |
|
|
| try: |
| post_id = existing_posts.get(name) |
|
|
| if post_id: |
| |
| if not force and name in published and published[name]["hash"] == h: |
| print(f"✅ Пост '{name}' без изменений — пропускаем.") |
| continue |
|
|
| try: |
| post = service.posts().update(blogId=BLOG_ID, postId=post_id, body=body).execute() |
| print(f"♻ Обновлён пост: {post['url']}") |
| except HttpError as e: |
| if e.resp.status == 403: |
| post = service.posts().insert(blogId=BLOG_ID, body=body).execute() |
| print(f"⚠ Пост существовал, но прав нет. Создан новый: {post['url']}") |
| post_id = post["id"] |
| else: |
| raise e |
| else: |
| post = service.posts().insert(blogId=BLOG_ID, body=body).execute() |
| print(f"🆕 Пост опубликован: {post['url']}") |
| post_id = post["id"] |
|
|
| published[name] = {"id": post_id, "hash": h} |
| save_published(published) |
|
|
| print("⏱ Пауза 60 секунд перед следующим постом...") |
| time.sleep(60) |
|
|
| except HttpError as e: |
| print(f"❌ Ошибка при публикации {name}: {e}") |
| save_published(published) |
| break |
|
|
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--force", action="store_true", help="Обновить все посты, даже без изменений") |
| args = parser.parse_args() |
|
|
| main(force=args.force) |
|
|