paijo77 commited on
Commit
520f36d
·
verified ·
1 Parent(s): 91310c5

update app/admin/scraping_scheduling.py

Browse files
Files changed (1) hide show
  1. app/admin/scraping_scheduling.py +378 -0
app/admin/scraping_scheduling.py ADDED
@@ -0,0 +1,378 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Advanced Scheduling untuk Enhanced Scraping
3
+
4
+ Implementasi jadwal otomatis untuk scraping otomatis
5
+ dengan kontrol waktu, retry logic, dan resource optimization.
6
+ """
7
+
8
+ import asyncio
9
+ import logging
10
+ from typing import Dict, List, Any, Optional
11
+ from datetime import datetime, time, timedelta
12
+ from pydantic import BaseModel, Field, validator
13
+ from enum import Enum
14
+
15
+ from app.database import get_db
16
+ from app.dependencies import require_admin
17
+ from app.db_storage import db_storage
18
+
19
+
20
+ logger = logging.getLogger(__name__)
21
+
22
+
23
class ScheduleType(str, Enum):
    """Supported scraping-schedule frequencies."""

    HOURLY = "hourly"    # once every hour
    DAILY = "daily"      # once per day
    WEEKLY = "weekly"    # once per week
    MONTHLY = "monthly"  # once per month
    CUSTOM = "custom"    # driven by a cron expression
    RUN_NOW = "run_now"  # immediate one-off execution
31
+
32
+
33
class ScheduleEntry(BaseModel):
    """A single scraping-schedule entry.

    NOTE(review): ``ScheduledJob`` reads ``schedule.schedule_type``, but the
    field was missing from this model; it is added here with a
    backward-compatible default.
    """

    # Frequency for this entry; DAILY is a safe backward-compatible default.
    schedule_type: ScheduleType = Field(ScheduleType.DAILY, description="Jenis jadwal")
    # Optional[...] instead of a bare `int` default of None, so the annotation
    # matches the actual default value.
    hour: Optional[int] = Field(None, ge=0, le=23, description="Jam (0-23)")
    minute: Optional[int] = Field(None, ge=0, le=59, description="Menit (0-59)")
    cron_expression: Optional[str] = Field(None, description="Cron expression")
    timezone: str = Field("UTC", description="Timezone")
    enabled: bool = Field(True, description="Apakah jadwal aktif?")
    # Default covers all seven days; the original description wrongly claimed
    # Monday-Saturday.
    weekdays: List[str] = Field(
        default=["mon", "tue", "wed", "thu", "fri", "sat", "sun"],
        description="Hari aktif (default: semua hari)",
    )
    max_concurrent_tasks: int = Field(5, description="Maksimal concurrent tasks")
42
+
43
+
44
class ScrapingSchedule(BaseModel):
    """Scraping-schedule configuration: a switch plus a list of entries."""

    enabled: bool = Field(False, description="Enable scheduling")
    default_timezone: str = Field("UTC", description="Default timezone")
    entries: List[ScheduleEntry] = Field(default_factory=list)

    @staticmethod
    def _validate_cron_expression(expression: str) -> bool:
        """Cheap structural check: a cron expression has exactly five fields.

        NOTE(review): the original called this method but never defined it
        (NameError at runtime); a real parser (e.g. croniter) would be stronger.
        """
        return len(expression.split()) == 5

    def validate(self) -> List[str]:
        """Validate the schedule; return a list of error strings (empty = valid).

        NOTE(review): this shadows pydantic's ``BaseModel.validate`` — consider
        renaming to ``validate_schedule`` once callers are updated.
        """
        if not self.enabled:
            return ["Scheduling tidak diaktifkan"]

        errors: List[str] = []
        valid_days = {"mon", "tue", "wed", "thu", "fri", "sat", "sun"}

        for entry in self.entries:
            if entry.cron_expression and not self._validate_cron_expression(entry.cron_expression):
                errors.append(f"Invalid cron expression: {entry.cron_expression}")

            # Use `is not None` so hour=0 / minute=0 are still range-checked
            # (the original truthiness test skipped midnight).
            if entry.hour is not None and not 0 <= entry.hour <= 23:
                errors.append(f"Invalid hour: {entry.hour}. Harus 0-23")

            if entry.minute is not None and not 0 <= entry.minute <= 59:
                errors.append(f"Invalid minute: {entry.minute}. Harus 0-59")

            # The original compared weekdays against themselves (always true);
            # compare against the canonical weekday set instead.
            bad_days = [d for d in entry.weekdays if d not in valid_days]
            if bad_days:
                errors.append(f"Invalid weekday: {bad_days}. Harus: {sorted(valid_days)}")

            if entry.timezone and entry.timezone != "UTC":
                try:
                    import pytz
                    pytz.timezone(entry.timezone)
                except Exception:
                    errors.append(f"Invalid timezone: {entry.timezone}")

        # The original returned None for "no errors", contradicting the
        # List[str] annotation; an empty list keeps truthiness behavior.
        return errors
78
+
79
+
80
class ScheduledJob:
    """A scraping task bound to a schedule, tracking its run state."""

    def __init__(
        self,
        job_id: str,
        schedule: "ScheduleEntry",
        source_config: Dict[str, Any],
        scraper_service,
        background_tasks,
    ):
        # NOTE(review): the original annotated `schedule` with the undefined
        # name `ScrapingScheduleEntry`; `ScheduleEntry` is the model actually
        # defined in this module.
        self.job_id = job_id
        self.schedule = schedule
        self.source_config = source_config
        self.scraper_service = scraper_service
        self.background_tasks = background_tasks
        self.background_job = None  # result of the most recent execution
        self.last_run: Optional[datetime] = None
        self.next_run: Optional[datetime] = None
        self.run_count = 0
        self.is_running = False

    async def calculate_next_run(self) -> datetime:
        """Compute, store, and return the next execution time for this job."""
        now = datetime.now()
        # hour/minute may legitimately be None on the entry; default to 0 so
        # datetime.replace() does not raise (the original crashed on None).
        hour = self.schedule.hour if self.schedule.hour is not None else 0
        minute = self.schedule.minute if self.schedule.minute is not None else 0
        # Read the type from the schedule object; the original sometimes read
        # the non-existent `self.schedule_type`. ScheduleType is a str-Enum,
        # so comparing against the raw values also accepts plain strings.
        stype = self.schedule.schedule_type

        if stype == "hourly":
            # Next occurrence of `minute` past an hour boundary, in the future.
            next_run = now.replace(minute=minute, second=0, microsecond=0)
            while next_run <= now:
                next_run += timedelta(hours=1)
        elif stype == "daily":
            # Today at hour:minute, or tomorrow if that moment already passed
            # (the original always skipped to tomorrow, missing same-day runs).
            next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)
        elif stype == "weekly":
            # Next Monday (weekday 0) at hour:minute, never in the past.
            days_ahead = (7 - now.weekday()) % 7
            next_run = (now + timedelta(days=days_ahead)).replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )
            if next_run <= now:
                next_run += timedelta(days=7)
        elif stype == "monthly":
            # First day of the next month at hour:minute (the original's
            # now+30d could land in the wrong month).
            first_next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1)
            next_run = first_next_month.replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )
        elif stype == "run_now":
            next_run = now
        else:
            # CUSTOM (cron) parsing is not implemented; poll again in an hour
            # so `next_run` is never left undefined (the original raised
            # NameError for this case).
            next_run = now + timedelta(hours=1)

        self.next_run = next_run
        return next_run

    async def is_time_to_run(self) -> bool:
        """True when the job's next scheduled time has arrived."""
        if self.next_run is None:
            # The original compared None <= now and raised TypeError.
            await self.calculate_next_run()
        return self.next_run <= datetime.now()

    async def execute(self) -> Dict[str, Any]:
        """Run the scraping task once; guards against concurrent runs."""
        if self.is_running:
            return {"status": "already_running", "message": "Task sedang berjalan"}

        try:
            self.is_running = True
            self.run_count += 1
            self.last_run = datetime.now()

            # Select the entry point for this job type.
            if self.job_id == "hunter_protocol":
                job_func = self.scraper_service.run_hunter
            else:
                job_func = self.scraper_service.run_scraping_session

            # Execute scraping and keep the result for later inspection.
            result = await job_func(self.source_config)
            self.background_job = result
            self.last_run = datetime.now()

            logger.info(f"Jadwal {self.job_id} dijalankan ke-{self.run_count}")

            return {
                "status": "success",
                "result": result,
                "run_count": self.run_count,
                "last_run": self.last_run.isoformat(),
            }

        except Exception as e:
            logger.error(f"Error menjalankan jadwal {self.job_id}: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }
        finally:
            # Always release the running flag, even on unexpected errors.
            self.is_running = False
173
+
174
+
175
class ScrapingScheduler:
    """Registry and polling runner for all scheduled scraping jobs."""

    def __init__(self):
        # job_id -> ScheduledJob
        self.jobs: Dict[str, "ScheduledJob"] = {}
        # NOTE(review): the original instantiated the undefined name
        # `BackgroundTasks` here (never imported in this module); asyncio
        # tasks are used for execution instead, so nothing is built up-front.
        self.background_tasks = None
        self.scheduler_enabled = False
        self._scheduler_task = None  # handle to the polling loop, if auto-started

    def add_job(
        self,
        job_id: str,
        schedule: "ScheduleEntry",
        source_config: Dict[str, Any],
        scraper_service,
    ) -> "ScheduledJob":
        """Register a new scheduled job; lazily starts the polling loop."""
        job = ScheduledJob(
            job_id=job_id,
            schedule=schedule,
            source_config=source_config,
            scraper_service=scraper_service,
            background_tasks=self.background_tasks,
        )

        self.jobs[job_id] = job
        logger.info(f"Tugas {job_id} ditambahkan ke jadwal")

        # The original used `await` inside this sync method — a SyntaxError.
        # If an event loop is already running, start the scheduler as a
        # background task; otherwise the caller must await start_scheduler().
        if not self.scheduler_enabled:
            try:
                loop = asyncio.get_running_loop()
                self._scheduler_task = loop.create_task(self.start_scheduler())
            except RuntimeError:
                logger.warning("No running event loop; call start_scheduler() explicitly")

        return job

    def remove_job(self, job_id: str) -> bool:
        """Remove a job; refuses (returns False) while it is executing."""
        if job_id in self.jobs:
            if self.jobs[job_id].is_running:
                return False

            # NOTE(review): the original called
            # self.background_tasks.cancel_task(...), a method that exists on
            # no known BackgroundTasks API; an in-flight asyncio task simply
            # finishes its current run after removal.
            del self.jobs[job_id]

            logger.info(f"Tugas {job_id} dihapus dari jadwal")
            return True

        return False

    async def start_scheduler(self):
        """Run the polling loop: every 60s, launch jobs whose time has come."""
        if self.scheduler_enabled:
            return {"status": "already_enabled"}

        self.scheduler_enabled = True
        logger.info("Scheduler started")

        while self.scheduler_enabled:
            current_time = datetime.now()

            for job_id, job in list(self.jobs.items()):
                # The original wrote `if job.is_time_to_run():` without await,
                # so the always-truthy coroutine object fired every job on
                # every tick.
                if await job.is_time_to_run():
                    asyncio.get_running_loop().create_task(
                        job.execute(), name=f"scheduled-{job_id}"
                    )
                    logger.info(f"Menjalankan tugas {job_id} pada {current_time.isoformat()}")

            # Wait before the next check — 60 seconds, not the "5 menit" the
            # original comment claimed.
            await asyncio.sleep(60)

    async def get_status(self) -> Dict[str, Any]:
        """Snapshot of the scheduler and every registered job."""
        jobs_status = {}

        for job_id, job in self.jobs.items():
            stype = getattr(job.schedule, "schedule_type", None)
            jobs_status[job_id] = {
                # Tolerate schedule objects without a schedule_type field.
                "schedule_type": getattr(stype, "value", stype) if stype else "manual",
                # isoformat() on None crashed in the original before a first run.
                "next_run": job.next_run.isoformat() if job.next_run else None,
                "last_run": job.last_run.isoformat() if job.last_run else None,
                "is_running": job.is_running,
                "run_count": job.run_count,
                # ScheduledJob has no `enabled` attribute (original
                # AttributeError); report the schedule's flag instead.
                "enabled": getattr(job.schedule, "enabled", True),
            }

        return {
            "scheduler_enabled": self.scheduler_enabled,
            "jobs": jobs_status,
            "total_jobs": len(self.jobs),
        }

    async def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Status of one job, or a not_found payload."""
        if job_id not in self.jobs:
            return {"status": "not_found", "message": f"Job {job_id} tidak ditemukan"}

        # The original called job.get_status(), a method ScheduledJob never
        # defined; reuse the aggregate snapshot instead.
        return (await self.get_status())["jobs"][job_id]

    async def run_job_now(self, job_id: str) -> Dict[str, Any]:
        """Execute a job immediately, bypassing its schedule."""
        # .get() avoids the KeyError the original raised for unknown ids.
        job = self.jobs.get(job_id)
        if job:
            return await job.execute()

        return {"status": "not_found", "message": f"Job {job_id} tidak ditemukan"}
278
+
279
+
280
# Background task startup
# NOTE(review): `router` is never defined or imported in this module, so this
# decorator raises NameError at import time — a FastAPI APIRouter must be
# created (or imported) before this can run; `on_event` is also deprecated in
# current FastAPI in favor of lifespan handlers. Confirm the intended router.
@router.on_event("startup")
async def startup_event():
    """Initialize the scraping scheduler on application startup."""
    # NOTE(review): this builds a fresh scheduler with no jobs and awaits its
    # polling loop, which as written loops forever — so the log line below is
    # unreachable; presumably this should be launched as a background task and
    # the instance stored somewhere shared. Verify against the app wiring.
    scheduler = ScrapingScheduler()
    await scheduler.start_scheduler()

    logger.info("Scraping scheduler initialized")
288
+
289
+
290
+ # Module extension untuk db_storage
291
async def create_scraping_session(
    db: "AsyncSession",
    session_id: str,
    start_time: datetime,
    end_time: Optional[datetime] = None,
    requests_made: int = 0,
    successful_requests: int = 0,
    failed_requests: int = 0,
    success_rate: float = 0.0,
    total_data_bytes: int = 0,
    avg_response_time: float = 0.0,
    proxies_used: Optional[List[str]] = None,
) -> str:
    """Create a new scraping-session record in the database.

    Returns the session_id on success.

    Raises:
        HTTPException: 500 when persistence fails.

    NOTE(review): `AsyncSession` and `HTTPException` are not imported anywhere
    in this module (sqlalchemy / fastapi) — confirm the real import block.
    NOTE(review): `avg_response_time` is accepted but never forwarded to
    db_storage — confirm whether the storage layer supports it.
    """
    # Default of None instead of the original mutable default list ([]).
    if proxies_used is None:
        proxies_used = []

    try:
        await db_storage.create_scraping_session(
            db=db,
            session_id=session_id,
            start_time=start_time,
            end_time=end_time,
            requests_made=requests_made,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            success_rate=success_rate,
            total_data_bytes=total_data_bytes,
            proxies_used=proxies_used,
        )

        return session_id

    except Exception as e:
        logger.error(f"Error creating scraping session: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error creating scraping session: {str(e)}")
325
+
326
+
327
async def update_scraping_session(
    db: "AsyncSession",
    session_id: str,
    end_time: datetime,
    requests_made: int,
    successful_requests: int,
    failed_requests: int,
    success_rate: float,
    total_data_bytes: int,
    proxies_used: List[str],
) -> str:
    """Update an existing scraping session with its final metrics.

    Returns a human-readable confirmation string.

    Raises:
        HTTPException: 500 when the update fails.

    NOTE(review): `AsyncSession` and `HTTPException` are not imported anywhere
    in this module (sqlalchemy / fastapi) — confirm the real import block.
    """
    try:
        # The original call ended with `proxies_used=proxies_used
        # proxies_tested=proxies_tested` — a SyntaxError (missing comma)
        # referencing an undefined variable; the stray argument is dropped.
        await db_storage.update_scraping_session(
            db=db,
            session_id=session_id,
            end_time=end_time,
            requests_made=requests_made,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            success_rate=success_rate,
            total_data_bytes=total_data_bytes,
            proxies_used=proxies_used,
        )

        return f"Sesi {session_id} berhasil diupdate"

    except Exception as e:
        logger.error(f"Error updating scraping session: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error updating scraping session: {str(e)}")
359
+
360
+
361
async def validate_proxy_source(
    db: AsyncSession,
    source_id: int,
    background_tasks: BackgroundTasks,
) -> str:
    """Kick off background validation/testing of a proxy source.

    NOTE(review): `AsyncSession`, `BackgroundTasks`, `HTTPException`, and
    `service` are all undefined in this module as written — confirm the
    intended imports (sqlalchemy, fastapi) and which scraper-service object
    `service` is meant to be.
    """

    try:
        # NOTE(review): FastAPI's BackgroundTasks.add_task expects a callable
        # (not an already-created coroutine object), takes no `name` kwarg,
        # and returns None — so `task_id` here would always be None; verify
        # against the actual background-tasks implementation in use.
        task_id = background_tasks.add_task(
            service.validate_proxy_source(db, source_id),
            name=f"validate-source-{source_id}"
        )

        return f"Validasi proxy source {source_id} dimulai"

    except Exception as e:
        logger.error(f"Error validating proxy source: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error validating proxy source: {str(e)}")