Skip to content

Commit 0a335b3

Browse files
bwp91claude
andcommitted
fix: parse multiple HTTP messages per TCP chunk in monitor
- The monitor's `socket.on('data', ...)` handler called `parseMessage(data)` once per TCP chunk and processed only that one message. - TCP gives no message-boundary guarantee. A bridge with several event-emitting accessories (motion sensors, thermostats, occupancy) routinely fires HAP `EVENT/1.0` notifications close enough together that they coalesce into a single packet. - Every additional message in the chunk was silently dropped, including the most recent value for the affected characteristic. - The reverse case — a single message split across two packets — also failed: the first packet had no body and was discarded, the second had no headers and was discarded too. - Add a per-instance `recvBuffer` on `HapEvInstance`, accumulate incoming chunks into it, and pull complete messages off the front using a new `findMessageBoundary` helper that consults the `Content-Length` header. - Reset `recvBuffer` on every `connectInstance` so a refresh after `monitor-close` starts clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2f4e9f9 commit 0a335b3

4 files changed

Lines changed: 192 additions & 39 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ All notable changes to `@homebridge/hap-client` will be documented in this file.
2323
- fix: use byte length for Content-Length in evented HTTP client
2424
- fix: parse full body when Content-Length missing in httpParser
2525
- fix: surface refreshInstances browser.update errors
26+
- fix: parse multiple HTTP messages per TCP chunk in monitor
2627

2728
### Homebridge Dependencies
2829

src/interfaces.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export interface HapEvInstance {
1818
evCharacteristics?: { aid: number, iid: number, ev: boolean }[]
1919
socket?: Socket
2020
monitoring?: boolean
21+
recvBuffer?: string
2122
}
2223

2324
export interface HapAccessoriesRespType {

src/monitor.test.ts

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import type { ServiceType } from './interfaces.js'
22

3+
import { Buffer } from 'node:buffer'
34
import { EventEmitter } from 'node:events'
45

56
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
67

7-
import { createConnection } from './eventedHttpClient/index.js'
8-
import { HapMonitor } from './monitor.js'
8+
import realHttpParser from './eventedHttpClient/httpParser.js'
9+
import { createConnection, parseMessage } from './eventedHttpClient/index.js'
10+
import { findMessageBoundary, HapMonitor } from './monitor.js'
911

1012
// Mock the eventedHttpClient module
1113
vi.mock('./eventedHttpClient/index.js', () => {
@@ -165,3 +167,106 @@ describe('hapMonitor', () => {
165167
})
166168
})
167169
})
170+
171+
describe('findMessageBoundary', () => {
172+
function build(body: string, headers: Record<string, string> = {}): string {
173+
const headerLines = ['EVENT/1.0 200 OK', 'Content-Type: application/hap+json', `Content-Length: ${Buffer.byteLength(body, 'utf8')}`]
174+
for (const [k, v] of Object.entries(headers)) {
175+
headerLines.push(`${k}: ${v}`)
176+
}
177+
return `${headerLines.join('\r\n')}\r\n\r\n${body}`
178+
}
179+
180+
it('returns -1 for an empty buffer', () => {
181+
expect(findMessageBoundary('')).toBe(-1)
182+
})
183+
184+
it('returns -1 when headers are not yet terminated', () => {
185+
expect(findMessageBoundary('EVENT/1.0 200 OK\r\nContent-Length: 5')).toBe(-1)
186+
})
187+
188+
it('returns the message length when one complete message is buffered', () => {
189+
const msg = build('{"a":1}')
190+
expect(findMessageBoundary(msg)).toBe(msg.length)
191+
})
192+
193+
it('returns the FIRST message length when two complete messages are buffered', () => {
194+
const first = build('{"a":1}')
195+
const second = build('{"b":2}')
196+
const combined = first + second
197+
198+
const boundary = findMessageBoundary(combined)
199+
expect(boundary).toBe(first.length)
200+
// The remainder is the start of the second message.
201+
expect(combined.slice(boundary)).toBe(second)
202+
})
203+
204+
it('returns -1 when the body is shorter than Content-Length', () => {
205+
const fullBody = '{"characteristics":[{"aid":1,"iid":10,"value":true}]}'
206+
const headers = `EVENT/1.0 200 OK\r\nContent-Type: application/hap+json\r\nContent-Length: ${fullBody.length}\r\n\r\n`
207+
const truncated = headers + fullBody.slice(0, fullBody.length - 5)
208+
209+
expect(findMessageBoundary(truncated)).toBe(-1)
210+
})
211+
})
212+
213+
describe('hapMonitor data handling - multiple messages per chunk', () => {
214+
let monitor: HapMonitor
215+
const username = 'AA:BB:CC:DD:EE:FF'
216+
217+
beforeEach(() => {
218+
vi.clearAllMocks()
219+
// Use the real HTTP parser for this scenario so we can assert how the
220+
// monitor splits and dispatches multi-message TCP chunks.
221+
vi.mocked(parseMessage).mockImplementation((msg: any) => realHttpParser(msg))
222+
223+
const services = [buildService(username)]
224+
monitor = new HapMonitor(null, vi.fn(), '031-45-154', services)
225+
})
226+
227+
afterEach(() => {
228+
monitor.finish()
229+
vi.mocked(parseMessage).mockReset()
230+
vi.mocked(parseMessage).mockImplementation(() => ({ statusCode: 200, protocol: 'HTTP' } as any))
231+
})
232+
233+
function buildEvent(value: boolean): string {
234+
const body = JSON.stringify({ characteristics: [{ aid: 1, iid: 2, value }] })
235+
return [
236+
'EVENT/1.0 200 OK',
237+
'Content-Type: application/hap+json',
238+
`Content-Length: ${Buffer.byteLength(body, 'utf8')}`,
239+
'',
240+
body,
241+
].join('\r\n')
242+
}
243+
244+
it('should emit service-update for every message in a single TCP chunk', () => {
245+
const updates: any[][] = []
246+
monitor.on('service-update', services => updates.push(services))
247+
248+
const socket = (monitor as any).evInstances[0].socket
249+
const combined = buildEvent(true) + buildEvent(false) + buildEvent(true)
250+
251+
socket.emit('data', Buffer.from(combined, 'utf8'))
252+
253+
// Without the fix only the first message was parsed and emitted; the
254+
// second and third silently fell off the floor.
255+
expect(updates).toHaveLength(3)
256+
})
257+
258+
it('should buffer a message split across two data events and emit once when complete', () => {
259+
const updates: any[][] = []
260+
monitor.on('service-update', services => updates.push(services))
261+
262+
const socket = (monitor as any).evInstances[0].socket
263+
const full = buildEvent(true)
264+
const splitAt = Math.floor(full.length / 2)
265+
266+
socket.emit('data', Buffer.from(full.slice(0, splitAt), 'utf8'))
267+
expect(updates).toHaveLength(0) // not enough data yet
268+
269+
socket.emit('data', Buffer.from(full.slice(splitAt), 'utf8'))
270+
expect(updates).toHaveLength(1)
271+
})
272+
})

