# Álvaro Valenzuela Valdes
# feat: enhance AI agents with detailed Mercado Publico data and update frontend UI
# 0d71eae
# raw
# history blame
# 3.34 kB
from sqlalchemy.orm import Session
from datetime import datetime
from app.models.tender import TenderModel
from app.services.mercado_publico import fetch_tenders, get_tender_by_code
import json
def _parse_api_datetime(value):
    """Convert a date value coming from the API into a ``datetime``.

    Non-string values (``None`` or an already-built ``datetime``) are returned
    unchanged. Empty or unparseable strings yield ``None``.
    """
    if not isinstance(value, str):
        return value
    if not value:
        return None
    # Strip the "Z" marker and everything after the first "." so that
    # ``datetime.fromisoformat`` accepts the string.
    # NOTE: this also discards a UTC offset that follows fractional seconds.
    cleaned = value.replace("Z", "").split(".")[0]
    try:
        return datetime.fromisoformat(cleaned)
    except ValueError:
        # Best effort: a malformed date is stored as NULL rather than
        # aborting the whole synchronization.
        return None


def _tender_to_row(api_t):
    """Map one API tender object onto the keyword arguments of ``TenderModel``."""
    return {
        "code": api_t.code,
        "name": api_t.name,
        "buyer": api_t.buyer,
        "buyer_region": api_t.buyer_region,
        "status": api_t.status,
        # Falsy status codes (None, 0, "") are stored as None.
        "status_code": str(api_t.status_code) if api_t.status_code else None,
        "type": api_t.type,
        "currency": api_t.currency,
        "closing_date": _parse_api_datetime(api_t.closing_date),
        "publication_date": _parse_api_datetime(api_t.publication_date),
        "description": api_t.description,
        "estimated_amount": api_t.estimated_amount,
        "source": api_t.source,
        "region": api_t.region,
        "sector": api_t.sector,
        "items": [item.model_dump() for item in api_t.items] if api_t.items else [],
        "attachments": [att.model_dump() for att in api_t.attachments] if api_t.attachments else [],
    }


async def sync_tenders_to_db(db: Session, keyword: str = None):
    """
    Fetch tenders from the Mercado Público API and upsert them into the database.

    Each tender returned by ``fetch_tenders`` is looked up by its ``code``:
    an existing row has all mapped fields overwritten, otherwise a new
    ``TenderModel`` row is added. All changes are committed in one transaction.

    Args:
        db: SQLAlchemy session used for the lookups and the final commit.
        keyword: Optional keyword forwarded to ``fetch_tenders``.

    Returns:
        A dict with the integer counters ``"new"`` and ``"updated"``. When the
        API returns nothing or the fetch raises, both counters are 0 and a
        ``"message"`` key describes why.

    Raises:
        Any exception raised while querying, mapping or committing. The
        session is rolled back before the exception is re-raised.
    """
    print(f"[Sync] Starting REAL synchronization... keyword={keyword}")

    try:
        api_tenders = await fetch_tenders(keyword=keyword)
        if not api_tenders:
            print("[Sync] No active tenders found for today in the API.")
            return {"new": 0, "updated": 0, "message": "No new tenders found"}
        print(f"[Sync] API returned {len(api_tenders)} real tenders for processing.")
    except Exception as e:
        # Top-level boundary: an API failure is reported to the caller, not raised.
        print(f"[Sync] API error: {e}")
        return {"new": 0, "updated": 0, "message": f"API Error: {str(e)}"}

    count_new = 0
    count_updated = 0

    try:
        for api_t in api_tenders:
            db_tender = db.query(TenderModel).filter(TenderModel.code == api_t.code).first()
            row = _tender_to_row(api_t)

            if db_tender is None:
                db.add(TenderModel(**row))
                count_new += 1
            else:
                # Overwrite every mapped column on the existing row.
                for key, value in row.items():
                    setattr(db_tender, key, value)
                count_updated += 1

        db.commit()
    except Exception:
        # Leave the session usable for the caller instead of in a failed
        # transaction state, then let the original error propagate.
        db.rollback()
        raise

    print(f"[Sync] Finished. New: {count_new}, Updated: {count_updated}")
    return {"new": count_new, "updated": count_updated}
def clean_expired_tenders(db: Session):
    """
    Delete every stored tender whose ``closing_date`` is earlier than now.

    The cutoff is the naive timestamp from ``datetime.now()``. The deletion is
    committed before the function returns.

    Returns:
        The value returned by the query's ``delete()`` call (the count that
        is also printed in the log line).
    """
    cutoff = datetime.now()
    past_closing = db.query(TenderModel).filter(TenderModel.closing_date < cutoff)
    removed = past_closing.delete()
    db.commit()

    print(f"[Sync] Cleaned {removed} expired tenders.")
    return removed