paijo77 commited on
Commit
aeba6f7
·
verified ·
1 Parent(s): ae50c7b

update app/db_storage_extended.py

Browse files
Files changed (1) hide show
  1. app/db_storage_extended.py +652 -0
app/db_storage_extended.py ADDED
@@ -0,0 +1,652 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Extended database storage methods for enhanced scraping functionality.
3
+ Builds upon the existing db_storage.py with additional session management,
4
+ Hunter Protocol integration, and performance monitoring.
5
+ """
6
+
7
+ from sqlalchemy.ext.asyncio import AsyncSession
8
+ from sqlalchemy import select, func, and_, or_, desc, asc, update
9
+ from sqlalchemy.orm import selectinload
10
+ from typing import List, Optional, Dict, Tuple
11
+ from datetime import datetime, timedelta
12
+ import logging
13
+ import json
14
+
15
+ from app.db_models import User, ProxySource, Proxy, CandidateSource
16
+ from app.db_storage import DatabaseStorage
17
+
18
+ logger = logging.getLogger(__name__)
19
+
20
+
21
class ExtendedDatabaseStorage(DatabaseStorage):
    """Extended storage with enhanced scraping session management and monitoring."""

    async def create_scraping_session(
        self,
        session: AsyncSession,
        source_id: int,
        scraping_type: str = "scheduled",
        config: Optional[Dict] = None,
        initiated_by: Optional[int] = None,
    ) -> Dict:
        """Create a new scraping session record.

        Args:
            session: Active async DB session used for the insert.
            source_id: Proxy source this scraping run belongs to.
            scraping_type: Kind of run (defaults to "scheduled").
            config: Optional run configuration; stored as an empty dict if None.
            initiated_by: Optional id of the user who triggered the run.

        Returns:
            Dict with the new session's id, status and start timestamp.
        """
        # Local import keeps module load order flexible (mirrors the other methods).
        from app.db_models import ScrapingSession

        record = ScrapingSession(
            source_id=source_id,
            scraping_type=scraping_type,
            status="running",  # a freshly created session always starts as running
            config=config or {},
            initiated_by=initiated_by,
            started_at=datetime.utcnow(),
        )

        session.add(record)
        await session.commit()
        # Refresh so the DB-generated primary key is available for the response.
        await session.refresh(record)

        return {
            "session_id": record.id,
            "status": record.status,
            "started_at": record.started_at,
        }
54
+
55
+ async def update_scraping_session(
56
+ self,
57
+ session: AsyncSession,
58
+ session_id: int,
59
+ status: str,
60
+ proxies_found: int = 0,
61
+ proxies_valid: int = 0,
62
+ error_message: Optional[str] = None,
63
+ metadata: Optional[Dict] = None,
64
+ ) -> bool:
65
+ """Update scraping session status and results."""
66
+ from app.db_models import ScrapingSession
67
+
68
+ try:
69
+ stmt = (
70
+ update(ScrapingSession)
71
+ .where(ScrapingSession.id == session_id)
72
+ .values(
73
+ status=status,
74
+ proxies_found=proxies_found,
75
+ proxies_valid=proxies_valid,
76
+ error_message=error_message,
77
+ metadata=metadata or {},
78
+ finished_at=datetime.utcnow()
79
+ if status in ["completed", "failed"]
80
+ else None,
81
+ )
82
+ )
83
+
84
+ await session.execute(stmt)
85
+ await session.commit()
86
+ return True
87
+
88
+ except Exception as e:
89
+ logger.error(f"Error updating scraping session {session_id}: {e}")
90
+ await session.rollback()
91
+ return False
92
+
93
+ async def get_scraping_sessions(
94
+ self,
95
+ session: AsyncSession,
96
+ source_id: Optional[int] = None,
97
+ status: Optional[str] = None,
98
+ limit: int = 50,
99
+ offset: int = 0,
100
+ ) -> Tuple[List[Dict], int]:
101
+ """Get scraping sessions with filtering."""
102
+ from app.db_models import ScrapingSession
103
+
104
+ query = select(ScrapingSession).options(selectinload(ScrapingSession.source))
105
+
106
+ if source_id:
107
+ query = query.where(ScrapingSession.source_id == source_id)
108
+ if status:
109
+ query = query.where(ScrapingSession.status == status)
110
+
111
+ count_query = select(func.count()).select_from(query.subquery())
112
+ total_result = await session.execute(count_query)
113
+ total = total_result.scalar()
114
+
115
+ query = (
116
+ query.order_by(desc(ScrapingSession.started_at)).limit(limit).offset(offset)
117
+ )
118
+ result = await session.execute(query)
119
+ sessions = result.scalars().all()
120
+
121
+ sessions_data = []
122
+ for s in sessions:
123
+ sessions_data.append(
124
+ {
125
+ "id": s.id,
126
+ "source_id": s.source_id,
127
+ "source_name": s.source.name if s.source else "Unknown",
128
+ "scraping_type": s.scraping_type,
129
+ "status": s.status,
130
+ "proxies_found": s.proxies_found,
131
+ "proxies_valid": s.proxies_valid,
132
+ "started_at": s.started_at,
133
+ "finished_at": s.finished_at,
134
+ "error_message": s.error_message,
135
+ "config": s.config,
136
+ }
137
+ )
138
+
139
+ return sessions_data, total
140
+
141
    async def get_scraping_stats(
        self,
        session: AsyncSession,
        days: int = 7,
    ) -> Dict:
        """Get scraping statistics for the last N days.

        Aggregates ScrapingSession rows started within the window into
        per-status counts, per-day totals and an overall success rate.

        Args:
            session: Active async DB session.
            days: Size of the look-back window in days.

        Returns:
            Dict with `period_days`, `status_counts`, `daily_stats`,
            `success_rate` (percent, 2 decimals) and `total_sessions`.
        """
        from app.db_models import ScrapingSession

        since_date = datetime.utcnow() - timedelta(days=days)

        # Session counts grouped by status inside the window.
        status_stats = await session.execute(
            select(
                ScrapingSession.status,
                func.count(ScrapingSession.id).label("count"),
            )
            .where(ScrapingSession.started_at >= since_date)
            .group_by(ScrapingSession.status)
        )

        status_counts = {}
        for row in status_stats:
            status_counts[row.status] = row.count

        # Daily rollup: session count and proxy totals, newest day first.
        daily_stats = await session.execute(
            select(
                func.date(ScrapingSession.started_at).label("date"),
                func.count(ScrapingSession.id).label("sessions"),
                func.sum(ScrapingSession.proxies_found).label("proxies_found"),
                func.sum(ScrapingSession.proxies_valid).label("proxies_valid"),
            )
            .where(ScrapingSession.started_at >= since_date)
            .group_by(func.date(ScrapingSession.started_at))
            .order_by(desc(func.date(ScrapingSession.started_at)))
        )

        daily_data = []
        for row in daily_stats:
            daily_data.append(
                {
                    # NOTE(review): assumes the dialect returns a date object
                    # here; SQLite's func.date yields a str — TODO confirm.
                    "date": row.date.isoformat(),
                    # SUM can be NULL; coalesce aggregates to 0.
                    "sessions": row.sessions or 0,
                    "proxies_found": row.proxies_found or 0,
                    "proxies_valid": row.proxies_valid or 0,
                }
            )

        # Success rate is computed over finished runs only (completed + failed);
        # still-running sessions are excluded from the denominator.
        total_completed = status_counts.get("completed", 0) + status_counts.get(
            "failed", 0
        )
        success_rate = (
            (status_counts.get("completed", 0) / total_completed * 100)
            if total_completed > 0
            else 0
        )

        return {
            "period_days": days,
            "status_counts": status_counts,
            "daily_stats": daily_data,
            "success_rate": round(success_rate, 2),
            "total_sessions": sum(status_counts.values()),
        }
203
+
204
+ async def create_hunter_candidate(
205
+ self,
206
+ session: AsyncSession,
207
+ url: str,
208
+ discovery_method: str,
209
+ confidence_score: int = 0,
210
+ metadata: Optional[Dict] = None,
211
+ ) -> Dict:
212
+ """Create a new Hunter candidate source."""
213
+ from urllib.parse import urlparse
214
+
215
+ domain = urlparse(url).netloc
216
+
217
+ existing = await session.execute(
218
+ select(CandidateSource).where(CandidateSource.url == url)
219
+ )
220
+ if existing.scalar_one_or_none():
221
+ return {"error": "Candidate already exists"}
222
+
223
+ candidate = CandidateSource(
224
+ url=url,
225
+ domain=domain,
226
+ discovery_method=discovery_method,
227
+ confidence_score=confidence_score,
228
+ metadata=metadata or {},
229
+ )
230
+
231
+ session.add(candidate)
232
+ await session.commit()
233
+ await session.refresh(candidate)
234
+
235
+ return {
236
+ "id": candidate.id,
237
+ "url": candidate.url,
238
+ "domain": candidate.domain,
239
+ "discovery_method": candidate.discovery_method,
240
+ "confidence_score": candidate.confidence_score,
241
+ "status": candidate.status,
242
+ }
243
+
244
+ async def update_hunter_candidate(
245
+ self,
246
+ session: AsyncSession,
247
+ candidate_id: int,
248
+ status: Optional[str] = None,
249
+ confidence_score: Optional[int] = None,
250
+ proxies_found: Optional[int] = None,
251
+ fail_count: Optional[int] = None,
252
+ ) -> bool:
253
+ """Update Hunter candidate status and metrics."""
254
+ try:
255
+ update_data = {}
256
+ if status is not None:
257
+ update_data["status"] = status
258
+ if status in ["approved", "rejected"]:
259
+ update_data["last_checked_at"] = datetime.utcnow()
260
+ if confidence_score is not None:
261
+ update_data["confidence_score"] = confidence_score
262
+ if proxies_found is not None:
263
+ update_data["proxies_found_count"] = proxies_found
264
+ if fail_count is not None:
265
+ update_data["fail_count"] = fail_count
266
+
267
+ if not update_data:
268
+ return False
269
+
270
+ stmt = (
271
+ update(CandidateSource)
272
+ .where(CandidateSource.id == candidate_id)
273
+ .values(**update_data)
274
+ )
275
+
276
+ await session.execute(stmt)
277
+ await session.commit()
278
+ return True
279
+
280
+ except Exception as e:
281
+ logger.error(f"Error updating hunter candidate {candidate_id}: {e}")
282
+ await session.rollback()
283
+ return False
284
+
285
    async def get_hunter_statistics(
        self,
        session: AsyncSession,
        days: int = 30,
    ) -> Dict:
        """Get Hunter protocol statistics and performance metrics.

        Aggregates CandidateSource rows created within the window into
        per-status counts, per-discovery-method counts, a top-domains leaderboard
        and an overall approval rate.

        Args:
            session: Active async DB session.
            days: Size of the look-back window in days.

        Returns:
            Dict with `period_days`, `status_stats`, `method_stats`,
            `top_domains`, `approval_rate` (percent) and `total_candidates`.
        """
        since_date = datetime.utcnow() - timedelta(days=days)

        # Candidates grouped by status with the average confidence per group.
        status_stats = await session.execute(
            select(
                CandidateSource.status,
                func.count(CandidateSource.id).label("count"),
                func.avg(CandidateSource.confidence_score).label("avg_confidence"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.status)
        )

        status_data = {}
        for row in status_stats:
            status_data[row.status] = {
                "count": row.count,
                # AVG can be NULL for empty groups; coalesce before rounding.
                "avg_confidence": round(row.avg_confidence or 0, 2),
            }

        # Candidates grouped by how they were discovered, with average yield.
        method_stats = await session.execute(
            select(
                CandidateSource.discovery_method,
                func.count(CandidateSource.id).label("count"),
                func.avg(CandidateSource.proxies_found_count).label("avg_proxies"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.discovery_method)
        )

        method_data = {}
        for row in method_stats:
            method_data[row.discovery_method] = {
                "count": row.count,
                "avg_proxies": round(row.avg_proxies or 0, 2),
            }

        # Top 10 domains by total proxies found, restricted to domains that
        # produced at least 2 candidates in the window.
        top_domains = await session.execute(
            select(
                CandidateSource.domain,
                func.count(CandidateSource.id).label("candidates"),
                func.sum(CandidateSource.proxies_found_count).label("total_proxies"),
            )
            .where(CandidateSource.created_at >= since_date)
            .group_by(CandidateSource.domain)
            .having(func.count(CandidateSource.id) >= 2)
            .order_by(desc(func.sum(CandidateSource.proxies_found_count)))
            .limit(10)
        )

        domain_data = []
        for row in top_domains:
            domain_data.append(
                {
                    "domain": row.domain,
                    "candidates": row.candidates,
                    "total_proxies": row.total_proxies or 0,
                    # candidates >= 2 is guaranteed by the HAVING clause above,
                    # so this division is safe.
                    "avg_proxies_per_candidate": round(
                        (row.total_proxies or 0) / row.candidates, 2
                    ),
                }
            )

        # Approval rate over reviewed candidates only (approved + rejected);
        # pending candidates are excluded from the denominator.
        total_processed = sum(
            data["count"]
            for status, data in status_data.items()
            if status in ["approved", "rejected"]
        )
        approved_count = status_data.get("approved", {}).get("count", 0)
        approval_rate = (
            (approved_count / total_processed * 100) if total_processed > 0 else 0
        )

        return {
            "period_days": days,
            "status_stats": status_data,
            "method_stats": method_data,
            "top_domains": domain_data,
            "approval_rate": round(approval_rate, 2),
            "total_candidates": sum(data["count"] for data in status_data.values()),
        }
371
+
372
+ async def batch_process_proxies_with_rate_limit(
373
+ self,
374
+ session: AsyncSession,
375
+ proxies_data: List[Dict],
376
+ source_id: Optional[int] = None,
377
+ rate_limit_per_second: int = 10,
378
+ batch_size: int = 50,
379
+ ) -> Dict:
380
+ """Process proxies with rate limiting and detailed progress tracking."""
381
+ import asyncio
382
+ import time
383
+
384
+ if not proxies_data:
385
+ return {"processed": 0, "success": 0, "failed": 0, "errors": []}
386
+
387
+ processed = 0
388
+ success = 0
389
+ failed = 0
390
+ errors = []
391
+ start_time = time.time()
392
+
393
+ for i in range(0, len(proxies_data), batch_size):
394
+ batch = proxies_data[i : i + batch_size]
395
+ batch_start = time.time()
396
+
397
+ for proxy_data in batch:
398
+ try:
399
+ result = await self.add_proxy_with_validation(
400
+ session, proxy_data, source_id
401
+ )
402
+ if result:
403
+ success += 1
404
+ else:
405
+ failed += 1
406
+ errors.append(
407
+ f"Failed to add proxy: {proxy_data.get('url', 'unknown')}"
408
+ )
409
+
410
+ processed += 1
411
+
412
+ await asyncio.sleep(1.0 / rate_limit_per_second)
413
+
414
+ except Exception as e:
415
+ failed += 1
416
+ errors.append(f"Error processing proxy: {str(e)}")
417
+ processed += 1
418
+
419
+ batch_time = time.time() - batch_start
420
+ expected_batch_time = len(batch) / rate_limit_per_second
421
+ if batch_time < expected_batch_time:
422
+ await asyncio.sleep(expected_batch_time - batch_time)
423
+
424
+ total_time = time.time() - start_time
425
+ actual_rate = processed / total_time if total_time > 0 else 0
426
+
427
+ return {
428
+ "processed": processed,
429
+ "success": success,
430
+ "failed": failed,
431
+ "errors": errors[:10],
432
+ "total_time_seconds": round(total_time, 2),
433
+ "actual_rate_per_second": round(actual_rate, 2),
434
+ "target_rate_per_second": rate_limit_per_second,
435
+ }
436
+
437
    async def get_performance_metrics(
        self,
        session: AsyncSession,
        days: int = 7,
    ) -> Dict:
        """Get comprehensive performance metrics.

        Summarizes proxies validated within the last `days` days: latency
        aggregates, quality-score distribution in fixed buckets, protocol
        distribution and the top 10 countries.

        Args:
            session: Active async DB session.
            days: Size of the look-back window in days.

        Returns:
            Dict with `period_days`, `validation_performance`,
            `quality_distribution`, `protocol_distribution` and `top_countries`.
        """
        since_date = datetime.utcnow() - timedelta(days=days)

        # Latency aggregates over validated proxies in the window.
        validation_metrics = await session.execute(
            select(
                func.avg(Proxy.latency_ms).label("avg_latency"),
                func.min(Proxy.latency_ms).label("min_latency"),
                func.max(Proxy.latency_ms).label("max_latency"),
                func.count(Proxy.id).label("total_validated"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
        )

        # Aggregate-only query always yields exactly one row.
        val_row = validation_metrics.first()

        # Per-score counts, bucketed in Python below.
        quality_dist = await session.execute(
            select(
                Proxy.quality_score,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .group_by(Proxy.quality_score)
            .order_by(Proxy.quality_score)
        )

        # Fixed 20-point buckets; a NULL score is treated as 0 ("0-20").
        quality_buckets = {"0-20": 0, "21-40": 0, "41-60": 0, "61-80": 0, "81-100": 0}
        for row in quality_dist:
            score = row.quality_score or 0
            if score <= 20:
                quality_buckets["0-20"] += row.count
            elif score <= 40:
                quality_buckets["21-40"] += row.count
            elif score <= 60:
                quality_buckets["41-60"] += row.count
            elif score <= 80:
                quality_buckets["61-80"] += row.count
            else:
                quality_buckets["81-100"] += row.count

        # Validated proxies per protocol, most common first.
        protocol_dist = await session.execute(
            select(
                Proxy.protocol,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .group_by(Proxy.protocol)
            .order_by(desc(func.count(Proxy.id)))
        )

        protocol_data = {}
        for row in protocol_dist:
            # NULL protocols are reported under the "unknown" key.
            protocol_data[row.protocol or "unknown"] = row.count

        # Top 10 countries by validated-proxy count; NULL country codes excluded.
        country_dist = await session.execute(
            select(
                Proxy.country_code,
                func.count(Proxy.id).label("count"),
            )
            .where(Proxy.validation_status == "validated")
            .where(Proxy.last_validated >= since_date)
            .where(Proxy.country_code.isnot(None))
            .group_by(Proxy.country_code)
            .order_by(desc(func.count(Proxy.id)))
            .limit(10)
        )

        country_data = []
        for row in country_dist:
            country_data.append(
                {
                    "country_code": row.country_code,
                    "count": row.count,
                }
            )

        return {
            "period_days": days,
            "validation_performance": {
                # Aggregates are NULL when no rows matched; coalesce to 0.
                "avg_latency_ms": round(val_row.avg_latency or 0, 2),
                "min_latency_ms": val_row.min_latency or 0,
                "max_latency_ms": val_row.max_latency or 0,
                "total_validated": val_row.total_validated or 0,
            },
            "quality_distribution": quality_buckets,
            "protocol_distribution": protocol_data,
            "top_countries": country_data,
        }
532
+
533
+ async def create_background_task(
534
+ self,
535
+ session: AsyncSession,
536
+ task_type: str,
537
+ task_data: Dict,
538
+ scheduled_for: Optional[datetime] = None,
539
+ retry_count: int = 0,
540
+ max_retries: int = 3,
541
+ ) -> Dict:
542
+ """Create a background task record."""
543
+ from app.db_models import BackgroundTask
544
+
545
+ task = BackgroundTask(
546
+ task_type=task_type,
547
+ task_data=task_data,
548
+ status="pending",
549
+ scheduled_for=scheduled_for or datetime.utcnow(),
550
+ retry_count=retry_count,
551
+ max_retries=max_retries,
552
+ )
553
+
554
+ session.add(task)
555
+ await session.commit()
556
+ await session.refresh(task)
557
+
558
+ return {
559
+ "task_id": task.id,
560
+ "task_type": task.task_type,
561
+ "status": task.status,
562
+ "scheduled_for": task.scheduled_for,
563
+ }
564
+
565
+ async def update_background_task(
566
+ self,
567
+ session: AsyncSession,
568
+ task_id: int,
569
+ status: str,
570
+ result: Optional[Dict] = None,
571
+ error_message: Optional[str] = None,
572
+ ) -> bool:
573
+ """Update background task status and results."""
574
+ from app.db_models import BackgroundTask
575
+
576
+ try:
577
+ task = await session.execute(
578
+ select(BackgroundTask).where(BackgroundTask.id == task_id)
579
+ )
580
+ task_obj = task.scalar_one_or_none()
581
+
582
+ if not task_obj:
583
+ return False
584
+
585
+ update_data = {
586
+ "status": status,
587
+ "completed_at": datetime.utcnow()
588
+ if status in ["completed", "failed"]
589
+ else None,
590
+ }
591
+
592
+ if result:
593
+ update_data["result"] = result
594
+ if error_message:
595
+ update_data["error_message"] = error_message
596
+ update_data["retry_count"] = task_obj.retry_count + 1
597
+
598
+ stmt = (
599
+ update(BackgroundTask)
600
+ .where(BackgroundTask.id == task_id)
601
+ .values(**update_data)
602
+ )
603
+
604
+ await session.execute(stmt)
605
+ await session.commit()
606
+ return True
607
+
608
+ except Exception as e:
609
+ logger.error(f"Error updating background task {task_id}: {e}")
610
+ await session.rollback()
611
+ return False
612
+
613
+ async def get_pending_background_tasks(
614
+ self,
615
+ session: AsyncSession,
616
+ task_type: Optional[str] = None,
617
+ limit: int = 50,
618
+ ) -> List[Dict]:
619
+ """Get pending background tasks for execution."""
620
+ from app.db_models import BackgroundTask
621
+
622
+ query = select(BackgroundTask).where(
623
+ and_(
624
+ BackgroundTask.status == "pending",
625
+ BackgroundTask.scheduled_for <= datetime.utcnow(),
626
+ )
627
+ )
628
+
629
+ if task_type:
630
+ query = query.where(BackgroundTask.task_type == task_type)
631
+
632
+ query = query.order_by(BackgroundTask.scheduled_for).limit(limit)
633
+ result = await session.execute(query)
634
+ tasks = result.scalars().all()
635
+
636
+ tasks_data = []
637
+ for task in tasks:
638
+ tasks_data.append(
639
+ {
640
+ "id": task.id,
641
+ "task_type": task.task_type,
642
+ "task_data": task.task_data,
643
+ "scheduled_for": task.scheduled_for,
644
+ "retry_count": task.retry_count,
645
+ "max_retries": task.max_retries,
646
+ }
647
+ )
648
+
649
+ return tasks_data
650
+
651
+
652
+ extended_db_storage = ExtendedDatabaseStorage()