File size: 2,118 Bytes
23a3b80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import 'reflect-metadata';

import { singleton, container } from 'tsyringe';
import { AbstractThreadedServiceRegistry } from 'civkit/threaded';
import _ from 'lodash';

import { GlobalLogger } from './logger';
import { AsyncLocalContext } from './async-context';
import { PseudoTransfer } from './pseudo-transfer';
import { cpus } from 'os';
import { isMainThread } from 'worker_threads';

@singleton()
export class ThreadedServiceRegistry extends AbstractThreadedServiceRegistry {
    container = container;

    logger = this.globalLogger.child({ service: this.constructor.name });

    constructor(
        protected globalLogger: GlobalLogger,
        public asyncContext: AsyncLocalContext,
        public pseudoTransfer: PseudoTransfer,
    ) {
        super(...arguments);
    }

    setMaxWorkersByCpu() {
        const cpuStat = cpus();

        const evenCpuCycles = cpuStat.filter((_cpu, i) => i % 2 === 0).reduce((acc, cpu) => acc + cpu.times.user + cpu.times.sys, 0);
        const oddCpuCycles = cpuStat.filter((_cpu, i) => i % 2 === 1).reduce((acc, cpu) => acc + cpu.times.user + cpu.times.sys, 0);

        const isLikelyHyperThreaded = (oddCpuCycles / evenCpuCycles) < 0.5;

        this.maxWorkers = isLikelyHyperThreaded ? cpuStat.length / 2 : cpuStat.length;
    }

    override async init() {
        await this.dependencyReady();
        await super.init();

        if (isMainThread) {
            this.setMaxWorkersByCpu();
            await Promise.all(
                _.range(0, 2).map(
                    (_n) =>
                        new Promise<void>(
                            (resolve, reject) => {
                                this.createWorker()
                                    .once('message', resolve)
                                    .once('error', reject);
                            }
                        )
                )
            );
        }

        this.emit('ready');
    }

}


const instance = container.resolve(ThreadedServiceRegistry);
export default instance;
export const { Method, Param, Ctx, RPCReflect, Threaded } = instance.decorators();