// NOTE(review): extraction residue, not source code — the original header read
// "Spaces:" / "Build error" / "Build error", apparently a pasted build-status banner.
;
/**
 * TypeScript emit helper: applies `decorators` to a class (2-arg call) or to a
 * member descriptor (4-arg call), delegating to `Reflect.decorate` when a
 * reflect-metadata polyfill is present.
 */
var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) {
    var argc = arguments.length;
    var result;
    if (argc < 3) {
        // Class decoration: start from the constructor itself.
        result = target;
    }
    else if (desc === null) {
        // Member decoration without an explicit descriptor: look it up.
        desc = Object.getOwnPropertyDescriptor(target, key);
        result = desc;
    }
    else {
        result = desc;
    }
    if (typeof Reflect === "object" && typeof Reflect.decorate === "function") {
        result = Reflect.decorate(decorators, target, key, desc);
    }
    else {
        // Apply decorators right-to-left; each may replace the running result.
        for (var i = decorators.length - 1; i >= 0; i--) {
            var d = decorators[i];
            if (d) {
                result = (argc < 3 ? d(result) : argc > 3 ? d(target, key, result) : d(target, key)) || result;
            }
        }
    }
    if (argc > 3 && result) {
        // Member decoration returned a (possibly new) descriptor: install it.
        Object.defineProperty(target, key, result);
    }
    return result;
};
/**
 * TypeScript emit helper: records design-time type metadata via
 * `Reflect.metadata` when available; otherwise a no-op returning undefined.
 */
var __metadata = (this && this.__metadata) || function (k, v) {
    if (typeof Reflect === "object" && typeof Reflect.metadata === "function") {
        return Reflect.metadata(k, v);
    }
};
/**
 * TypeScript emit helper: adapts a parameter decorator so it can run inside
 * the member-decorator pipeline built by __decorate.
 */
var __param = (this && this.__param) || function (paramIndex, decorator) {
    return function (target, key) { decorator(target, key, paramIndex); };
};
/**
 * TypeScript emit helper: normalizes a CommonJS module to an ES-module-like
 * shape, wrapping plain exports as `{ default: mod }`.
 */
