Skip to content

Commit c4b2d71

Browse files
authored
refactor: split proxy-request.js into http-client.js and body-handler.js
- Extract the HTTPS proxy agent into containers/api-proxy/http-client.js so model-discovery.js no longer cross-imports from the proxy core.
- Extract collectRequestBody, transformRequestBody, and the _sleep indirection into containers/api-proxy/body-handler.js via a createBodyHandler(deps) factory, enabling isolated unit testing.
- proxy-request.js imports from both new modules and re-exports all previous symbols for full backwards compatibility.
- model-discovery.js now imports proxyAgent from http-client.js.

Closes #4932
1 parent f92806e commit c4b2d71

4 files changed

Lines changed: 235 additions & 164 deletions

File tree

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
'use strict';
2+
3+
/**
4+
* AWF API Proxy — Request body I/O and transformation.
5+
*
6+
* Responsibilities:
7+
* 1. collectRequestBody — stream the inbound HTTP body, enforce the 10 MB
8+
* size limit, and surface 400/413 errors inline.
9+
* 2. transformRequestBody — apply the sequential body-transform pipeline
10+
* (caller transform → null-tool-call sanitisation →
11+
* steering injection → stream_options injection).
12+
*
13+
* Both functions are returned by `createBodyHandler(deps)` so that callers
14+
* (proxy-request.js) can inject `handleRequestError` and `otel` without
15+
* creating a circular-module dependency.
16+
*
17+
* The `_sleep` indirection (and its test setters) lives here because it is
18+
* the module most tightly coupled to async timing in the request pipeline.
19+
* proxy-request.js imports `sleep` from this module and uses it for the
20+
* model-not-supported retry backoff.
21+
*/
22+
23+
const { sanitizeNullToolCallTypes, injectSteeringMessage, injectStreamOptions } = require('./body-transform');
24+
const { sanitizeForLog, logRequest } = require('./logging');
25+
const metrics = require('./metrics');
26+
const { getAndClearPendingSteeringMessage } = require('./guards/effective-token-guard');
27+
const { getAndClearPendingTimeoutSteeringMessage } = require('./guards/timeout-steering');
28+
29+
/** Upper bound on an inbound request body, in bytes (10 MB), to prevent DoS via large payloads. */
const MAX_BODY_SIZE = 10 * 1024 * 1024;

/**
 * Reports whether steering messages may be injected into request bodies.
 *
 * The environment variable is re-read on every call, and only the exact
 * string 'true' switches steering on.
 *
 * @returns {boolean}
 */
function isSteeringEnabled() {
  return process.env.AWF_ENABLE_TOKEN_STEERING === 'true';
}
34+
35+
// ── Sleep abstraction (overridable in tests to avoid real setTimeout delays) ──
36+
37+
/**
 * Timer-backed sleep: resolves after `ms` milliseconds.
 * Defined once so the initial value of `_sleep` and the reset path cannot drift apart.
 */
const _realSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/** Active sleep implementation (overridable in tests via the setters below). */
let _sleep = _realSleep;

/**
 * Thin indirection over the active sleep implementation. Because it looks up
 * `_sleep` on every call, a replacement installed with `_setSleepForTests` is
 * picked up by callers that already imported `sleep`.
 *
 * @param {number} ms
 * @returns {Promise<void>}
 */
const sleep = (ms) => _sleep(ms);

/**
 * @internal Test-only: replace the sleep implementation so retries are instant.
 *
 * @param {(ms: number) => any} fn - Replacement invoked by `sleep(ms)`.
 * @throws {TypeError} When `fn` is not a function; the current implementation is left untouched.
 */
function _setSleepForTests(fn) {
  // Fail at the call site instead of at the first later sleep() call.
  if (typeof fn !== 'function') {
    throw new TypeError('_setSleepForTests expects a function');
  }
  _sleep = fn;
}

/** @internal Test-only: restore the real timer-backed sleep implementation. */
function _resetSleepForTests() {
  _sleep = _realSleep;
}
54+
55+
// ── Factory ───────────────────────────────────────────────────────────────────
56+
57+
/**
 * Create the body-handler functions with injected dependencies.
 *
 * Both returned functions close over `handleRequestError` and `otel`, so the
 * caller supplies them once rather than this module importing them.
 *
 * @param {{ handleRequestError: Function, otel: object }} deps
 * @returns {{ collectRequestBody: Function, transformRequestBody: Function }}
 */
