import json import subprocess from concurrent.futures import ThreadPoolExecutor from realtime.connection import Socket import os import time from supabase import create_client from dotenv import load_dotenv import requests from fastapi import FastAPI import uvicorn import threading import asyncio from slack_sdk import WebClient import re app = FastAPI() load_dotenv() SUPABASE_ID = os.getenv('supabaseID') SUPABASE_URL = os.getenv('supabaseUrl') SUPABASE_KEY = os.getenv('supabaseAnonKey') slackclient = WebClient(token=os.getenv('slack_bot_oauth')) # Supabase client creation for each function def get_client(): return create_client(SUPABASE_URL, SUPABASE_KEY) def getUsername(userid): supabase = get_client() response = supabase.table("profiles").select( "*").eq("id", userid).execute() data = response.data username = data[0]['username'] return username def getAuthorNotifToken(userid): supabase = get_client() response = supabase.table("profiles").select( "*").eq("id", userid).execute() data = response.data token = data[0]['notifToken'] return token # def send_notification(title, body, token, id, post_id, notif_type): # print(token) # notification_data = { # 'to': token, # 'title': title, # 'body': body, # 'data': { # 'user_id': id, # 'id': post_id, # 'type': notif_type # } # } # notification_json = json.dumps(notification_data) # print(notification_json) # curl_command = [ # 'curl', # '-X', 'POST', # '-H', 'Content-Type: application/json', # '-d', notification_json, # 'https://exp.host/--/api/v2/push/send' # ] # try: # result = subprocess.run( # curl_command, capture_output=True, text=True, check=True) # print(f'Notification sent successfully: {result.stdout}') # except subprocess.CalledProcessError as e: # print(f'Failed to send notification: {e.stderr}') def send_notification(title, body, token, id, post_id, notif_type): print(token) notification_data = { 'to': token, 'title': title, 'body': body, 'data': { 'user_id': id, 'id': post_id, 'type': notif_type } } try: response = requests.post( 'https://exp.host/--/api/v2/push/send', json=notification_data) response.raise_for_status() print(f'Notification sent successfully: {response.text}') except requests.exceptions.RequestException as e: print(f'Failed to send notification: {e}') def handle_list_comment_tags_update(event_payload): print('Received update in comment tags:') print(event_payload) record = event_payload['record'] try: content = record.get('comment', '[]') tagged_user_ids = re.findall(r'@<([^>]+)>', content) except json.JSONDecodeError: print('Failed to parse tags.') return author_id = record['user_id'] username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} tagged you in a comment!' post_id = record['post_id'] for tagged_user_id in tagged_user_ids: if tagged_user_id != author_id: token = getAuthorNotifToken(tagged_user_id) if token: send_notification(title, body, token, author_id, post_id, "list") def handle_list_reply_tags_update(event_payload): print('Received update in reply tags:') print(event_payload) record = event_payload['record'] try: content = record.get('comment', '[]') tagged_user_ids = re.findall(r'@<([^>]+)>', content) except json.JSONDecodeError: print('Failed to parse tags.') return author_id = record['author_id'] username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} tagged you in a reply!' post_id = record['post_id'] for tagged_user_id in tagged_user_ids: if tagged_user_id != author_id: token = getAuthorNotifToken(tagged_user_id) if token: send_notification(title, body, token, author_id, post_id, "list") def handle_comment_tags_update(event_payload): print('Received update in comment tags:') print(event_payload) record = event_payload['record'] try: content = record.get('comment', '[]') tagged_user_ids = re.findall(r'@<([^>]+)>', content) except json.JSONDecodeError: print('Failed to parse tags.') return author_id = record['user_id'] username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} tagged you in a comment!' post_id = record['post_id'] for tagged_user_id in tagged_user_ids: if tagged_user_id != author_id: token = getAuthorNotifToken(tagged_user_id) if token: send_notification(title, body, token, author_id, post_id, "post") def handle_reply_tags_update(event_payload): print('Received update in reply tags:') print(event_payload) record = event_payload['record'] try: content = record.get('comment', '[]') tagged_user_ids = re.findall(r'@<([^>]+)>', content) except json.JSONDecodeError: print('Failed to parse tags.') return author_id = record['author_id'] username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} tagged you in a reply!' post_id = record['post_id'] for tagged_user_id in tagged_user_ids: if tagged_user_id != author_id: token = getAuthorNotifToken(tagged_user_id) if token: send_notification(title, body, token, author_id, post_id, "post") def send_slack_message(event_type: str, category: str, username: str, post_id: str = None, list_id: str = None): try: print("reach 1") if event_type == "report": target = f"post {post_id}" if post_id is not None else f"collection {list_id}" message = f":rotating_light: A report has been submitted by *{username}* for {target}." else: message = f":information_source: A support ticket has been submitted by *{username}*." print("reach 2") response = slackclient.chat_postMessage( channel="C092DLFAFSR", text=message, username="Support Bot", icon_emoji=":robot_face:" ) print("reach 3") if not response["ok"]: print(f"Slack API error: {response}") return response except Exception as e: print("Failed to send Slack message") return None def handle_support(event_payload: dict): print("Received update in Support or reportedContent:") print(event_payload) record = event_payload.get("record", {}) username = getUsername(record.get("user_id")).lower() post_id = record.get("post_id") list_id = record.get("list_id") category = record.get("category", "uncategorized") event_type = "report" if post_id is not None or list_id is not None else "support" send_slack_message(event_type, category, username, post_id, list_id) def handle_tags_update(event_payload): print('Received update in post tags:') print(event_payload) record = event_payload['record'] try: content = record.get('content', '[]') tagged_user_ids = re.findall(r'@<([^>]+)>', content) except json.JSONDecodeError: print('Failed to parse tags.') return author_id = record['author_id'] username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} tagged you in a post!' post_id = record['id'] for tagged_user_id in tagged_user_ids: if tagged_user_id != author_id: token = getAuthorNotifToken(tagged_user_id) if token: send_notification(title, body, token, author_id, post_id, "post") def handle_listlikes_update(event_payload): print('Received update in listlikes:') print(event_payload) record = event_payload['record'] id = record['list_id'] author_id = record['author_id'] user_id = record['user_id'] token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} liked your collection!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "list") def handle_listcomments_update(event_payload): print('Received update in listcomments:') print(event_payload) record = event_payload['record'] author_id = record['author_id'] user_id = record['user_id'] id = record['list_id'] token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} commented on your collection!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "list") def handle_likes_update(event_payload): print('Received update in likes:') print(event_payload) record = event_payload['record'] author_id = record['author_id'] user_id = record['user_id'] id = record['post_id'] token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} liked your post!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "post") def handle_comments_update(event_payload): print('Received update in comments:') print(event_payload) record = event_payload['record'] author_id = record['author_id'] user_id = record['user_id'] id = record['post_id'] token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} commented on your post!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "post") def handle_follow_update(event_payload): print('Received update in followers:') print(event_payload) record = event_payload['record'] author_id = record['id'] user_id = record['following'] id = record['following'] token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} followed you!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "follow") def getConversationUser(author_id, conv_id): supabase = create_client(SUPABASE_URL, SUPABASE_KEY) response = supabase.table("conversations").select("*").eq("id", conv_id).execute() data = response.data user1 = data[0]['user1'] user2 = data[0]['user2'] if user1 == author_id: return user2 else: return user1 def handle_message_update(event_payload): print('Received update in messages:') print(event_payload) record = event_payload['record'] author_id = record['user_id'] conv_id = record['conversation_id'] id = record['conversation_id'] user_id = getConversationUser(author_id, conv_id) token = getAuthorNotifToken(user_id) username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} sent you a message!' if token and user_id != author_id: send_notification(title, body, token, author_id, id, "message") def handle_listreply_update(event_payload): print('Received update in listreplies:') print(event_payload) record = event_payload['record'] author_id = record['user_id'] user_id = record['to'] id = record['list_id'] token = getAuthorNotifToken(user_id) username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} replied to you!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "list") def handle_reply_update(event_payload): print('Received update in replies:') print(event_payload) record = event_payload['record'] author_id = record['user_id'] user_id = record['to'] id = record['post_id'] token = getAuthorNotifToken(user_id) username = getUsername(author_id).lower() body = f'@{username} replied to you!' title = 'Picturelock' if token and user_id != author_id: send_notification(title, body, token, user_id, id, "post") def handle_commentlikes_update(event_payload): print('Received update in commentlikes:') print(event_payload) record = event_payload['record'] author_id = record['author_id'] user_id = record['user_id'] if record['post_id'] == None: id = record['list_id'] post_type = "list" else: id = record['post_id'] post_type = "post" token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} liked your comment!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, post_type) def handle_replylikes_update(event_payload): print('Received update in replylikes:') print(event_payload) record = event_payload['record'] author_id = record['author_id'] user_id = record['user_id'] if record['post_id'] == None: id = record['list_id'] post_type = "list" else: id = record['post_id'] post_type = "post" token = getAuthorNotifToken(author_id) username = getUsername(user_id).lower() title = 'Picturelock' body = f'@{username} liked your reply!' if token and user_id != author_id: send_notification(title, body, token, user_id, id, post_type) def handle_groupreviewrequests_update(event_payload): print('Received update in posts:') print(event_payload) record = event_payload['record'] author_id = record['groupReviewLeader'] id = record['id'] user_id = record['author'] isGroupReview = record['isGroupReview'] token = getAuthorNotifToken(user_id) username = getUsername(author_id).lower() title = 'Picturelock' body = f'@{username} sent you an invite to a group review!' print(body) if token and user_id != author_id and isGroupReview: send_notification(title, body, token, author_id, id, "post") def setup_likes_subscription(SUPABASE_ID, SUPABASE_KEY): asyncio.set_event_loop(asyncio.new_event_loop()) URL = f"wss://{SUPABASE_ID}.supabase.co/realtime/v1/websocket?apikey={SUPABASE_KEY}&vsn=1.0.0" with ThreadPoolExecutor(max_workers=10) as executor: while True: try: s = Socket(URL) s.connect() channel_listlikes = s.set_channel("realtime:public:listlikes") channel_listlikes.join().on( "INSERT", lambda payload: executor.submit(handle_listlikes_update, payload)) channel_listcomments = s.set_channel("realtime:public:listcomments") channel_listcomments.join().on( "INSERT", lambda payload: executor.submit(handle_listcomments_update, payload)) channel_likes = s.set_channel("realtime:public:likes") channel_likes.join().on( "INSERT", lambda payload: executor.submit(handle_likes_update, payload)) channel_replylikes = s.set_channel("realtime:public:replylikes") channel_replylikes.join().on( "INSERT", lambda payload: executor.submit(handle_replylikes_update, payload)) channel_commentlikes = s.set_channel("realtime:public:commentlikes") channel_commentlikes.join().on( "INSERT", lambda payload: executor.submit(handle_commentlikes_update, payload)) channel_comments = s.set_channel("realtime:public:comments") channel_comments.join().on( "INSERT", lambda payload: executor.submit(handle_comments_update, payload)) channel_followers = s.set_channel("realtime:public:followers") channel_followers.join().on( "INSERT", lambda payload: executor.submit(handle_follow_update, payload)) channel_replies = s.set_channel("realtime:public:replies") channel_replies.join().on( "INSERT", lambda payload: executor.submit(handle_reply_update, payload)) channel_listreplies = s.set_channel("realtime:public:listreplies") channel_listreplies.join().on( "INSERT", lambda payload: executor.submit(handle_listreply_update, payload)) channel_messages = s.set_channel("realtime:public:messages") channel_messages.join().on( "INSERT", lambda payload: executor.submit(handle_message_update, payload)) channel_posts = s.set_channel("realtime:public:posts") channel_posts.join().on( "INSERT", lambda payload: executor.submit(handle_groupreviewrequests_update, payload)) channel_posts_for_tags = s.set_channel("realtime:public:posts") channel_posts_for_tags.join().on( "INSERT", lambda payload: executor.submit(handle_tags_update, payload)) channel_posts_for_tags = s.set_channel("realtime:public:comments") channel_posts_for_tags.join().on( "INSERT", lambda payload: executor.submit(handle_comment_tags_update, payload)) channel_posts_for_tags = s.set_channel("realtime:public:replies") channel_posts_for_tags.join().on( "INSERT", lambda payload: executor.submit(handle_reply_tags_update, payload)) channel_posts_for_tags = s.set_channel("realtime:public:listcomments") channel_posts_for_tags.join().on( "INSERT", lambda payload: executor.submit(handle_list_comment_tags_update, payload)) channel_posts_for_tags = s.set_channel("realtime:public:listreplies") channel_posts_for_tags.join().on( "INSERT", lambda payload: executor.submit(handle_list_reply_tags_update, payload)) channel_support = s.set_channel("realtime:public:Support") channel_support.join().on( "INSERT", lambda payload: executor.submit(handle_support, payload)) channel_report = s.set_channel("realtime:public:reportedContent") channel_report.join().on( "INSERT", lambda payload: executor.submit(handle_support, payload)) s.listen() except Exception as e: print(f'Connection error: {e}') print('Reconnecting in 5 seconds...') time.sleep(5) @app.get("/") def read_root(): return {"status": "running"} if __name__ == "__main__": threading.Thread( target=setup_likes_subscription, args=(SUPABASE_ID, SUPABASE_KEY), daemon=True ).start() uvicorn.run(app, host="0.0.0.0", port=7860)