Skip to content

Commit fa20519

Browse files
committed
fix: don't wait indefinitely to receive data
1 parent 1838578 commit fa20519

3 files changed

Lines changed: 46 additions & 26 deletions

File tree

pyomnilogic_local/api/api.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,7 @@
88
from pyomnilogic_local.models.filter_diagnostics import FilterDiagnostics
99
from pyomnilogic_local.models.mspconfig import MSPConfig
1010
from pyomnilogic_local.models.telemetry import Telemetry
11-
from pyomnilogic_local.omnitypes import (
12-
ColorLogicBrightness,
13-
ColorLogicSpeed,
14-
MessageType,
15-
)
11+
from pyomnilogic_local.omnitypes import ColorLogicBrightness, ColorLogicSpeed, MessageType
1612

1713
from .constants import (
1814
DEFAULT_CONTROLLER_PORT,
@@ -117,10 +113,8 @@ def __init__(
117113

118114
@overload
119115
async def async_send_message(self, message_type: MessageType, message: str | None, need_response: Literal[True]) -> str: ...
120-
121116
@overload
122117
async def async_send_message(self, message_type: MessageType, message: str | None, need_response: Literal[False]) -> None: ...
123-
124118
async def async_send_message(self, message_type: MessageType, message: str | None, need_response: bool = False) -> str | None:
125119
"""Send a message via the Hayward Omni UDP protocol along with properly handling timeouts and responses.
126120
@@ -138,7 +132,7 @@ async def async_send_message(self, message_type: MessageType, message: str | Non
138132
resp: str | None = None
139133
try:
140134
if need_response:
141-
resp = await protocol.send_and_receive(message_type, message)
135+
resp = await protocol.send_and_receive(message_type, message, response_timeout=self.response_timeout)
142136
else:
143137
await protocol.send_message(message_type, message)
144138
finally:

pyomnilogic_local/api/constants.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
# Timing Constants (in seconds)
1414
OMNI_RETRANSMIT_TIME = 2.1 # Time Omni waits before retransmitting a packet
1515
OMNI_RETRANSMIT_COUNT = 5 # Number of retransmit attempts (6 total including initial)
16-
ACK_WAIT_TIMEOUT = 0.5 # Timeout waiting for ACK response
17-
DEFAULT_RESPONSE_TIMEOUT = 5.0 # Default timeout for receiving responses
16+
ACK_WAIT_TIMEOUT = 1 # Timeout waiting for ACK response, 0.5 showed to be just a tad too short in some cases.
17+
DEFAULT_RESPONSE_TIMEOUT = OMNI_RETRANSMIT_TIME * OMNI_RETRANSMIT_COUNT # Default timeout for receiving responses
1818

1919
# Network Constants
2020
DEFAULT_CONTROLLER_PORT = 10444 # Default UDP port for OmniLogic communication

pyomnilogic_local/api/protocol.py

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .constants import (
1616
ACK_WAIT_TIMEOUT,
1717
BLOCK_MESSAGE_HEADER_OFFSET,
18+
DEFAULT_RESPONSE_TIMEOUT,
1819
MAX_FRAGMENT_WAIT_TIME,
1920
MAX_QUEUE_SIZE,
2021
OMNI_RETRANSMIT_COUNT,
@@ -25,11 +26,7 @@
2526
XML_ENCODING,
2627
XML_NAMESPACE,
2728
)
28-
from .exceptions import (
29-
OmniFragmentationError,
30-
OmniMessageFormatError,
31-
OmniTimeoutError,
32-
)
29+
from .exceptions import OmniFragmentationError, OmniMessageFormatError, OmniTimeoutError
3330

3431
_LOGGER = logging.getLogger(__name__)
3532

@@ -225,21 +222,34 @@ async def _wait_for_ack(self, ack_id: int) -> None:
225222
Exception: If a protocol error occurs.
226223
"""
227224
# Wait for either an ACK message or an error
228-
while True:
225+
# Race condition: datagram_received() calls put_nowait() synchronously, so data_task may
226+
# already be done when wait_for fires its timeout CancelledError. In that case we catch
227+
# the cancellation, skip re-looping, and fall through to check the result below. If the
228+
# result is our ACK we return normally, suppressing the CancelledError so wait_for treats
229+
# the call as successful. If it isn't, we re-raise after the loop.
230+
cancelled: asyncio.CancelledError | None = None
231+
retry = True
232+
while retry:
229233
# Wait for either a message or an error
230234
data_task = asyncio.create_task(self.data_queue.get())
231235
error_task = asyncio.create_task(self.error_queue.get())
232-
done, pending = await asyncio.wait([data_task, error_task], return_when=asyncio.FIRST_COMPLETED)
236+
try:
237+
done, pending = await asyncio.wait([data_task, error_task], return_when=asyncio.FIRST_COMPLETED)
238+
except asyncio.CancelledError as exc:
239+
retry = False
240+
cancelled = exc
241+
done = {t for t in (data_task, error_task) if t.done()}
242+
pending = {t for t in (data_task, error_task) if not t.done()}
233243

234244
# Cancel any pending tasks to avoid "Task was destroyed but it is pending" warnings
235245
for task in pending:
236246
task.cancel()
237247

238248
if error_task in done:
239-
exc = error_task.result()
240-
if isinstance(exc, Exception):
241-
raise exc
242-
_LOGGER.error("Unknown error occurred during communication with OmniLogic: %s", exc)
249+
err = error_task.result()
250+
if isinstance(err, Exception):
251+
raise err
252+
_LOGGER.error("Unknown error occurred during communication with OmniLogic: %s", err)
243253
if data_task in done:
244254
message = data_task.result()
245255
if message.id == ack_id:
@@ -251,6 +261,9 @@ async def _wait_for_ack(self, ack_id: int) -> None:
251261
await self.data_queue.put(message)
252262
return
253263

264+
if cancelled is not None:
265+
raise cancelled
266+
254267
async def _ensure_sent(
255268
self,
256269
message: OmniLogicMessage,
@@ -299,19 +312,21 @@ async def send_and_receive(
299312
msg_type: MessageType,
300313
payload: str | None,
301314
msg_id: int | None = None,
315+
response_timeout: float = DEFAULT_RESPONSE_TIMEOUT,
302316
) -> str:
303317
"""Send a message and wait for a response, returning the response payload as a string.
304318
305319
Args:
306320
msg_type: Type of message to send.
307321
payload: Optional payload string.
308322
msg_id: Optional message ID.
323+
response_timeout: Timeout in seconds to wait for the response.
309324
310325
Returns:
311326
Response payload as a string.
312327
"""
313328
await self.send_message(msg_type, payload, msg_id)
314-
return await self._receive_file()
329+
return await self._receive_file(response_timeout=response_timeout)
315330

316331
# Send a message that you do NOT need a response to
317332
async def send_message(
@@ -346,11 +361,14 @@ async def _send_ack(self, msg_id: int) -> None:
346361
req_body = ET.tostring(body_element, xml_declaration=True, encoding=XML_ENCODING)
347362
await self.send_message(MessageType.XML_ACK, req_body, msg_id)
348363

349-
async def _receive_file(self) -> str:
364+
async def _receive_file(self, response_timeout: float = DEFAULT_RESPONSE_TIMEOUT) -> str:
350365
"""Wait for and reassemble a full response from the controller.
351366
352367
Handles single and multi-block (LeadMessage/BlockMessage) responses.
353368
369+
Args:
370+
response_timeout: Timeout in seconds to wait for the initial response.
371+
354372
Returns:
355373
Response payload as a string.
356374
@@ -359,13 +377,21 @@ async def _receive_file(self) -> str:
359377
OmniFragmentationException: If fragment reassembly fails.
360378
"""
361379
# wait for the initial packet.
362-
message = await self.data_queue.get()
380+
try:
381+
message = await asyncio.wait_for(self.data_queue.get(), response_timeout)
382+
except TimeoutError as exc:
383+
msg = f"Timeout waiting for response from controller: {exc}"
384+
raise OmniTimeoutError(msg) from exc
363385

364386
# If messages have to be re-transmitted, we can sometimes receive multiple ACKs. The first one would be handled by
365387
# self._ensure_sent, but if any subsequent ACKs are sent to us, we need to dump them and wait for a "real" message.
366388
while message.type in [MessageType.ACK, MessageType.XML_ACK]:
367389
_LOGGER.debug("Skipping duplicate ACK message")
368-
message = await self.data_queue.get()
390+
try:
391+
message = await asyncio.wait_for(self.data_queue.get(), response_timeout)
392+
except TimeoutError as exc:
393+
msg = f"Timeout waiting for response from controller: {exc}"
394+
raise OmniTimeoutError(msg) from exc
369395

370396
await self._send_ack(message.id)
371397

@@ -403,7 +429,7 @@ async def _receive_file(self) -> str:
403429

404430
# We need to wait long enough for the Omni to get through all of it's retries before we bail out.
405431
try:
406-
resp = await asyncio.wait_for(self.data_queue.get(), self._omni_retransmit_time * self._omni_retransmit_count)
432+
resp = await asyncio.wait_for(self.data_queue.get(), response_timeout)
407433
except TimeoutError as exc:
408434
msg = f"Timeout receiving fragment: got {len(data_fragments)}/{leadmsg.msg_block_count} fragments: {exc}"
409435
raise OmniFragmentationError(msg) from exc

0 commit comments

Comments
 (0)