storage / index.js
mrpoddaa's picture
Update index.js
1e0c95a verified
'use strict';
const express = require('express');
const { MongoClient } = require('mongodb');
const Busboy = require('busboy');
const path = require('path');
const crypto = require('crypto');
const { TelegramClient, Api } = require('telegram');
const { StringSession } = require('telegram/sessions');
const { PassThrough } = require('stream');
// ─────────────────────────────────────────────
// CONFIG
// ─────────────────────────────────────────────
const PORT = process.env.PORT || 7860;
const MONGO_URI = process.env.MONGO_URI || 'mongodb://127.0.0.1:27017/tgstore';
const CHANNEL_ID = BigInt(process.env.CHANNEL_ID || '0');
const CHUNK_SIZE = 1.9 * 1024 * 1024 * 1024; // 1.9 GB in bytes
const DL_WORKERS = parseInt(process.env.DL_WORKERS) || 4;
const BASE_URL = process.env.BASE_URL || `http://localhost:${PORT}`;
// ─────────────────────────────────────────────
// MONGODB
// ─────────────────────────────────────────────
let Sessions, Files;
async function connectMongo() {
const client = new MongoClient(MONGO_URI, {
maxPoolSize: 20,
tls: true,
tlsAllowInvalidCertificates: false,
serverSelectionTimeoutMS: 10000,
});
await client.connect();
const db = client.db();
Sessions = db.collection('sessions');
Files = db.collection('files');
await Files.createIndex({ fileId: 1 }, { unique: true });
console.log('[MongoDB] Connected');
}
// ─────────────────────────────────────────────
// SESSION POOL (round-robin)
// ─────────────────────────────────────────────
const clientPool = [];
let poolIndex = 0;
async function buildClient(apiId, apiHash, sessionString = '') {
const session = new StringSession(sessionString);
const client = new TelegramClient(session, Number(apiId), apiHash, {
connectionRetries: 5,
retryDelay: 1000,
autoReconnect: true,
maxConcurrentDownloads: DL_WORKERS,
});
await client.connect();
return client;
}
async function loadSessions() {
const docs = await Sessions.find({ active: true }).toArray();
for (const doc of docs) {
try {
const client = await buildClient(doc.apiId, doc.apiHash, doc.session);
clientPool.push(client);
console.log(`[Pool] Loaded session for ${doc.phone}`);
} catch (e) {
console.warn(`[Pool] Failed session ${doc._id}: ${e.message}`);
}
}
console.log(`[Pool] ${clientPool.length} session(s) active`);
}
function getClient() {
if (!clientPool.length)
throw new Error('No active Telegram sessions. Visit /strings to add one.');
const client = clientPool[poolIndex % clientPool.length];
poolIndex++;
return client;
}
// ─────────────────────────────────────────────
// PENDING AUTH MAP phone β†’ { client, hash }
// ─────────────────────────────────────────────
const authMap = new Map();
// ─────────────────────────────────────────────
// EXPRESS
// ─────────────────────────────────────────────
const app = express();
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
app.use(express.static(path.join(__dirname, 'public')));
// ── GET /system ──────────────────────────────
app.get('/system', (_req, res) => {
const m = process.memoryUsage();
res.json({
status: 'ok',
uptime: process.uptime(),
memory: {
rss: m.rss,
heapUsed: m.heapUsed,
heapTotal: m.heapTotal,
external: m.external,
},
sessions: clientPool.length,
node: process.version,
ts: new Date().toISOString(),
});
});
// ── GET /strings ─────────────────────────────
app.get('/strings', (_req, res) => {
res.sendFile(path.join(__dirname, 'public', 'strings.html'));
});
// ── POST /strings/send-code ──────────────────
app.post('/strings/send-code', async (req, res) => {
const { apiId, apiHash, phone } = req.body;
if (!apiId || !apiHash || !phone)
return res.status(400).json({ error: 'apiId, apiHash and phone are required.' });
try {
const client = await buildClient(apiId, apiHash, '');
const result = await client.sendCode({ apiId: Number(apiId), apiHash }, phone);
authMap.set(phone, { client, apiId, apiHash, phoneCodeHash: result.phoneCodeHash });
res.json({ ok: true, message: 'OTP sent to your Telegram app.' });
} catch (e) {
res.status(500).json({ error: e.message });
}
});
// ── POST /strings/verify ─────────────────────
app.post('/strings/verify', async (req, res) => {
const { phone, code, password } = req.body;
const entry = authMap.get(phone);
if (!entry)
return res.status(400).json({ error: 'No pending auth for this phone. Send code first.' });
const { client, apiId, apiHash, phoneCodeHash } = entry;
try {
await client.invoke(new Api.auth.SignIn({ phoneNumber: phone, phoneCodeHash, phoneCode: code }));
} catch (e) {
if (e.errorMessage === 'SESSION_PASSWORD_NEEDED') {
if (!password)
return res.status(400).json({ error: '2FA password required.', twoFA: true });
try {
const pwdInfo = await client.invoke(new Api.account.GetPassword());
const check = await require('telegram/utils/Password').computeCheck(pwdInfo, password);
await client.invoke(new Api.auth.CheckPassword({ password: check }));
} catch (e2) {
authMap.delete(phone);
return res.status(400).json({ error: e2.message });
}
} else {
authMap.delete(phone);
return res.status(400).json({ error: e.message });
}
}
const sessionString = client.session.save();
await Sessions.insertOne({
apiId, apiHash, phone,
session: sessionString,
active: true,
createdAt: new Date(),
});
clientPool.push(client);
authMap.delete(phone);
res.json({ ok: true, session: sessionString, message: 'Session saved and added to pool!' });
});
// ── POST /upload ─────────────────────────────
app.post('/upload', (req, res) => {
let client;
try { client = getClient(); }
catch (e) { return res.status(503).json({ error: e.message }); }
const busboy = Busboy({ headers: req.headers, limits: { files: 1, fileSize: Infinity } });
let responded = false;
const done = (code, body) => { if (!responded) { responded = true; res.status(code).json(body); } };
busboy.on('file', async (_field, fileStream, info) => {
const { filename, mimeType } = info;
const fileId = crypto.randomBytes(16).toString('hex');
const chunks = []; // { chunkIndex, messageId, size }
let chunkIndex = 0;
let totalSize = 0;
let chunkBufs = [];
let chunkBytes = 0;
let uploadError = null;
const flushChunk = async () => {
if (!chunkBufs.length) return;
const buf = Buffer.concat(chunkBufs);
chunkBufs = [];
chunkBytes = 0;
const idx = chunkIndex++;
const msgId = await uploadBuffer(client, buf, `${fileId}_${idx}`);
chunks.push({ chunkIndex: idx, messageId: msgId, size: buf.length });
};
// Backpressure-aware data handler
fileStream.on('data', async (data) => {
if (uploadError) return;
fileStream.pause();
totalSize += data.length;
chunkBufs.push(data);
chunkBytes += data.length;
try {
if (chunkBytes >= CHUNK_SIZE) await flushChunk();
} catch (e) {
uploadError = e;
fileStream.destroy();
}
fileStream.resume();
});
fileStream.on('end', async () => {
if (uploadError) return done(500, { error: uploadError.message });
try {
await flushChunk(); // flush remainder
await Files.insertOne({ fileId, filename, mimeType, totalSize, chunks, uploadedAt: new Date() });
done(200, {
ok: true, fileId, filename,
size: totalSize,
chunks: chunks.length,
downloadUrl: `${BASE_URL}/download/${fileId}`,
});
} catch (e) {
done(500, { error: e.message });
}
});
fileStream.on('error', (e) => done(500, { error: e.message }));
});
busboy.on('error', (e) => done(500, { error: e.message }));
req.pipe(busboy);
});
// ── GET /download/:fileId ────────────────────
app.get('/download/:fileId', async (req, res) => {
const doc = await Files.findOne({ fileId: req.params.fileId });
if (!doc) return res.status(404).json({ error: 'File not found.' });
let client;
try { client = getClient(); }
catch (e) { return res.status(503).json({ error: e.message }); }
res.setHeader('Content-Disposition', `attachment; filename="${encodeURIComponent(doc.filename)}"`);
res.setHeader('Content-Type', doc.mimeType || 'application/octet-stream');
if (doc.totalSize) res.setHeader('Content-Length', String(doc.totalSize));
const sorted = [...doc.chunks].sort((a, b) => a.chunkIndex - b.chunkIndex);
for (const chunk of sorted) {
try {
await streamChunk(client, chunk.messageId, res);
} catch (e) {
console.error('[Download] chunk error:', e.message);
if (!res.headersSent) res.status(500).json({ error: e.message });
else res.destroy();
return;
}
}
res.end();
});
// ─────────────────────────────────────────────
// TELEGRAM HELPERS
// ─────────────────────────────────────────────
// Upload a Buffer to the Telegram channel and return the message ID
async function uploadBuffer(client, buffer, caption) {
const file = await client.uploadFile({
file: new BufferFile(caption, buffer),
workers: DL_WORKERS,
});
const msg = await client.sendFile(CHANNEL_ID, {
file,
caption,
forceDocument: true,
workers: DL_WORKERS,
});
return msg.id;
}
// Stream a Telegram message's media directly to the HTTP response
async function streamChunk(client, messageId, res) {
const [msg] = await client.getMessages(CHANNEL_ID, { ids: [messageId] });
if (!msg?.media) throw new Error(`No media in message ${messageId}`);
const pass = new PassThrough();
pass.on('data', (chunk) => {
const ok = res.write(chunk);
if (!ok) {
pass.pause();
res.once('drain', () => pass.resume()); // handle backpressure
}
});
await new Promise((resolve, reject) => {
pass.on('end', resolve);
pass.on('error', reject);
client.downloadMedia(msg, { outputFile: pass, workers: DL_WORKERS })
.then(() => pass.end())
.catch(reject);
});
}
// GramJS-compatible file object that reads from a Buffer
class BufferFile {
constructor(name, buffer) {
this.name = name;
this.size = buffer.length;
this.path = name;
this._buf = buffer;
}
async *[Symbol.asyncIterator]() {
const PART = 512 * 1024;
for (let i = 0; i < this._buf.length; i += PART)
yield this._buf.slice(i, i + PART);
}
}
// ─────────────────────────────────────────────
// BOOT
// ─────────────────────────────────────────────
(async () => {
await connectMongo();
await loadSessions();
app.listen(PORT, '0.0.0.0', () => {
console.log(`\nπŸš€ Server ready at http://0.0.0.0:${PORT}`);
console.log(`πŸ”— Base URL : ${BASE_URL}`);
console.log(`πŸ“¦ Sessions : ${clientPool.length}`);
console.log(`πŸ—„οΈ MongoDB : ${MONGO_URI}\n`);
});
})();