File size: 3,340 Bytes
b8e6434 c5a34a3 b8e6434 c5a34a3 54ad6b4 b8e6434 c5a34a3 b8e6434 c5a34a3 b8e6434 0d71eae b8e6434 0d71eae b8e6434 0d71eae b8e6434 0d71eae b8e6434 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | from sqlalchemy.orm import Session
from datetime import datetime
from app.models.tender import TenderModel
from app.services.mercado_publico import fetch_tenders, get_tender_by_code
import json
def _parse_tender_datetime(value):
    """Convert an API date value into something storable in a datetime column.

    Non-string values (e.g. an already-parsed ``datetime`` or ``None``) are
    returned unchanged. Empty or unparseable strings yield ``None``.
    """
    if not isinstance(value, str):
        return value
    if not value:
        return None
    # Drop the "Z" suffix and everything after the first "." (fractional
    # seconds and whatever follows them) before handing the text to
    # fromisoformat.
    trimmed = value.replace("Z", "").split(".")[0]
    try:
        return datetime.fromisoformat(trimmed)
    except ValueError:
        # Best effort: a malformed date must not abort the whole sync.
        return None


def _tender_to_row(api_t):
    """Map one API tender object onto the keyword arguments of ``TenderModel``."""
    return {
        "code": api_t.code,
        "name": api_t.name,
        "buyer": api_t.buyer,
        "buyer_region": api_t.buyer_region,
        "status": api_t.status,
        "status_code": str(api_t.status_code) if api_t.status_code else None,
        "type": api_t.type,
        "currency": api_t.currency,
        "closing_date": _parse_tender_datetime(api_t.closing_date),
        "publication_date": _parse_tender_datetime(api_t.publication_date),
        "description": api_t.description,
        "estimated_amount": api_t.estimated_amount,
        "source": api_t.source,
        "region": api_t.region,
        "sector": api_t.sector,
        "items": [item.model_dump() for item in api_t.items] if api_t.items else [],
        "attachments": (
            [att.model_dump() for att in api_t.attachments] if api_t.attachments else []
        ),
    }


async def sync_tenders_to_db(db: Session, keyword: str = None):
    """
    Fetch tenders from the Mercado Público API and upsert them into the database.

    Each tender returned by ``fetch_tenders`` is matched against
    ``TenderModel.code``: an existing row has all mapped fields overwritten,
    otherwise a new row is added. All changes are committed once at the end.

    Args:
        db: SQLAlchemy session used for the lookups, inserts and the commit.
        keyword: Optional filter passed through to ``fetch_tenders``.

    Returns:
        ``{"new": int, "updated": int}`` on success. When the API returns
        nothing or raises, both counters are 0 and a ``"message"`` key
        explains why.

    Raises:
        Exception: Any error raised while querying or committing is re-raised
            after the session has been rolled back.
    """
    print(f"[Sync] Starting REAL synchronization... keyword={keyword}")

    try:
        api_tenders = await fetch_tenders(keyword=keyword)
        if not api_tenders:
            print("[Sync] No active tenders found for today in the API.")
            return {"new": 0, "updated": 0, "message": "No new tenders found"}
        print(f"[Sync] API returned {len(api_tenders)} real tenders for processing.")
    except Exception as e:
        # API failures are reported to the caller instead of being raised.
        print(f"[Sync] API error: {e}")
        return {"new": 0, "updated": 0, "message": f"API Error: {str(e)}"}

    count_new = 0
    count_updated = 0

    try:
        for api_t in api_tenders:
            tender_data = _tender_to_row(api_t)
            db_tender = (
                db.query(TenderModel).filter(TenderModel.code == api_t.code).first()
            )

            if db_tender:
                # Update existing row in place.
                for key, value in tender_data.items():
                    setattr(db_tender, key, value)
                count_updated += 1
            else:
                # Create a new row.
                db.add(TenderModel(**tender_data))
                count_new += 1

        db.commit()
    except Exception as e:
        # Leave the session usable for the caller: discard the partial work
        # before propagating the error.
        db.rollback()
        print(f"[Sync] DB error, changes rolled back: {e}")
        raise

    print(f"[Sync] Finished. New: {count_new}, Updated: {count_updated}")
    return {"new": count_new, "updated": count_updated}
def clean_expired_tenders(db: Session):
    """
    Remove tenders whose closing_date is in the past.

    Issues a single bulk DELETE for every ``TenderModel`` row with
    ``closing_date`` earlier than the current time, then commits.

    Args:
        db: SQLAlchemy session used for the delete and the commit.

    Returns:
        The value returned by ``Query.delete()`` (the matched row count).

    Raises:
        Exception: Any error raised by the delete or the commit is re-raised
            after the session has been rolled back.
    """
    # NOTE(review): datetime.now() is naive local time; this assumes
    # closing_date is stored naive in the same zone — confirm.
    now = datetime.now()

    try:
        expired = db.query(TenderModel).filter(TenderModel.closing_date < now).delete()
        db.commit()
    except Exception:
        # Leave the session usable for the caller before propagating.
        db.rollback()
        raise

    print(f"[Sync] Cleaned {expired} expired tenders.")
    return expired
|