Agent / pekka_media_mail.py
Bjo53's picture
Upload 6 files
7eba363 verified
#!/usr/bin/env python3
"""
PEKKA media + mail bridge utility.
Standalone helper for:
- send email (SMTP)
- read email (Gmail API or IMAP fallback)
- upload video to YouTube (Google API)
Usage examples:
python pekka_media_mail.py send-email --to x@example.com --subject "Hi" --body "Hello"
python pekka_media_mail.py read-email --query "newer_than:2d" --limit 5
python pekka_media_mail.py upload-youtube --file ./video.mp4 --title "My Video"
"""
import argparse
import os
import smtplib
from email.mime.text import MIMEText
from pathlib import Path
GOOGLE_SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/youtube.upload",
]
def _google_service(service_name: str, version: str):
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
token_path = Path(os.getenv("GOOGLE_TOKEN_PATH", "./token.json"))
cred_path = Path(os.getenv("GOOGLE_CLIENT_SECRET", "./credentials.json"))
creds = None
if token_path.exists():
creds = Credentials.from_authorized_user_file(str(token_path), GOOGLE_SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
if not cred_path.exists():
raise FileNotFoundError(f"Missing Google OAuth client file: {cred_path}")
flow = InstalledAppFlow.from_client_secrets_file(str(cred_path), GOOGLE_SCOPES)
creds = flow.run_local_server(port=0)
token_path.write_text(creds.to_json(), encoding="utf-8")
return build(service_name, version, credentials=creds)
def send_email(to: str, subject: str, body: str):
smtp_user = os.getenv("SMTP_USER", "")
smtp_pass = os.getenv("SMTP_PASS", "")
smtp_host = os.getenv("SMTP_HOST", "smtp.gmail.com")
smtp_port = int(os.getenv("SMTP_PORT", "587"))
if not smtp_user or not smtp_pass:
raise RuntimeError("SMTP_USER/SMTP_PASS are required")
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = smtp_user
msg["To"] = to
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
print(f"EMAIL_SENT to={to}")
def read_email(limit: int = 5, query: str = ""):
# Preferred: Gmail API (works with same Google creds as YouTube upload)
try:
gmail = _google_service("gmail", "v1")
response = gmail.users().messages().list(userId="me", q=query, maxResults=limit).execute()
msgs = response.get("messages", [])
if not msgs:
print("NO_EMAILS")
return
for m in msgs:
data = gmail.users().messages().get(userId="me", id=m["id"], format="metadata", metadataHeaders=["From", "Subject", "Date"]).execute()
headers = {h["name"]: h["value"] for h in data.get("payload", {}).get("headers", [])}
print(f"FROM={headers.get('From','')} | SUBJECT={headers.get('Subject','')} | DATE={headers.get('Date','')}")
return
except Exception as exc:
print(f"GMAIL_API_FAILED: {exc}")
# Fallback: IMAP
import imaplib
import email
imap_host = os.getenv("IMAP_HOST", "")
imap_port = int(os.getenv("IMAP_PORT", "993"))
imap_user = os.getenv("IMAP_USER", "")
imap_pass = os.getenv("IMAP_PASS", "")
if not imap_host or not imap_user or not imap_pass:
raise RuntimeError("IMAP fallback unavailable: set IMAP_HOST/IMAP_USER/IMAP_PASS")
mailbox = imaplib.IMAP4_SSL(imap_host, imap_port)
mailbox.login(imap_user, imap_pass)
mailbox.select("INBOX")
_, ids = mailbox.search(None, "ALL")
message_ids = ids[0].split()[-limit:]
for mid in reversed(message_ids):
_, msg_data = mailbox.fetch(mid, "(RFC822)")
msg = email.message_from_bytes(msg_data[0][1])
print(f"FROM={msg.get('From','')} | SUBJECT={msg.get('Subject','')} | DATE={msg.get('Date','')}")
mailbox.logout()
def upload_youtube(file_path: str, title: str, description: str):
if not Path(file_path).exists():
raise FileNotFoundError(file_path)
from googleapiclient.http import MediaFileUpload
youtube = _google_service("youtube", "v3")
privacy = os.getenv("YOUTUBE_DEFAULT_PRIVACY", "private")
body = {
"snippet": {
"title": title,
"description": description,
"categoryId": "22",
},
"status": {"privacyStatus": privacy},
}
request = youtube.videos().insert(
part="snippet,status",
body=body,
media_body=MediaFileUpload(file_path),
)
response = request.execute()
vid = response.get("id", "")
print(f"YOUTUBE_URL=https://youtu.be/{vid}")
def main():
parser = argparse.ArgumentParser(description="PEKKA mail + YouTube bridge")
sub = parser.add_subparsers(dest="cmd", required=True)
send = sub.add_parser("send-email")
send.add_argument("--to", required=True)
send.add_argument("--subject", required=True)
send.add_argument("--body", required=True)
read = sub.add_parser("read-email")
read.add_argument("--limit", type=int, default=5)
read.add_argument("--query", default="")
yt = sub.add_parser("upload-youtube")
yt.add_argument("--file", required=True)
yt.add_argument("--title", required=True)
yt.add_argument("--description", default="Uploaded by PEKKA")
args = parser.parse_args()
if args.cmd == "send-email":
send_email(args.to, args.subject, args.body)
elif args.cmd == "read-email":
read_email(args.limit, args.query)
elif args.cmd == "upload-youtube":
upload_youtube(args.file, args.title, args.description)
if __name__ == "__main__":
main()