function createBodyHandler({ handleRequestError, otel }) {
  /**
   * Collect the full request body from the inbound stream, enforcing the
   * MAX_BODY_SIZE limit.
   *
   * Outcomes (the promise never rejects):
   *   - `'end'` before any failure  → resolves with the concatenated body.
   *   - size limit exceeded         → writes a 413 JSON response, resolves null.
   *   - stream `'error'`            → delegates to `handleRequestError` with
   *                                   statusCode 400, resolves null.
   *   - `'close'` before completion → logs `request_aborted`, resolves null.
   *
   * @param {import('http').IncomingMessage} req
   * @param {string} provider
   * @param {string} requestId
   * @param {import('http').ServerResponse} res
   * @param {object} span - OTEL span (or no-op shim)
   * @param {number} startTime - Request start timestamp (ms)
   * @param {string} targetHost - Upstream hostname (used in log fields)
   * @returns {Promise<Buffer|null>} Collected body, or null if the request was
   *   rejected (413), errored, or aborted before the body was fully received.
   */
  function collectRequestBody(req, provider, requestId, res, span, startTime, targetHost) {
    return new Promise((resolve) => {
      const chunks = [];
      let totalBytes = 0;
      let settled = false;

      // Resolve at most once; later events on the same request become no-ops.
      function settle(value) {
        if (settled) return;
        settled = true;
        // On every failure path the buffered data is never read again. Drop it
        // now so it is not pinned by these listeners while the client keeps
        // streaming the rest of its upload.
        if (value === null) chunks.length = 0;
        resolve(value);
      }

      req.on('close', () => {
        // A close after settling, or after the whole message arrived, is normal teardown.
        if (settled || req.complete) return;
        const duration = Date.now() - startTime;
        metrics.gaugeDec('active_requests', { provider });
        logRequest('warn', 'request_aborted', {
          request_id: requestId, provider, method: req.method,
          path: sanitizeForLog(req.url), duration_ms: duration,
          upstream_host: targetHost,
        });
        otel.endSpan(span, 0);
        settle(null);
      });

      req.on('error', (err) => {
        if (settled) return;
        otel.endSpanError(span, err, 400);
        handleRequestError(err, {
          res, requestId, provider, req, targetHost,
          startTime, statusCode: 400, clientMessage: 'Client error',
        });
        settle(null);
      });

      req.on('data', (chunk) => {
        // Keep consuming after a rejection, but discard the data.
        if (settled) return;
        totalBytes += chunk.length;
        if (totalBytes > MAX_BODY_SIZE) {
          const duration = Date.now() - startTime;
          metrics.gaugeDec('active_requests', { provider });
          metrics.increment('requests_total', { provider, method: req.method, status_class: '4xx' });
          logRequest('warn', 'request_complete', {
            request_id: requestId, provider, method: req.method,
            path: sanitizeForLog(req.url), status: 413, duration_ms: duration,
            request_bytes: totalBytes, upstream_host: targetHost,
          });
          otel.endSpan(span, 413);
          if (!res.headersSent) res.writeHead(413, { 'Content-Type': 'application/json' });
          res.end(JSON.stringify({ error: 'Payload Too Large', message: 'Request body exceeds 10 MB limit' }));
          settle(null);
          return;
        }
        chunks.push(chunk);
      });

      req.on('end', () => {
        // Skip the concat entirely once the request has been rejected: its
        // result would be discarded by settle() anyway.
        if (settled) return;
        settle(Buffer.concat(chunks));
      });
    });
  }

  /**
   * Apply the sequential body-transform pipeline to the raw inbound body.
   *
   * Transforms applied in order:
   *   1. `bodyTransform` — optional caller-supplied transform (POST/PUT/PATCH);
   *      a falsy result leaves the body unchanged
   *   2. `sanitizeNullToolCallTypes` — strips/normalizes null tool-call types
   *      (POST/PUT/PATCH); logs `request_sanitized` when it returns a result
   *   3. `injectSteeringMessage` — timeout, then token-budget steering
   *      (POST/PUT, only when steering is enabled)
   *   4. `injectStreamOptions` — adds `stream_options.include_usage` (POST only)
   *
   * @param {Buffer} body
   * @param {string} provider
   * @param {import('http').IncomingMessage} req
   * @param {string} requestId
   * @param {((body: Buffer) => (Buffer | null | Promise<Buffer | null>)) | null} bodyTransform
   * @returns {Promise<Buffer>}
   */
  async function transformRequestBody(body, provider, req, requestId, bodyTransform) {
    const { method } = req;
    const isWriteMethod = method === 'POST' || method === 'PUT' || method === 'PATCH';
    let current = body;

    if (bodyTransform && isWriteMethod) {
      const transformed = await bodyTransform(current);
      if (transformed) current = transformed;
    }

    if (isWriteMethod) {
      const sanitized = sanitizeNullToolCallTypes(current);
      if (sanitized) {
        current = sanitized.body;
        logRequest('info', 'request_sanitized', {
          request_id: requestId,
          provider,
          normalized_tool_calls: sanitized.normalizedCount,
          dropped_tool_calls: sanitized.droppedCount,
        });
      }
    }

    // NOTE(review): steering covers POST/PUT only, whereas the two steps above
    // also cover PATCH — confirm the difference is intentional.
    if (isSteeringEnabled() && (method === 'POST' || method === 'PUT')) {
      // Both pending messages are fetched (and cleared) up front, before any injection.
      const steeringMessages = [
        { type: 'timeout', message: getAndClearPendingTimeoutSteeringMessage() },
        { type: 'token', message: getAndClearPendingSteeringMessage() },
      ];
      for (const { type, message } of steeringMessages) {
        if (!message) continue;
        const steered = injectSteeringMessage(current, provider, message);
        if (steered) {
          current = steered;
          logRequest('info', `${type}_steering`, {
            request_id: requestId,
            provider,
            message,
          });
        }
      }
    }

    // Inject stream_options.include_usage so streaming responses include token data
    if (method === 'POST') {
      const streamOpts = injectStreamOptions(current, provider, req.url);
      if (streamOpts) {
        current = streamOpts.body;
      }
    }

    return current;
  }

  return { collectRequestBody, transformRequestBody };
}
208+
209+
module.exports = { createBodyHandler, sleep, _setSleepForTests, _resetSleepForTests };
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
'use strict';
2+
3+
/**
4+
* AWF API Proxy — Shared HTTP client infrastructure.
5+
*
6+
* Centralises the HTTPS proxy agent so that every module that makes outbound
7+
* HTTPS requests (proxy-request.js, model-discovery.js, …) reads from the
8+
* same singleton rather than each constructing its own agent.
9+
*/
10+
11+
const { HttpsProxyAgent } = require('https-proxy-agent');
12+
13+
// ── Module-level constants (read from env at load time) ───────────────────────

/** Proxy URL: HTTPS_PROXY, falling back to HTTP_PROXY when that is unset or empty. */
const HTTPS_PROXY = process.env.HTTPS_PROXY || process.env.HTTP_PROXY;

/** Shared agent built once at load time; stays undefined when no proxy URL is configured. */
let proxyAgent;
if (HTTPS_PROXY) {
  proxyAgent = new HttpsProxyAgent(HTTPS_PROXY);
}

module.exports = { HTTPS_PROXY, proxyAgent };

containers/api-proxy/model-discovery.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ const https = require('https');
2121
const { URL } = require('url');
2222
const { sanitizeForLog, logRequest } = require('./logging');
2323

24-
// ── Shared proxy agent (from proxy-request to avoid bootstrap duplication) ────
25-
const { proxyAgent } = require('./proxy-request');
24+
// ── Shared proxy agent ────────────────────────────────────────────────────────
25+
const { proxyAgent } = require('./http-client');
2626

2727
const MODELS_LOG_DIR = process.env.AWF_API_PROXY_LOG_DIR || '/var/log/api-proxy';
2828

0 commit comments

Comments
 (0)