| import json |
| from pathlib import Path |
|
|
| import pandas as pd |
| from fastapi import FastAPI |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
|
|
|
|
| app = FastAPI() |
|
|
| origins = [ |
| "https://pro.openbb.dev", |
| "https://pro.openbb.co", |
| "https://excel.openbb.co", |
| "https://excel.openbb.dev", |
| ] |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=origins, |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| ROOT_PATH = Path(__file__).parent.resolve() |
|
|
| @app.get("/") |
| def read_root(): |
| return {"Info": "Full example for OpenBB Custom Backend"} |
|
|
|
|
| @app.get("/csv-data") |
| def csv_data(): |
| """Read mock csv data and return it as a table to your widget""" |
| |
| csv_file_path = "mock_data.csv" |
|
|
| try: |
| |
| return pd.read_csv((ROOT_PATH / csv_file_path).open()).to_dict(orient="records") |
| except Exception as e: |
| |
| error_message = f"Error reading the CSV file: {str(e)}" |
| return JSONResponse(content={"error": error_message}, status_code=500) |
|
|
|
|
| from pydantic import BaseModel |
| import random |
| import time |
|
|
|
|
| |
| class StockData(BaseModel): |
| ticker: str |
| price: float |
| volume: int |
| timestamp: float |
|
|
| |
| def generate_mock_stock_data(ticker: str): |
| return { |
| "ticker": ticker, |
| "price": round(random.uniform(100, 500), 2), |
| "volume": random.randint(1000, 10000), |
| "timestamp": time.time() |
| } |
|
|
| |
| @app.get("/api/stocks", response_model=StockData) |
| async def get_stock_data(ticker: str): |
| """ |
| Fetches stock data for a given ticker symbol. |
| The data includes price, volume, and timestamp. |
| """ |
| |
| stock_data = generate_mock_stock_data(ticker) |
| return stock_data |
| |