vps / app.py
Mrlongpro's picture
Update app.py
6de6d01 verified
import requests
import time
import pandas as pd
import gradio as gr
from fastapi import FastAPI
import threading
app = FastAPI()
URL = "https://histdatafeed.vps.com.vn/tradingview/history"
# ===== CACHE GLOBAL =====
latest_data = None
latest_signal = None
last_update = 0
# ===== FETCH DATA =====
def fetch_data():
global latest_data, last_update
try:
params = {
"symbol": "VN30F1M",
"resolution": "1",
"from": int(time.time()) - 3600,
"to": int(time.time()),
"countback": 200
}
res = requests.get(URL, params=params, timeout=5)
data = res.json()
df = pd.DataFrame({
"time": data["t"],
"open": data["o"],
"high": data["h"],
"low": data["l"],
"close": data["c"],
"volume": data["v"]
})
latest_data = df
last_update = time.time()
except Exception as e:
print("Fetch error:", e)
# ===== AI SIGNAL =====
def compute_signal():
global latest_data, latest_signal
if latest_data is None or len(latest_data) < 20:
return "WAIT"
df = latest_data.copy()
df["ma20"] = df["close"].rolling(20).mean()
if df["close"].iloc[-1] > df["ma20"].iloc[-1]:
latest_signal = "BUY"
else:
latest_signal = "SELL"
# ===== HEARTBEAT LOOP =====
def heartbeat():
while True:
fetch_data()
compute_signal()
print("Heartbeat update:", latest_signal)
time.sleep(10) # 10s update
threading.Thread(target=heartbeat, daemon=True).start()
# ===== API =====
@app.get("/signal")
def signal():
return {
"signal": latest_signal,
"price": float(latest_data["close"].iloc[-1]) if latest_data is not None else 0,
"time": int(last_update)
}
# ===== UI =====
def ui():
return f"Signal: {latest_signal} | Updated: {int(last_update)}"
interface = gr.Interface(
fn=ui,
inputs=[],
outputs="text",
live=True
).queue()
app = gr.mount_gradio_app(app, interface, path="/")