var __importDefault = (this && this.__importDefault) || function (mod) {
    return (mod && mod.__esModule) ? mod : { "default": mod };
};
// Placeholders for decorator design-time type metadata (filled in at file bottom).
var _a, _b, _c;
Object.defineProperty(exports, "__esModule", { value: true });
exports.DataCrunchingHost = void 0;
// Runtime dependencies (CommonJS requires, as emitted by tsc).
const civkit_1 = require("civkit");
const tsyringe_1 = require("tsyringe");
const shared_1 = require("../shared");
const lodash_1 = __importDefault(require("lodash"));
const crawler_1 = require("../api/crawler");
const crawled_1 = require("../db/crawled");
const dayjs_1 = __importDefault(require("dayjs"));
const fs_1 = require("fs");
const promises_1 = require("fs/promises");
const zlib_1 = require("zlib");
const functions_1 = require("firebase-admin/functions");
const snapshot_formatter_1 = require("../services/snapshot-formatter");
const get_function_url_1 = require("../utils/get-function-url");
// All date arithmetic in this module is done in UTC via the dayjs utc plugin.
dayjs_1.default.extend(require('dayjs/plugin/utc'));
let DataCrunchingHost = class DataCrunchingHost extends civkit_1.RPCHost {
    /**
     * Service that "crunches" cached crawl snapshots into day-partitioned,
     * gzipped JSONL dumps stored in the Firebase bucket under
     * `crunched-pages/r<rev>/<YYYY-MM-DD>/<offset>.jsonl.gz`.
     *
     * @param globalLogger root logger; a child scoped to this service is derived from it.
     * @param crawler injected dependency; deliberately excluded from the RPCHost super() args.
     * @param snapshotFormatter formats raw page snapshots into content.
     * @param tempFileManager allocates and lifetime-binds local temp files.
     * @param firebaseObjectStorage bucket wrapper used to read snapshots and write crunched dumps.
     */
    constructor(globalLogger, crawler, snapshotFormatter, tempFileManager, firebaseObjectStorage) {
        // NOTE: `crawler` is intentionally removed from the arguments forwarded to RPCHost.
        super(...lodash_1.default.without(arguments, crawler));
        this.globalLogger = globalLogger;
        this.crawler = crawler;
        this.snapshotFormatter = snapshotFormatter;
        this.tempFileManager = tempFileManager;
        this.firebaseObjectStorage = firebaseObjectStorage;
        this.logger = this.globalLogger.child({ service: this.constructor.name });
        // Object-storage prefix under which crunched chunk files are written.
        this.pageCacheCrunchingPrefix = 'crunched-pages';
        // Records per output chunk (and per Firestore query page).
        this.pageCacheCrunchingBatchSize = 5000;
        // Look-back window in ms: crunching starts 6 days before today (UTC).
        this.pageCacheCrunchingTMinus = 6 * 24 * 60 * 60 * 1000;
        // Output format revision; bumping it re-crunches everything under a new prefix.
        this.rev = 7;
    }
    /** Standard RPCHost lifecycle: signal readiness once dependencies resolve. */
    async init() {
        await this.dependencyReady();
        this.emit('ready');
    }
    // (Cloud-deployment decorator, currently disabled.)
    // @CloudTaskV2({
    //     runtime: {
    //         cpu: 2,
    //         memory: '4GiB',
    //         timeoutSeconds: 3600,
    //         concurrency: 2,
    //         maxInstances: 200,
    //         retryConfig: {
    //             maxAttempts: 3,
    //             minBackoffSeconds: 60,
    //         },
    //         rateLimits: {
    //             maxConcurrentDispatches: 150,
    //             maxDispatchesPerSecond: 2,
    //         },
    //     },
    //     tags: ['DataCrunching'],
    // })
    /**
     * Task-queue worker: crunches cached pages starting at `date`/`offset`,
     * gzip-compressing each chunk's JSONL temp file and saving it to the bucket.
     *
     * @param date ISO date of the day to crunch (optional; defaults to the look-back start).
     * @param offset record offset within the day (optional; nonzero processes a single chunk).
     * @returns true on completion.
     */
    async crunchPageCacheWorker(date, offset) {
        this.logger.info(`Crunching page cache @${date}+${offset}...`);
        for await (const { fileName, records } of this.iterPageCacheRecords(date, offset)) {
            this.logger.info(`Crunching ${fileName}...`);
            const fileOnDrive = await this.crunchCacheRecords(records);
            const fstream = (0, fs_1.createReadStream)(fileOnDrive.path);
            const gzipStream = (0, zlib_1.createGzip)();
            fstream.pipe(gzipStream, { end: true });
            // save() consumes the gzip stream and uploads it as one object.
            await this.firebaseObjectStorage.bucket.file(fileName).save(gzipStream, {
                contentType: 'application/jsonl+gzip',
            });
        }
        this.logger.info(`Crunching page cache @${date}+${offset} done.`);
        return true;
    }
    // (Cloud-deployment decorator, currently disabled.)
    // @CloudScheduleV2('2 0 * * *', {
    //     name: 'crunchPageCacheEveryday',
    //     runtime: {
    //         cpu: 2,
    //         memory: '4GiB',
    //         timeoutSeconds: 1800,
    //         timeZone: 'UTC',
    //         retryCount: 3,
    //         minBackoffSeconds: 60,
    //     },
    //     tags: ['DataCrunching'],
    // })
    /**
     * Daily dispatcher: enumerates not-yet-crunched chunks and enqueues one
     * `crunchPageCacheWorker` cloud task per chunk.
     * @returns true on completion.
     */
    async dispatchPageCacheCrunching() {
        for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
            this.logger.info(`Dispatching ${fileName}...`);
            // sse.write({ data: `Dispatching ${fileName}...` });
            await (0, functions_1.getFunctions)().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
                dispatchDeadlineSeconds: 1800,
                uri: await (0, get_function_url_1.getFunctionUrl)('crunchPageCacheWorker'),
            });
        }
        return true;
    }
    // (Disabled SSE-streaming variant of the dispatcher, kept for reference.)
    // @CloudHTTPv2({
    //     runtime: {
    //         cpu: 2,
    //         memory: '4GiB',
    //         timeoutSeconds: 3600,
    //         concurrency: 2,
    //         maxInstances: 200,
    //     },
    //     tags: ['DataCrunching'],
    // })
    // async dispatchPageCacheCrunching(
    //     @RPCReflect() rpcReflect: RPCReflection
    // ) {
    //     const sse = new OutputServerEventStream({ highWaterMark: 4096 });
    //     rpcReflect.return(sse);
    //     rpcReflect.catch((err) => {
    //         sse.end({ data: `Error: ${err.message}` });
    //     });
    //     for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
    //         this.logger.info(`Dispatching ${fileName}...`);
    //         sse.write({ data: `Dispatching ${fileName}...` });
    //         await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
    //             dispatchDeadlineSeconds: 1800,
    //             uri: await getFunctionUrl('crunchPageCacheWorker'),
    //         });
    //     }
    //     sse.end({ data: 'done' });
    //     return true;
    // }
    /**
     * Yields `{ fileName, records }` batches of Crawled records for crunching.
     * Scans day by day from `date` (or the look-back start) up to (excluding)
     * today, skipping chunks whose output file already exists in the bucket.
     *
     * @param date optional ISO date to start from (worker mode).
     * @param inputOffset optional starting record offset within the day.
     */
    async *iterPageCacheRecords(date, inputOffset) {
        const startOfToday = (0, dayjs_1.default)().utc().startOf('day');
        const startingPoint = (0, dayjs_1.default)().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day');
        let theDay = startingPoint;
        if (date) {
            theDay = (0, dayjs_1.default)(date).utc().startOf('day');
        }
        let counter = 0;
        if (inputOffset) {
            counter = parseInt(inputOffset, 10);
        }
        while (theDay.isBefore(startOfToday)) {
            const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`;
            const offset = counter;
            counter += this.pageCacheCrunchingBatchSize;
            // Skip chunks that were already crunched in a previous run.
            const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0];
            if (fileExists) {
                continue;
            }
            const records = await crawled_1.Crawled.fromFirestoreQuery(crawled_1.Crawled.COLLECTION
                .where('createdAt', '>=', theDay.toDate())
                .where('createdAt', '<', theDay.add(1, 'day').toDate())
                .orderBy('createdAt', 'asc')
                .offset(offset)
                .limit(this.pageCacheCrunchingBatchSize));
            this.logger.info(`Found ${records.length} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter });
            if (!records.length) {
                // Exhausted this day. In worker mode (explicit date) we are done;
                // otherwise move on to the next day.
                if (date) {
                    break;
                }
                theDay = theDay.add(1, 'day');
                counter = 0;
                continue;
            }
            yield { fileName, records };
            // Stop after yielding any chunk at a nonzero offset (single-chunk
            // worker semantics when dispatched with an offset).
            if (offset) {
                break;
            }
        }
    }
    /**
     * Yields `{ fileName, date, offset }` descriptors for every pending chunk
     * in the look-back window, using cheap Firestore count() aggregations
     * instead of fetching the records themselves.
     */
    async *iterPageCacheChunks() {
        const startOfToday = (0, dayjs_1.default)().utc().startOf('day');
        const startingPoint = (0, dayjs_1.default)().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day');
        let theDay = startingPoint;
        let counter = 0;
        while (theDay.isBefore(startOfToday)) {
            // Capture the chunk's own day before `theDay` may be advanced below.
            const chunkDay = theDay;
            const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${chunkDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`;
            const offset = counter;
            counter += this.pageCacheCrunchingBatchSize;
            const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0];
            if (fileExists) {
                continue;
            }
            const nRecords = (await crawled_1.Crawled.COLLECTION
                .where('createdAt', '>=', chunkDay.toDate())
                .where('createdAt', '<', chunkDay.add(1, 'day').toDate())
                .orderBy('createdAt', 'asc')
                .offset(offset)
                .limit(this.pageCacheCrunchingBatchSize)
                .count().get()).data().count;
            this.logger.info(`Found ${nRecords} records for ${chunkDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter });
            if (nRecords < this.pageCacheCrunchingBatchSize) {
                // Partial (final) chunk for this day — continue with the next day.
                theDay = theDay.add(1, 'day');
                counter = 0;
            }
            if (nRecords) {
                // BUGFIX: previously `theDay.toISOString()` was yielded here, so
                // the last (partial) chunk of each day carried the NEXT day's
                // date, contradicting the fileName computed above and making the
                // worker look in the wrong day.
                yield { fileName, date: chunkDay.toISOString(), offset };
            }
        }
    }
    /**
     * Downloads each record's snapshot, formats it, and appends one JSON line
     * per record to a local temp file, with at most 30 downloads in flight.
     *
     * @param records Crawled records whose snapshots should be crunched.
     * @returns an object whose `path` points at the JSONL temp file; its
     *          lifetime is bound to the temp-file manager.
     */
    async crunchCacheRecords(records) {
        const throttle = new civkit_1.PromiseThrottle(30);
        const localFilePath = this.tempFileManager.alloc();
        // NOTE(review): this deferred starts resolved and is never replaced, so
        // the `await nextDrainDeferred.promise` below is effectively a no-op —
        // looks like vestigial write back-pressure handling; confirm before removing.
        let nextDrainDeferred = (0, civkit_1.Defer)();
        nextDrainDeferred.resolve();
        for (const record of records) {
            await throttle.acquire();
            // Deliberately not awaited: downloads proceed concurrently under the throttle.
            this.firebaseObjectStorage.downloadFile(`snapshots/${record._id}`)
                .then(async (snapshotTxt) => {
                try {
                    const snapshot = JSON.parse(snapshotTxt.toString('utf-8'));
                    let formatted = await this.snapshotFormatter.formatSnapshot('default', snapshot);
                    if (!formatted.content) {
                        // Fall back to markdown formatting when 'default' yields no content.
                        formatted = await this.snapshotFormatter.formatSnapshot('markdown', snapshot);
                    }
                    await nextDrainDeferred.promise;
                    await (0, promises_1.appendFile)(localFilePath, JSON.stringify({
                        url: snapshot.href,
                        title: snapshot.title || '',
                        html: snapshot.html || '',
                        text: snapshot.text || '',
                        content: formatted.content || '',
                    }) + '\n', { encoding: 'utf-8' });
                }
                catch (err) {
                    this.logger.warn(`Failed to parse snapshot for ${record._id}`, { err });
                }
            })
                .catch((err) => {
                // BUGFIX: a failed download previously escaped as an unhandled
                // promise rejection; log it and keep crunching the rest.
                this.logger.warn(`Failed to download snapshot for ${record._id}`, { err });
            })
                .finally(() => {
                throttle.release();
            });
        }
        // Wait for all in-flight downloads/appends to finish.
        await throttle.nextDrain();
        const ro = {
            path: localFilePath
        };
        this.tempFileManager.bindPathTo(ro, localFilePath);
        return ro;
    }
};
| exports.DataCrunchingHost = DataCrunchingHost; | |
| __decorate([ | |
| __param(0, (0, shared_1.Param)('date')), | |
| __param(1, (0, shared_1.Param)('offset', { default: 0 })), | |
| __metadata("design:type", Function), | |
| __metadata("design:paramtypes", [String, Number]), | |
| __metadata("design:returntype", Promise) | |
| ], DataCrunchingHost.prototype, "crunchPageCacheWorker", null); | |
| exports.DataCrunchingHost = DataCrunchingHost = __decorate([ | |
| (0, tsyringe_1.singleton)(), | |
| __metadata("design:paramtypes", [typeof (_a = typeof shared_1.Logger !== "undefined" && shared_1.Logger) === "function" ? _a : Object, crawler_1.CrawlerHost, | |
| snapshot_formatter_1.SnapshotFormatter, typeof (_b = typeof shared_1.TempFileManager !== "undefined" && shared_1.TempFileManager) === "function" ? _b : Object, typeof (_c = typeof shared_1.FirebaseStorageBucketControl !== "undefined" && shared_1.FirebaseStorageBucketControl) === "function" ? _c : Object]) | |
| ], DataCrunchingHost); | |
| //# sourceMappingURL=data-crunching.js.map |