| |
| """ |
| PEKKA media + mail bridge utility. |
| |
| Standalone helper for: |
| - send email (SMTP) |
| - read email (Gmail API or IMAP fallback) |
| - upload video to YouTube (Google API) |
| |
| Usage examples: |
| python pekka_media_mail.py send-email --to x@example.com --subject "Hi" --body "Hello" |
| python pekka_media_mail.py read-email --query "newer_than:2d" --limit 5 |
| python pekka_media_mail.py upload-youtube --file ./video.mp4 --title "My Video" |
| """ |
|
|
| import argparse |
| import os |
| import smtplib |
| from email.mime.text import MIMEText |
| from pathlib import Path |
|
|
|
|
# OAuth scopes requested when authorizing against Google: read-only Gmail
# access and permission to upload YouTube videos.
GOOGLE_SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/youtube.upload",
]
|
|
|
|
def _google_service(service_name: str, version: str):
    """Build an authorized Google API client for the requested service.

    Credentials are loaded from the token file named by ``GOOGLE_TOKEN_PATH``
    (default ``./token.json``). When they are missing or invalid they are
    refreshed if a refresh token is available; otherwise the installed-app
    OAuth flow is run using the client-secret file named by
    ``GOOGLE_CLIENT_SECRET`` (default ``./credentials.json``). Newly obtained
    or refreshed credentials are written back to the token file.

    Args:
        service_name: API name forwarded to ``googleapiclient.discovery.build``.
        version: API version forwarded to ``build``.

    Returns:
        The client object returned by ``build``.

    Raises:
        FileNotFoundError: If the OAuth flow is needed but the client-secret
            file does not exist.
    """
    # Imported here rather than at module level, so the Google client
    # packages are only required when this function actually runs.
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from google.auth.transport.requests import Request
    from googleapiclient.discovery import build

    token_path = Path(os.getenv("GOOGLE_TOKEN_PATH", "./token.json"))
    cred_path = Path(os.getenv("GOOGLE_CLIENT_SECRET", "./credentials.json"))

    creds = None
    if token_path.exists():
        creds = Credentials.from_authorized_user_file(str(token_path), GOOGLE_SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not cred_path.exists():
                raise FileNotFoundError(f"Missing Google OAuth client file: {cred_path}")
            flow = InstalledAppFlow.from_client_secrets_file(str(cred_path), GOOGLE_SCOPES)
            creds = flow.run_local_server(port=0)

        # The token file contains a long-lived refresh token, so create it
        # readable/writable by the owner only instead of with the default
        # mode. (The mode applies on creation; an existing file keeps its
        # current permissions.) Creating the parent directory first avoids
        # losing freshly granted credentials to a missing-directory error.
        token_path.parent.mkdir(parents=True, exist_ok=True)
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(creds.to_json())

    return build(service_name, version, credentials=creds)
|
|
|
|
def send_email(to: str, subject: str, body: str):
    """Send a plain-text email through the SMTP server set in the environment.

    Configuration is read from ``SMTP_USER``, ``SMTP_PASS``, ``SMTP_HOST``
    (default ``smtp.gmail.com``) and ``SMTP_PORT`` (default ``587``). The
    message is sent from ``SMTP_USER``. Port 465 is treated as implicit TLS
    (``SMTP_SSL``); any other port connects in plain text and upgrades with
    STARTTLS before logging in. On success ``EMAIL_SENT to=<to>`` is printed.

    Args:
        to: Recipient address, used for the ``To`` header.
        subject: Value of the ``Subject`` header.
        body: Plain-text message body.

    Raises:
        ValueError: If ``SMTP_PORT`` is not an integer.
        RuntimeError: If ``SMTP_USER`` or ``SMTP_PASS`` is unset or empty.
        ssl.SSLError: If the server certificate cannot be verified.
    """
    import ssl

    smtp_user = os.getenv("SMTP_USER", "")
    smtp_pass = os.getenv("SMTP_PASS", "")
    smtp_host = os.getenv("SMTP_HOST", "smtp.gmail.com")
    smtp_port = int(os.getenv("SMTP_PORT", "587"))

    if not smtp_user or not smtp_pass:
        raise RuntimeError("SMTP_USER/SMTP_PASS are required")

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = smtp_user
    msg["To"] = to

    # An explicit default context enforces certificate and hostname checks;
    # called without one, smtplib does not verify the server before the
    # password is sent.
    tls_context = ssl.create_default_context()
    # Bound every socket operation so an unreachable server cannot hang the CLI.
    timeout_seconds = 30
    # Port 465 speaks TLS from the first byte; STARTTLS cannot be used there.
    implicit_tls = smtp_port == 465

    if implicit_tls:
        connection = smtplib.SMTP_SSL(
            smtp_host, smtp_port, timeout=timeout_seconds, context=tls_context
        )
    else:
        connection = smtplib.SMTP(smtp_host, smtp_port, timeout=timeout_seconds)

    with connection as server:
        if not implicit_tls:
            server.starttls(context=tls_context)
        server.login(smtp_user, smtp_pass)
        server.send_message(msg)

    print(f"EMAIL_SENT to={to}")
|
|
|
|
def _decode_mime_header(raw) -> str:
    """Return *raw* with RFC 2047 encoded words decoded, or as-is if undecodable."""
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    if raw is None:
        return ""
    try:
        return str(make_header(decode_header(raw)))
    except (HeaderParseError, LookupError, ValueError):
        # Malformed encoding or unknown charset: show the header undecoded
        # rather than failing the whole listing.
        return str(raw)


def read_email(limit: int = 5, query: str = ""):
    """Print From/Subject/Date for recent messages, one line per message.

    The Gmail API is tried first, using *query* as the Gmail search string.
    If that fails for any reason, ``GMAIL_API_FAILED: <error>`` is printed and
    the newest messages of the IMAP ``INBOX`` are listed instead, using
    ``IMAP_HOST``, ``IMAP_PORT`` (default ``993``), ``IMAP_USER`` and
    ``IMAP_PASS``. The IMAP fallback does not apply *query*. ``NO_EMAILS`` is
    printed when there is nothing to list.

    Args:
        limit: Maximum number of messages to list; a value <= 0 lists nothing.
        query: Gmail search query (Gmail API path only).

    Raises:
        ValueError: If the fallback is reached and ``IMAP_PORT`` is not an integer.
        RuntimeError: If the fallback is reached and the IMAP settings are
            incomplete.
    """
    if limit <= 0:
        # A non-positive limit must not reach the IMAP slice below, where
        # ids[-0:] would select every message in the mailbox.
        print("NO_EMAILS")
        return

    try:
        gmail = _google_service("gmail", "v1")
        response = gmail.users().messages().list(userId="me", q=query, maxResults=limit).execute()
        msgs = response.get("messages", [])
        if not msgs:
            print("NO_EMAILS")
            return
        # Collect first and print at the end, so a failure midway does not
        # leave partial output that the IMAP fallback would then duplicate.
        lines = []
        for m in msgs:
            data = gmail.users().messages().get(
                userId="me",
                id=m["id"],
                format="metadata",
                metadataHeaders=["From", "Subject", "Date"],
            ).execute()
            headers = {h["name"]: h["value"] for h in data.get("payload", {}).get("headers", [])}
            lines.append(
                f"FROM={headers.get('From','')} | SUBJECT={headers.get('Subject','')} | DATE={headers.get('Date','')}"
            )
        for line in lines:
            print(line)
        return
    except Exception as exc:
        # Deliberately broad: any Gmail failure (packages missing, auth,
        # HTTP error) is reported and then handled by the IMAP fallback.
        print(f"GMAIL_API_FAILED: {exc}")

    import email
    import imaplib
    import ssl

    imap_host = os.getenv("IMAP_HOST", "")
    imap_port = int(os.getenv("IMAP_PORT", "993"))
    imap_user = os.getenv("IMAP_USER", "")
    imap_pass = os.getenv("IMAP_PASS", "")

    if not imap_host or not imap_user or not imap_pass:
        raise RuntimeError("IMAP fallback unavailable: set IMAP_HOST/IMAP_USER/IMAP_PASS")

    # An explicit default context enforces certificate and hostname checks
    # before the password is sent.
    mailbox = imaplib.IMAP4_SSL(imap_host, imap_port, ssl_context=ssl.create_default_context())
    try:
        mailbox.login(imap_user, imap_pass)
        # Read-only select: listing headers must not change message flags.
        mailbox.select("INBOX", readonly=True)
        _, ids = mailbox.search(None, "ALL")
        message_ids = ids[0].split()[-limit:] if ids and ids[0] else []
        if not message_ids:
            print("NO_EMAILS")
            return

        # Newest first; fetch only the three headers that are printed.
        for mid in reversed(message_ids):
            _, msg_data = mailbox.fetch(mid, "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])")
            if not msg_data or not isinstance(msg_data[0], tuple):
                # No literal in the reply (e.g. message gone since the search).
                continue
            msg = email.message_from_bytes(msg_data[0][1])
            sender = _decode_mime_header(msg.get("From", ""))
            subject = _decode_mime_header(msg.get("Subject", ""))
            date = _decode_mime_header(msg.get("Date", ""))
            print(f"FROM={sender} | SUBJECT={subject} | DATE={date}")
    finally:
        try:
            mailbox.logout()
        except (imaplib.IMAP4.error, OSError):
            # The connection is already unusable; do not mask the original error.
            pass
|
|
|
|
def upload_youtube(file_path: str, title: str, description: str):
    """Upload a video file to YouTube and print its short URL.

    The video is inserted with category id ``"22"`` and the privacy status
    taken from ``YOUTUBE_DEFAULT_PRIVACY`` (default ``private``). On success
    ``YOUTUBE_URL=https://youtu.be/<id>`` is printed.

    Args:
        file_path: Path of the video file to upload.
        title: Video title.
        description: Video description.

    Raises:
        FileNotFoundError: If *file_path* is not an existing regular file.
        ValueError: If ``YOUTUBE_DEFAULT_PRIVACY`` is not one of
            ``private``, ``public`` or ``unlisted``.
    """
    # is_file() rather than exists(): a directory cannot be uploaded.
    if not Path(file_path).is_file():
        raise FileNotFoundError(file_path)

    # Validate locally so a typo fails before any authorization or upload.
    privacy = os.getenv("YOUTUBE_DEFAULT_PRIVACY", "private")
    allowed_privacy = ("private", "public", "unlisted")
    if privacy not in allowed_privacy:
        raise ValueError(
            f"YOUTUBE_DEFAULT_PRIVACY must be one of {', '.join(allowed_privacy)}; got {privacy!r}"
        )

    from googleapiclient.http import MediaFileUpload

    youtube = _google_service("youtube", "v3")

    body = {
        "snippet": {
            "title": title,
            "description": description,
            "categoryId": "22",
        },
        "status": {"privacyStatus": privacy},
    }

    # Resumable upload streams the file from disk instead of building the
    # whole video into one in-memory multipart request; chunksize=-1 sends it
    # as a single chunk. execute() drives the resumable session to completion.
    media = MediaFileUpload(file_path, chunksize=-1, resumable=True)
    request = youtube.videos().insert(
        part="snippet,status",
        body=body,
        media_body=media,
    )
    response = request.execute()
    vid = response.get("id", "")
    print(f"YOUTUBE_URL=https://youtu.be/{vid}")
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``send-email`` (--to, --subject, --body, all required),
    ``read-email`` (--limit, default 5; --query, default empty) and
    ``upload-youtube`` (--file and --title required; --description optional).
    """
    cli = argparse.ArgumentParser(description="PEKKA mail + YouTube bridge")
    commands = cli.add_subparsers(dest="cmd", required=True)

    send_cmd = commands.add_parser("send-email")
    for flag in ("--to", "--subject", "--body"):
        send_cmd.add_argument(flag, required=True)

    read_cmd = commands.add_parser("read-email")
    read_cmd.add_argument("--limit", type=int, default=5)
    read_cmd.add_argument("--query", default="")

    upload_cmd = commands.add_parser("upload-youtube")
    for flag in ("--file", "--title"):
        upload_cmd.add_argument(flag, required=True)
    upload_cmd.add_argument("--description", default="Uploaded by PEKKA")

    opts = cli.parse_args()

    # Map each sub-command name to a zero-argument call of its handler.
    handlers = {
        "send-email": lambda: send_email(opts.to, opts.subject, opts.body),
        "read-email": lambda: read_email(opts.limit, opts.query),
        "upload-youtube": lambda: upload_youtube(opts.file, opts.title, opts.description),
    }
    handler = handlers.get(opts.cmd)
    if handler is not None:
        handler()
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|