# Gnasmim / app.py
# Modir19's picture
# Create app.py
# aa9da69 verified
import asyncio
import logging
import os

import requests
from groq import Groq
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, MessageHandler, filters, ContextTypes
# --- SOVEREIGN CONFIGURATION ---
# Credentials come from the environment rather than from literals in the
# source. The values that used to be written here were published together with
# this file and should be treated as compromised: revoke and reissue them.
GROQ_KEY = os.environ.get("GROQ_KEY", "")
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN", "")
HELIUS_KEY = os.environ.get("HELIUS_KEY", "")

# Report every missing setting in one message at start-up.
_missing_settings = [
    name
    for name, value in (
        ("GROQ_KEY", GROQ_KEY),
        ("TELEGRAM_TOKEN", TELEGRAM_TOKEN),
        ("HELIUS_KEY", HELIUS_KEY),
    )
    if not value
]
if _missing_settings:
    logging.getLogger(__name__).warning(
        "Missing environment variables: %s", ", ".join(_missing_settings)
    )

# Chat that receives alerts; stays None until activate_system() stores an id.
USER_CHAT_ID = None

# Shared Groq client used by get_ai_insight().
ai_client = Groq(api_key=GROQ_KEY)

# Initial Targets (Top Profit Wallets)
TARGETS = [
    "77A8X8H5VnL8P8vR9JmE2G5uK9sWp9G",
    "DfMxL6kQ2nZ8vR9JmE2G5uK9sWp9G1a",
]

# Last transaction signature seen per wallet; None until the first poll.
LAST_TX = dict.fromkeys(TARGETS)
async def get_ai_insight(description):
    """AI Brain: real-time trade strategy analysis.

    Sends ``description`` to the Groq chat-completions API and returns the
    model's reply.

    Args:
        description: Text describing the transaction to analyse.

    Returns:
        The content of the first completion choice, or the string
        ``"AI Analysis Offline."`` when the request fails.
    """
    try:
        # The client call is synchronous; running it in a worker thread keeps
        # the event loop free while the HTTP request is in flight.
        chat = await asyncio.to_thread(
            ai_client.chat.completions.create,
            model="llama-3.3-70b-versatile",
            messages=[
                {"role": "system", "content": "You are an elite crypto analyst. Explain the strategy of this Solana trade in one short sentence."},
                {"role": "user", "content": description},
            ],
        )
        return chat.choices[0].message.content
    except Exception:
        # Catch ``Exception`` rather than everything so task cancellation and
        # KeyboardInterrupt still propagate; the failure is logged, not hidden.
        logging.getLogger(__name__).exception("AI analysis request failed")
        return "AI Analysis Offline."
async def surveillance_loop(context: ContextTypes.DEFAULT_TYPE):
    """24/7 whale tracking engine: one polling pass over all targets.

    For every wallet in ``TARGETS`` the transaction list is fetched from the
    Helius API. When the first entry's signature differs from the one stored
    in ``LAST_TX``, the entry's description is passed to ``get_ai_insight``
    and an alert is sent to ``USER_CHAT_ID``.

    Nothing happens until ``USER_CHAT_ID`` has been set. A failure while
    handling one wallet is logged and the remaining wallets are still checked.

    Args:
        context: Callback context supplied by the job queue; only
            ``context.bot`` is used.
    """
    if not USER_CHAT_ID:
        return
    log = logging.getLogger(__name__)
    for wallet in TARGETS:
        url = f"https://api.helius.xyz/v0/addresses/{wallet}/transactions?api-key={HELIUS_KEY}"
        try:
            # requests is blocking: run it in a worker thread, and bound the
            # wait so a stalled connection cannot hang the job indefinitely.
            response = await asyncio.to_thread(requests.get, url, timeout=10)
            response.raise_for_status()
            transactions = response.json()
            # Anything other than a non-empty list (e.g. an error object)
            # carries no transaction to report.
            if not isinstance(transactions, list) or not transactions:
                continue
            latest = transactions[0]
            signature = latest.get('signature')
            if signature == LAST_TX[wallet]:
                continue
            # Recorded before sending, so an alert that fails to send is not
            # retried (with another AI request) on every later pass.
            LAST_TX[wallet] = signature
            # Fall back to the default text for a missing, null or empty value.
            desc = latest.get('description') or 'New Chain Activity'
            ai_intel = await get_ai_insight(desc)
            # NOTE(review): desc and ai_intel are inserted into Markdown
            # unescaped; text containing unbalanced markup characters may be
            # rejected by Telegram.
            alert = (
                f"🚨 **SHADOW ALERT: WHALE DETECTED**\n"
                f"━━━━━━━━━━━━━━━━━━\n"
                f"🎯 **TARGET:** `{wallet[:10]}...`\n"
                # Emoji restored; the literal previously held mis-decoded bytes.
                f"📊 **ACTIVITY:** {desc}\n"
                f"🧠 **AI INTEL:** _{ai_intel}_\n"
                f"━━━━━━━━━━━━━━━━━━\n"
                f"⚑️ [OPEN ON SOLSCAN](https://solscan.io/tx/{signature})"
            )
            await context.bot.send_message(chat_id=USER_CHAT_ID, text=alert, parse_mode='Markdown')
        except Exception as exc:
            # Only the exception type is logged: requests error messages embed
            # the request URL, which contains the API key.
            log.warning(
                "Surveillance pass failed for wallet %s (%s)",
                wallet,
                type(exc).__name__,
            )
async def activate_system(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Initial handshake to capture the chat id.

    Stores the id of the chat the update came from in the module-level
    ``USER_CHAT_ID`` and replies with a confirmation message.

    Args:
        update: Incoming Telegram update.
        context: Handler callback context (unused).
    """
    global USER_CHAT_ID
    chat = update.effective_chat
    # ``update.message`` is None for edited messages and channel posts, which
    # a MessageHandler also receives; ``effective_message`` covers those too.
    message = update.effective_message
    if chat is None or message is None:
        return
    # NOTE(review): every chat that sends text replaces the alert recipient;
    # there is no check on who the sender is.
    USER_CHAT_ID = chat.id
    await message.reply_text(
        # Emoji restored; the literal previously held mis-decoded bytes.
        "🛑 **SHADOW PROTOCOL ACTIVATED**\n"
        "Link established with Chat ID. Monitoring the void..."
    )
async def run_sovereign():
    """Build the Telegram application and run it until the task is cancelled.

    Schedules ``surveillance_loop`` every 30 seconds (first run after 5
    seconds) when a job queue is available, registers ``activate_system`` for
    non-command text messages, then starts polling and waits indefinitely.
    """
    app = Application.builder().token(TELEGRAM_TOKEN).build()
    # Auto-check every 30 seconds
    if app.job_queue:
        app.job_queue.run_repeating(surveillance_loop, interval=30, first=5)
    else:
        logging.getLogger(__name__).warning(
            "No job queue available; wallet monitoring is disabled"
        )
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, activate_system))
    # ``async with app`` performs initialize() on entry and shutdown() on exit.
    async with app:
        await app.start()
        # Polling belongs to the Updater; Application has no start_polling().
        await app.updater.start_polling()
        try:
            # Block forever; the event is never set.
            await asyncio.Event().wait()
        finally:
            # Stop fetching updates first, then stop processing them.
            await app.updater.stop()
            await app.stop()
# Script entry point: start the bot only when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(run_sovereign())