@@ -161,39 +161,20 @@ function wireListeners(proxyRes, decompressor, state, onChunk, onFinalize) {
161161}
162162
163163/**
164- * Finalize token tracking for an HTTP response .
164+ * Extract usage and model from accumulated tracking state .
165165 *
166- * Parses accumulated SSE events or buffered JSON, normalizes usage,
167- * calls optional callbacks, updates metrics, and writes the log record.
168- * Accepts explicit state instead of relying on a closure, making it
169- * independently unit-testable.
166+ * Encapsulates the streaming-vs-non-streaming branching so each path can be
167+ * tested independently. Mutates `state` in place for streaming (flushes the
168+ * remaining partial line and updates `state.observedCacheReadTokens`).
170169 *
171170 * @param {object } state - Mutable tracking state from initHttpState
172- * @param {object } proxyRes - Upstream response (only statusCode is read)
173- * @param {object } opts - Original options passed to trackTokenUsage
171+ * @returns {{ usage: object|null, model: string|null } }
174172 */
175- function finalizeHttpTracking ( state , proxyRes , opts ) {
176- const { requestId, provider, path : reqPath , startTime, metrics : metricsRef , billingInfo, initiatorSent, requestModel, onUsage, onSpanEnd } = opts ;
177- const { streaming, compressed, contentEncoding } = state ;
178-
179- // Only process successful responses (2xx)
180- if ( proxyRes . statusCode < 200 || proxyRes . statusCode >= 300 ) {
181- logRequest ( 'debug' , 'token_track_skip_status' , {
182- request_id : requestId ,
183- provider,
184- status : proxyRes . statusCode ,
185- } ) ;
186- diag ( 'HTTP_TRACK_SKIP_STATUS' , { request_id : requestId , provider, status : proxyRes . statusCode } ) ;
187- if ( typeof onSpanEnd === 'function' ) onSpanEnd ( proxyRes . statusCode ) ;
188- return ;
189- }
190-
191- const duration = Date . now ( ) - startTime ;
173+ function extractUsageFromTrackedState ( state ) {
192174 let usage = null ;
193175 let model = null ;
194- let budgetResult ;
195176
196- if ( streaming ) {
177+ if ( state . streaming ) {
197178 // Process any remaining partial line
198179 if ( state . partialLine . trim ( ) ) {
199180 const dataLines = parseSseDataLines ( state . partialLine ) ;
@@ -227,6 +208,96 @@ function finalizeHttpTracking(state, proxyRes, opts) {
227208 }
228209 }
229210
211+ return { usage, model } ;
212+ }
213+
214+ /**
215+ * Build a token usage record and persist it.
216+ *
217+ * Bundles record assembly (`buildTokenUsageRecord`), budget-field merging
218+ * (`mergeBudgetFields`), billing/initiator decorators, `writeTokenUsage`,
219+ * and the `logRequest` summary — pure persistence/reporting with no quota
220+ * logic.
221+ *
222+ * @param {object } normalized - Normalized usage object
223+ * @param {object } params
224+ * @param {string } params.requestId
225+ * @param {string } params.provider
226+ * @param {string } params.model
227+ * @param {string } params.reqPath
228+ * @param {number } params.status
229+ * @param {boolean } params.streaming
230+ * @param {number } params.duration
231+ * @param {number } params.responseBytes
232+ * @param {object|null } params.billingInfo
233+ * @param {string|null } params.initiatorSent
234+ * @param {object|undefined } params.budgetResult
235+ */
236+ function buildAndWriteTokenRecord ( normalized , { requestId, provider, model, reqPath, status, streaming, duration, responseBytes, billingInfo, initiatorSent, budgetResult } ) {
237+ const record = buildTokenUsageRecord ( normalized , {
238+ requestId,
239+ provider,
240+ model,
241+ reqPath,
242+ status,
243+ streaming,
244+ duration,
245+ responseBytes,
246+ } ) ;
247+
248+ // Include billing/quota info when available (Copilot PRU tracking)
249+ if ( initiatorSent ) record . x_initiator = initiatorSent ;
250+ if ( billingInfo ) record . billing = billingInfo ;
251+
252+ // Include effective token and AI credit budget fields when computed
253+ mergeBudgetFields ( record , budgetResult ) ;
254+
255+ // Write to JSONL log file
256+ writeTokenUsage ( record ) ;
257+
258+ // Log summary to stdout
259+ logRequest ( 'info' , 'token_usage' , {
260+ request_id : requestId ,
261+ provider,
262+ model : model || 'unknown' ,
263+ input_tokens : normalized . input_tokens ,
264+ output_tokens : normalized . output_tokens ,
265+ cache_read_tokens : normalized . cache_read_tokens ,
266+ cache_write_tokens : normalized . cache_write_tokens ,
267+ streaming,
268+ } ) ;
269+ }
270+
271+ /**
272+ * Finalize token tracking for an HTTP response.
273+ *
274+ * Orchestrates usage extraction, normalization, quota callback, metrics
275+ * update, and record persistence. Accepts explicit state instead of relying
276+ * on a closure, making it independently unit-testable.
277+ *
278+ * @param {object } state - Mutable tracking state from initHttpState
279+ * @param {object } proxyRes - Upstream response (only statusCode is read)
280+ * @param {object } opts - Original options passed to trackTokenUsage
281+ */
282+ function finalizeHttpTracking ( state , proxyRes , opts ) {
283+ const { requestId, provider, path : reqPath , startTime, metrics : metricsRef , billingInfo, initiatorSent, requestModel, onUsage, onSpanEnd } = opts ;
284+ const { streaming, compressed, contentEncoding } = state ;
285+
286+ // Only process successful responses (2xx)
287+ if ( proxyRes . statusCode < 200 || proxyRes . statusCode >= 300 ) {
288+ logRequest ( 'debug' , 'token_track_skip_status' , {
289+ request_id : requestId ,
290+ provider,
291+ status : proxyRes . statusCode ,
292+ } ) ;
293+ diag ( 'HTTP_TRACK_SKIP_STATUS' , { request_id : requestId , provider, status : proxyRes . statusCode } ) ;
294+ if ( typeof onSpanEnd === 'function' ) onSpanEnd ( proxyRes . statusCode ) ;
295+ return ;
296+ }
297+
298+ const duration = Date . now ( ) - startTime ;
299+ const { usage, model } = extractUsageFromTrackedState ( state ) ;
300+
230301 logRequest ( 'debug' , 'token_track_end' , {
231302 request_id : requestId ,
232303 provider,
@@ -248,6 +319,8 @@ function finalizeHttpTracking(state, proxyRes, opts) {
248319 if ( state . observedCacheReadTokens > 0 && normalized . cache_read_tokens === 0 ) {
249320 warnCacheReadRollupMismatch ( { logRequest, diag, requestId, provider, model, observedCacheReadTokens : state . observedCacheReadTokens , normalizedCacheReadTokens : normalized . cache_read_tokens , streaming } ) ;
250321 }
322+
323+ let budgetResult ;
251324 if ( typeof onUsage === 'function' ) {
252325 try {
253326 budgetResult = onUsage ( normalized , model || requestModel || provider || 'unknown' ) ;
@@ -259,8 +332,8 @@ function finalizeHttpTracking(state, proxyRes, opts) {
259332 // Update metrics
260333 incrementTokenMetrics ( metricsRef , provider , normalized ) ;
261334
262- // Build log record
263- const record = buildTokenUsageRecord ( normalized , {
335+ // Build log record and persist
336+ buildAndWriteTokenRecord ( normalized , {
264337 requestId,
265338 provider,
266339 model : model || requestModel || provider ,
@@ -269,28 +342,9 @@ function finalizeHttpTracking(state, proxyRes, opts) {
269342 streaming,
270343 duration,
271344 responseBytes : state . totalBytes ,
272- } ) ;
273-
274- // Include billing/quota info when available (Copilot PRU tracking)
275- if ( initiatorSent ) record . x_initiator = initiatorSent ;
276- if ( billingInfo ) record . billing = billingInfo ;
277-
278- // Include effective token and AI credit budget fields when computed
279- mergeBudgetFields ( record , budgetResult ) ;
280-
281- // Write to JSONL log file
282- writeTokenUsage ( record ) ;
283-
284- // Log summary to stdout
285- logRequest ( 'info' , 'token_usage' , {
286- request_id : requestId ,
287- provider,
288- model : model || requestModel || provider || 'unknown' ,
289- input_tokens : normalized . input_tokens ,
290- output_tokens : normalized . output_tokens ,
291- cache_read_tokens : normalized . cache_read_tokens ,
292- cache_write_tokens : normalized . cache_write_tokens ,
293- streaming,
345+ billingInfo,
346+ initiatorSent,
347+ budgetResult,
294348 } ) ;
295349
296350 if ( typeof onSpanEnd === 'function' ) onSpanEnd ( proxyRes . statusCode ) ;
@@ -358,4 +412,4 @@ function trackTokenUsage(proxyRes, opts) {
358412 wireListeners ( proxyRes , decompressor , state , onChunk , onFinalize ) ;
359413}
360414
361- module . exports = { trackTokenUsage, createChunkHandler, finalizeHttpTracking } ;
415+ module . exports = { trackTokenUsage, createChunkHandler, finalizeHttpTracking, extractUsageFromTrackedState , buildAndWriteTokenRecord } ;
0 commit comments