-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtelegramSafe.js
More file actions
371 lines (335 loc) · 15.8 KB
/
telegramSafe.js
File metadata and controls
371 lines (335 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
'use strict';
// =============================================================================
// telegramSafe.js — Safe, rate-limited Telegram API wrappers
//
// Usage:
// const tg = require('./telegramSafe');
// tg.init(bot); // call once, immediately after new Telegraf(TOKEN)
//
// After init():
// - ALL Telegram API calls — including ctx.reply(), ctx.answerCbQuery(),
// ctx.editMessageText(), ctx.telegram.sendMessage(), bot.telegram.*() —
// are automatically rate-limited. No other changes needed.
// - The explicit exports (sendMessage, editMessageText, etc.) are provided
// for modules that do not have a Telegraf context (e.g. backend.js).
//
// Rate limits enforced (see rateLimiter.js):
// - ~28 calls/sec globally
// - 1 message/sec per chat
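// - 429 responses are retried after Telegram's `retry_after` delay (up to 3 retries)
// - Transient network errors and 502 responses are retried with exponential backoff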
// =============================================================================
const { enqueue, globalThrottle } = require('./rateLimiter');
// ─── callApi bypass lists ─────────────────────────────────────────────────────
//
// The callApi patch routes unknown methods through globalThrottle (which has a
// hard 15-second timeout). Methods listed here BYPASS that wrapper and call
// the original callApi directly.
//
// Two categories of bypass:
//
// LONG_POLL_METHODS — Telegram long-polling calls.
// getUpdates passes timeout=50 so Telegram holds the connection for up to
// 50 seconds. Applying a 15-second race on top kills every poll cycle.
//
// LIFECYCLE_METHODS — Telegraf startup / shutdown one-shots.
// Fast calls (<1 s), but must never be queued behind a getUpdates slot in
// the global rate-limit chain or they will be delayed by up to 50 seconds.
//
// Individually-patched methods (sendMessage, sendPhoto, etc.) are also in this
// set to prevent double-wrapping when Telegraf routes them through callApi.
//
// To add a future Telegraf lifecycle call: append to LIFECYCLE_METHODS below.
// Long-polling calls that must not be raced against the throttle timeout.
const LONG_POLL_METHODS = new Set(['getUpdates']);

// Telegraf startup / shutdown one-shots that must not wait in the global chain.
const LIFECYCLE_METHODS = new Set(
  ['getMe', 'deleteWebhook', 'setWebhook', 'getWebhookInfo', 'close', 'logOut'],
);

// Methods that init() wraps one by one. They are listed here so the callApi
// patch does not wrap them a second time.
const INDIVIDUALLY_PATCHED_METHODS = new Set(
  [
    'sendMessage',
    'editMessageText',
    'answerCbQuery',
    'answerCallbackQuery',
    'sendPhoto',
    'sendDocument',
    'sendAnimation',
    'sendSticker',
    'forwardMessage',
  ],
);

// Union of the three groups above (insertion order preserved); this is the set
// the callApi patch consults.
const CALLAPI_BYPASS_METHODS = new Set(
  [LONG_POLL_METHODS, LIFECYCLE_METHODS, INDIVIDUALLY_PATCHED_METHODS]
    .flatMap((group) => Array.from(group)),
);
// ─── State ────────────────────────────────────────────────────────────────────
// Telegraf instance recorded by init(); stays null until init() completes.
let _bot = null;
// Flipped to true at the end of init(); later init() calls return immediately.
let _inited = false;
/**
 * Pass outgoing text through the optional process-wide sanitizer hook.
 *
 * When `global.__RUNEWAGER_TELEGRAM_SANITIZE__` is a function, it is called
 * with `(text, extra)` and its return value is handed back as-is (it is not
 * validated here). Otherwise the text is returned unchanged next to a shallow
 * copy of `extra`.
 *
 * @param {string} text
 * @param {object} [extra]
 * @returns {{ text: string, extra: object }} shape callers read; the hook is trusted to honour it
 */
function _sanitizePayload(text, extra = {}) {
  const hook = global.__RUNEWAGER_TELEGRAM_SANITIZE__;
  if (typeof hook !== 'function') {
    // No hook installed: hand back a copy so callers never share the input object.
    const extraCopy = { ...(extra || {}) };
    return { text, extra: extraCopy };
  }
  return hook(text, extra);
}
/**
 * Apply the optional process-wide notification policy hook to send options.
 *
 * When `global.__RUNEWAGER_TELEGRAM_NOTIFICATION_POLICY__` is a function, it is
 * called with `(chatId, extraCopy)`. An object result is shallow-merged over
 * the copy; any other result leaves the copy as the hook left it. The caller's
 * `extra` object is never mutated.
 *
 * The hook is best-effort: if it throws, the failure is logged and the message
 * options are returned without the policy's overrides, so sending still goes
 * ahead.
 *
 * @param {string|number} chatId
 * @param {object} [extra]
 * @returns {object} a new options object
 */
function _applyNotificationPolicy(chatId, extra = {}) {
  const nextExtra = { ...(extra || {}) };
  const policy = global.__RUNEWAGER_TELEGRAM_NOTIFICATION_POLICY__;
  if (typeof policy !== 'function') return nextExtra;

  try {
    const result = policy(chatId, nextExtra);
    if (result && typeof result === 'object') return { ...nextExtra, ...result };
  } catch (err) {
    // Never block a send because of a broken policy hook, but do not hide it either.
    console.warn(
      `[telegramSafe] Notification policy hook failed (${err?.message ?? err}) — sending without policy overrides`
    );
  }
  return nextExtra;
}
/**
 * Sanitize the `caption` of a media `extra` object.
 *
 * Returns a shallow copy of `extra`. When the copy has a non-empty string
 * `caption`, the caption is passed through `_sanitizePayload` together with
 * the copied options; the options returned by the sanitizer are merged in and
 * the sanitized text becomes the caption.
 *
 * The sanitized caption is assigned AFTER the merge. The sanitizer receives
 * the options including the original caption and may echo them back in its
 * `extra`; merging last would put the unsanitized caption back.
 *
 * @param {object} [extra]
 * @returns {object} a new options object; the input is not mutated by this function
 */
function _sanitizeMediaExtra(extra = {}) {
  const nextExtra = { ...(extra || {}) };
  const hasCaption = typeof nextExtra.caption === 'string' && nextExtra.caption.length > 0;
  if (!hasCaption) return nextExtra;

  const sanitized = _sanitizePayload(nextExtra.caption, nextExtra);
  Object.assign(nextExtra, sanitized.extra);
  // Must come last so the sanitized text wins over any caption echoed in sanitized.extra.
  nextExtra.caption = sanitized.text;
  return nextExtra;
}
// ─── 429 / transient-error retry ─────────────────────────────────────────────
/**
 * Invoke `fn`, retrying on 429 Too Many Requests and on transient failures.
 *
 * - 429: waits `response.parameters.retry_after` seconds (5 when absent) plus
 *   a 250 ms margin.
 * - ETIMEDOUT / ECONNRESET / ECONNREFUSED / HTTP 502: waits 500 ms, 1000 ms,
 *   2000 ms, … (doubling with each attempt).
 * - Anything else, or any error once the retries are used up, is logged with
 *   console.error and re-thrown unchanged.
 *
 * @param {() => Promise<any>} fn - called once per attempt
 * @param {number} [maxRetries=3] - retries allowed after the first attempt
 * @returns {Promise<any>} whatever `fn` resolves to
 */
async function _withRetry(fn, maxRetries = 3) {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const transientCodes = ['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED'];

  let lastErr;
  let attempt = 0;
  while (attempt <= maxRetries) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      const code = err?.response?.error_code ?? err?.code;
      const retriesLeft = attempt < maxRetries;

      // Decide whether to wait and retry; `wait` stays null when we must give up.
      let wait = null;
      let notice = '';

      if (code === 429) {
        const retryAfterMs = (err?.response?.parameters?.retry_after ?? 5) * 1000;
        if (retriesLeft) {
          wait = retryAfterMs + 250; // small extra margin on top of Telegram's hint
          notice = `[telegramSafe] 429 rate-limited — waiting ${wait}ms before retry (attempt ${attempt + 1}/${maxRetries})`;
        }
      }

      if (wait === null) {
        const isTransient = transientCodes.includes(err?.code) || err?.response?.error_code === 502;
        if (isTransient && retriesLeft) {
          wait = (2 ** attempt) * 500; // 500ms, 1000ms, 2000ms
          notice = `[telegramSafe] Transient error (${err.code ?? err.response?.error_code}) — retry in ${wait}ms`;
        }
      }

      if (wait === null) {
        // Not retryable (or out of retries): report and surface the original error.
        console.error(
          `[telegramSafe] API error (code=${code}, msg=${err?.response?.description ?? err?.message})`
        );
        throw err;
      }

      console.warn(notice);
      await pause(wait);
      attempt += 1;
    }
  }
  throw lastErr;
}
// ─── init() — patches bot.telegram in place ───────────────────────────────────
/**
 * Replace methods on `bot.telegram` with wrappers that go through the rate
 * limiter and the retry helper.
 *
 * Call once, right after:
 *   const bot = new Telegraf(BOT_TOKEN);
 * Calls made after a successful init return immediately without doing anything.
 *
 * Wrapping applied per method (optional methods are skipped when absent):
 *   sendMessage                              notification policy + sanitizer, enqueue(chatId)
 *   editMessageText                          sanitizer, enqueue(chatId)
 *   answerCbQuery / answerCallbackQuery      globalThrottle only
 *   sendPhoto / sendDocument / sendAnimation caption sanitizer, enqueue(chatId)
 *   sendSticker / forwardMessage             enqueue(chatId)
 *   callApi                                  globalThrottle, unless the method name
 *                                            is in CALLAPI_BYPASS_METHODS
 * Every wrapped call also runs inside _withRetry.
 *
 * NOTE(review): only this one `bot.telegram` object is patched. Context
 * shortcuts (ctx.reply(), ctx.answerCbQuery(), …) are covered only if
 * ctx.telegram is this same object — confirm against the installed Telegraf
 * version.
 *
 * @param {import('telegraf').Telegraf} bot
 * @throws {Error} when `bot` or `bot.telegram` is missing
 */
function init(bot) {
  if (_inited) return;
  if (!bot || !bot.telegram) {
    throw new Error('[telegramSafe] init(bot) requires a valid Telegraf instance');
  }
  const tg = bot.telegram;

  // The two wrapping strategies used below.
  const viaChatQueue = (chatId, call) => _withRetry(() => enqueue(chatId, call));
  const viaGlobalThrottle = (call) => _withRetry(() => globalThrottle(call));
  const hasMethod = (name) => typeof tg[name] === 'function';

  // ── sendMessage ────────────────────────────────────────────────────────────
  // Policy and sanitizer run once, at call time, before the message is queued.
  const rawSendMessage = tg.sendMessage.bind(tg);
  tg.sendMessage = (chatId, text, extra) => {
    const payload = _sanitizePayload(text, _applyNotificationPolicy(chatId, extra));
    return viaChatQueue(chatId, () => rawSendMessage(chatId, payload.text, payload.extra));
  };

  // ── editMessageText ────────────────────────────────────────────────────────
  // Argument order: (chatId, messageId, inlineMessageId, text, extra).
  const rawEditMessageText = tg.editMessageText.bind(tg);
  tg.editMessageText = (chatId, messageId, inlineMessageId, text, extra) => {
    const payload = _sanitizePayload(text, extra);
    return viaChatQueue(chatId, () =>
      rawEditMessageText(chatId, messageId, inlineMessageId, payload.text, payload.extra)
    );
  };

  // ── answerCbQuery / answerCallbackQuery ────────────────────────────────────
  // Deliberately kept off the per-chat queue (callback answers are
  // time-critical); everything after callbackQueryId is forwarded untouched.
  if (hasMethod('answerCbQuery')) {
    const rawAnswerCbQuery = tg.answerCbQuery.bind(tg);
    tg.answerCbQuery = (callbackQueryId, ...args) =>
      viaGlobalThrottle(() => rawAnswerCbQuery(callbackQueryId, ...args));
  }
  if (hasMethod('answerCallbackQuery') && tg.answerCallbackQuery !== tg.answerCbQuery) {
    const rawAnswerCallbackQuery = tg.answerCallbackQuery.bind(tg);
    tg.answerCallbackQuery = (callbackQueryId, ...args) =>
      viaGlobalThrottle(() => rawAnswerCallbackQuery(callbackQueryId, ...args));
  }

  // ── sendPhoto / sendDocument / sendAnimation ───────────────────────────────
  // Same shape for all three: (chatId, media, extra), caption sanitized when
  // the queued call actually runs.
  for (const name of ['sendPhoto', 'sendDocument', 'sendAnimation']) {
    if (!hasMethod(name)) continue;
    const rawSendMedia = tg[name].bind(tg);
    tg[name] = (chatId, media, extra) =>
      viaChatQueue(chatId, () => rawSendMedia(chatId, media, _sanitizeMediaExtra(extra)));
  }

  // ── sendSticker ────────────────────────────────────────────────────────────
  if (hasMethod('sendSticker')) {
    const rawSendSticker = tg.sendSticker.bind(tg);
    tg.sendSticker = (chatId, sticker, extra) =>
      viaChatQueue(chatId, () => rawSendSticker(chatId, sticker, extra));
  }

  // ── forwardMessage ─────────────────────────────────────────────────────────
  if (hasMethod('forwardMessage')) {
    const rawForwardMessage = tg.forwardMessage.bind(tg);
    tg.forwardMessage = (chatId, fromChatId, messageId, extra) =>
      viaChatQueue(chatId, () => rawForwardMessage(chatId, fromChatId, messageId, extra));
  }

  // ── callApi (safety net for everything not wrapped above) ──────────────────
  // Bypass-listed method names go straight to the original callApi; all other
  // methods are throttled globally and retried.
  if (hasMethod('callApi')) {
    const rawCallApi = tg.callApi.bind(tg);
    tg.callApi = (method, data, signal) => {
      const invoke = () => rawCallApi(method, data, signal);
      return CALLAPI_BYPASS_METHODS.has(method) ? invoke() : viaGlobalThrottle(invoke);
    };
  }

  _bot = bot;
  _inited = true;
  console.log('[telegramSafe] bot.telegram methods patched — all API calls are now rate-limited');
}
// ─── Explicit exports (for use without a ctx) ─────────────────────────────────
/**
 * Send a text message through the patched `bot.telegram.sendMessage`, for
 * callers that have no Telegraf context at hand.
 *
 * @param {string|number} chatId
 * @param {string} text
 * @param {object} [options] - forwarded as the method's `extra` argument
 * @returns {Promise<import('telegraf/typings/core/types/typegram').Message.TextMessage>}
 * @throws {Error} when init(bot) has not been called yet
 */
function sendMessage(chatId, text, options = {}) {
  _assertInited();
  const { telegram } = _bot;
  return telegram.sendMessage(chatId, text, options);
}
/**
 * Edit the text of a chat message through the patched
 * `bot.telegram.editMessageText`. The inline-message-id slot is always passed
 * as `undefined`, so this helper addresses messages by chat id + message id.
 *
 * @param {string|number} chatId
 * @param {number} messageId
 * @param {string} text
 * @param {object} [options] - forwarded as the method's `extra` argument
 * @returns {Promise<import('telegraf/typings/core/types/typegram').Message.TextMessage | boolean>}
 * @throws {Error} when init(bot) has not been called yet
 */
function editMessageText(chatId, messageId, text, options = {}) {
  _assertInited();
  const { telegram } = _bot;
  const inlineMessageId = undefined;
  return telegram.editMessageText(chatId, messageId, inlineMessageId, text, options);
}
/**
 * Answer a callback query through the patched `bot.telegram.answerCbQuery`
 * (or `answerCallbackQuery` when only that name exists on the instance).
 *
 * The call is made with three arguments, `(callbackId, text, extra)`, where
 * `showAlert` is folded into `extra.show_alert`. Passing `showAlert` as a
 * separate third positional argument (with `extra` fourth) loses both values
 * on a method that only accepts `(callbackQueryId, text, extra)`.
 * NOTE(review): assumes the Telegraf v4 signature the rest of this file
 * targets — confirm against the installed version.
 *
 * A `show_alert` already present in `options.extra` takes precedence over
 * `options.showAlert`.
 *
 * @param {string} callbackId
 * @param {object} [options]
 * @param {string} [options.text] - notification text
 * @param {boolean} [options.showAlert] - show an alert instead of a toast
 * @param {object} [options.extra] - further answerCallbackQuery parameters
 * @returns {Promise<boolean>}
 * @throws {Error} when init(bot) has not been called yet
 */
function answerCallbackQuery(callbackId, options = {}) {
  _assertInited();
  const { text, showAlert, extra } = options || {};

  const answerExtra = {
    ...(showAlert == null ? {} : { show_alert: Boolean(showAlert) }),
    ...(extra || {}),
  };

  // Both method names are patched in init() when present — call the method
  // as-is, with no extra wrapping here.
  const { telegram } = _bot;
  const answer = typeof telegram.answerCbQuery === 'function'
    ? telegram.answerCbQuery
    : telegram.answerCallbackQuery;
  return answer.call(telegram, callbackId, text, answerExtra);
}
/**
 * Send a photo through the patched `bot.telegram.sendPhoto`, for callers that
 * have no Telegraf context at hand.
 *
 * @param {string|number} chatId
 * @param {string|object} photo
 * @param {object} [options] - forwarded as the method's `extra` argument
 * @returns {*} whatever `bot.telegram.sendPhoto` returns
 * @throws {Error} when init(bot) has not been called yet
 */
function sendPhoto(chatId, photo, options = {}) {
  _assertInited();
  const { telegram } = _bot;
  return telegram.sendPhoto(chatId, photo, options);
}
/**
 * Send a document through the patched `bot.telegram.sendDocument`, for callers
 * that have no Telegraf context at hand.
 *
 * @param {string|number} chatId
 * @param {string|object} document
 * @param {object} [options] - forwarded as the method's `extra` argument
 * @returns {*} whatever `bot.telegram.sendDocument` returns
 * @throws {Error} when init(bot) has not been called yet
 */
function sendDocument(chatId, document, options = {}) {
  _assertInited();
  const { telegram } = _bot;
  return telegram.sendDocument(chatId, document, options);
}
/**
 * Guard used by the explicit wrapper functions: throws unless init() has run
 * and a bot instance has been recorded.
 *
 * @throws {Error} when the module has not been initialised
 */
function _assertInited() {
  const ready = _inited && _bot;
  if (ready) return;
  throw new Error('[telegramSafe] init(bot) must be called before using wrapper functions');
}
module.exports = {
init,
sendMessage,
editMessageText,
answerCallbackQuery,
sendPhoto,
sendDocument,
// Exported for testing — frozen arrays so callers can inspect which methods
// bypass rate-limiting without being able to mutate the live runtime Sets.
LONG_POLL_METHODS: Object.freeze([...LONG_POLL_METHODS]),
LIFECYCLE_METHODS: Object.freeze([...LIFECYCLE_METHODS]),
INDIVIDUALLY_PATCHED_METHODS: Object.freeze([...INDIVIDUALLY_PATCHED_METHODS]),
CALLAPI_BYPASS_METHODS: Object.freeze([...CALLAPI_BYPASS_METHODS]),
};