src/monitor.ts

Lines changed: 83 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,34 @@ import { EventEmitter } from 'node:events'
33
import { createConnection, parseMessage } from './eventedHttpClient/index.js'
44
import { HapEvInstance, ServiceType } from './interfaces.js'
55

6+
/**
7+
* Locate the end of the next complete HTTP/EVENT message in `buffer`.
8+
* Returns -1 when the buffer does not yet contain a full message.
9+
*/
10+
export function findMessageBoundary(buffer: string): number {
11+
// Locate the blank line separating headers from body.
12+
const sepMatch = buffer.match(/\r?\n\r?\n/)
13+
if (!sepMatch || sepMatch.index === undefined) {
14+
return -1
15+
}
16+
17+
const headers = buffer.slice(0, sepMatch.index)
18+
const bodyStart = sepMatch.index + sepMatch[0].length
19+
20+
const contentLengthMatch = headers.match(/Content-Length:\s*(\d+)/i)
21+
if (!contentLengthMatch) {
22+
// No Content-Length: we cannot tell where this message ends, so consume
23+
// the rest of the buffer as a single message.
24+
return buffer.length
25+
}
26+
27+
const messageEnd = bodyStart + Number(contentLengthMatch[1])
28+
if (buffer.length < messageEnd) {
29+
return -1
30+
}
31+
return messageEnd
32+
}
33+
634
/**
735
* HapMonitor - Creates a monitor to watch for changes in accessory characteristics. And generates 'service-update' events when they change.
836
*/
@@ -51,48 +79,24 @@ export class HapMonitor extends EventEmitter {
5179
this.debug(`[HapClient] [${instance.ipAddress}:${instance.port} (${instance.username})] Connecting`)
5280
instance.socket = createConnection(instance, this.pin, { characteristics: instance.evCharacteristics })
5381
instance.monitoring = true
82+
instance.recvBuffer = ''
5483

5584
this.debug(`[HapClient] [${instance.ipAddress}:${instance.port} (${instance.username})] Connected`)
5685

5786
instance.socket.on('data', (data) => {
58-
const message = parseMessage(data)
59-
60-
if (message.statusCode === 401) {
61-
this.debug(`[HapClient] [${instance.ipAddress}:${instance.port} (${instance.username})] `
62-
+ `${message.statusCode} ${message.statusMessage} - make sure Homebridge pin for this instance is set to ${this.pin}.`)
63-
}
64-
65-
if (message.protocol === 'EVENT') {
66-
try {
67-
const body = JSON.parse(message.body)
68-
if (body.characteristics && body.characteristics.length) {
69-
this.debug(`[HapClient] [${instance.ipAddress}:${instance.port} (${instance.username})] `
70-
+ `Got Event: ${JSON.stringify(body.characteristics)}`)
71-
72-
const response = body.characteristics.map((c) => {
73-
// find the matching service for each characteristic
74-
const services = this.services.filter(x => x.aid === c.aid && x.instance.username === instance.username)
75-
const service = services.find(x => x.serviceCharacteristics.find(y => y.iid === c.iid))
76-
77-
if (service) {
78-
// find the correct characteristic and update it
79-
const characteristic = service.serviceCharacteristics.find(x => x.iid === c.iid)
80-
if (characteristic) {
81-
characteristic.value = c.value
82-
service.values[characteristic.type] = c.value
83-
return service
84-
}
85-
}
86-
87-
return undefined
88-
})
89-
90-
// push update to listeners
91-
this.emit('service-update', response.filter(x => x))
92-
}
93-
} catch {
94-
// do nothing
87+
instance.recvBuffer = (instance.recvBuffer || '') + data.toString()
88+
89+
// Several HAP EVENT messages can arrive in a single TCP chunk on busy
90+
// bridges, and a single message can also be split across packets. Pull
91+
// complete messages off the front of the buffer one at a time.
92+
while (true) {
93+
const boundary = findMessageBoundary(instance.recvBuffer)
94+
if (boundary <= 0) {
95+
break
9596
}
97+
const messageStr = instance.recvBuffer.slice(0, boundary)
98+
instance.recvBuffer = instance.recvBuffer.slice(boundary)
99+
this.handleEventMessage(instance, messageStr)
96100
}
97101
})
98102
instance.socket.on('close', (hadError) => {
@@ -159,6 +163,48 @@ export class HapMonitor extends EventEmitter {
159163
return instance?.socket != null && instance?.socket !== undefined && !instance.socket.destroyed
160164
}
161165

166+
private handleEventMessage(instance: HapEvInstance, messageStr: string) {
167+
const message = parseMessage(messageStr)
168+
169+
if (message.statusCode === 401) {
170+
this.debug(`[HapClient] [${instance.ipAddress}:${instance.port} (${instance.username})] `
171+
+ `${message.statusCode} ${message.statusMessage} - make sure Homebridge pin for this instance is set to ${this.pin}.`)
172+
}
173+
174+
if (message.protocol === 'EVENT') {
175+
try {
176+
const body = JSON.parse(message.body)
177+
if (body.characteristics && body.characteristics.length) {
178+
this.debug(`[HapClient] [${instance.ipAddress}:${instance.port} (${instance.username})] `
179+
+ `Got Event: ${JSON.stringify(body.characteristics)}`)
180+
181+
const response = body.characteristics.map((c) => {
182+
// find the matching service for each characteristic
183+
const services = this.services.filter(x => x.aid === c.aid && x.instance.username === instance.username)
184+
const service = services.find(x => x.serviceCharacteristics.find(y => y.iid === c.iid))
185+
186+
if (service) {
187+
// find the correct characteristic and update it
188+
const characteristic = service.serviceCharacteristics.find(x => x.iid === c.iid)
189+
if (characteristic) {
190+
characteristic.value = c.value
191+
service.values[characteristic.type] = c.value
192+
return service
193+
}
194+
}
195+
196+
return undefined
197+
})
198+
199+
// push update to listeners
200+
this.emit('service-update', response.filter(x => x))
201+
}
202+
} catch {
203+
// do nothing
204+
}
205+
}
206+
}
207+
162208
parseServices() {
163209
// get a list of characteristics we can watch for each instance
164210
for (const service of this.services) {

0 commit comments

Comments
 (0)