TC-SU-2.2: Interactive timeout extension for physical device testing#72628
TC-SU-2.2: Interactive timeout extension for physical device testing#72628khodya wants to merge 17 commits into
Conversation
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
… extended_timeouts
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Code Review
This pull request introduces an interactive timeout prompt coordinator (PromptCoordinator) to serialize timeout prompts and track paused time during interactive Matter testing, updating test decorators, subscription handlers, and test cases accordingly. The review feedback highlights several critical improvements: utilizing time.monotonic() instead of time.time() to ensure robust elapsed time calculations against system clock adjustments, checking for a None value on sys.stdin to prevent potential AttributeErrors, and optimizing the supervisor thread in decorators.py with a threading.Event to eliminate a redundant 2-second delay on successful test completions.
| async def _run_with_interactive_timeout(coro, timeout_sec: float, coordinator) -> object: | ||
| """Run coro with an interactive overall-timeout supervisor. | ||
|
|
||
| A background thread counts down ``timeout_sec`` of *effective* elapsed time | ||
| (wall time minus time spent in prompts via the coordinator). When the deadline | ||
| is hit in non-interactive mode the task is cancelled immediately; in interactive | ||
| mode the user is asked whether to extend or abort. | ||
|
|
||
| ``asyncio.CancelledError`` that results from an "abort" choice is caught here and | ||
| converted to a clean Mobly test-failure signal so the test runner never sees an | ||
| unhandled cancellation. | ||
| """ | ||
| loop = asyncio.get_event_loop() | ||
| task = asyncio.ensure_future(coro) | ||
| current_timeout = [float(timeout_sec)] | ||
| wall_start = time.time() | ||
|
|
||
| def _supervisor(): | ||
| while not task.done(): | ||
| time.sleep(5.0) | ||
| if task.done(): | ||
| break | ||
| wall_elapsed = time.time() - wall_start | ||
| effective = wall_elapsed - coordinator.total_prompt_seconds | ||
| if effective < current_timeout[0]: | ||
| continue | ||
|
|
||
| def still_needed(): return (time.time() - wall_start - coordinator.total_prompt_seconds) >= current_timeout[0] | ||
| extended = coordinator.ask_user( | ||
| description=f"Overall test timeout ({current_timeout[0]:.0f}s) reached", | ||
| elapsed_sec=effective, | ||
| extension_sec=current_timeout[0], | ||
| still_needed=still_needed, | ||
| ) | ||
| if extended: | ||
| current_timeout[0] += current_timeout[0] | ||
| else: | ||
| loop.call_soon_threadsafe(task.cancel) | ||
| return | ||
|
|
||
| sv_thread = threading.Thread(target=_supervisor, daemon=True, name="timeout-supervisor") | ||
| sv_thread.start() | ||
|
|
||
| try: | ||
| return await task | ||
| except asyncio.CancelledError: | ||
| asserts.fail( | ||
| f"Test aborted by user after overall timeout ({timeout_sec:.0f}s)." | ||
| ) | ||
| finally: | ||
| sv_thread.join(timeout=2.0) |
There was a problem hiding this comment.
Efficiency & Correctness Issues
- Efficiency (2.0s delay on every test): The supervisor thread uses
time.sleep(5.0)in a loop. When a test finishes successfully, the thread is likely still sleeping. The main thread'sfinallyblock callssv_thread.join(timeout=2.0), which blocks the main thread for up to 2.0 seconds on every single successful test run. We can eliminate this delay entirely by using athreading.Eventto interrupt the sleep immediately. - Correctness (Clock adjustments): Using
time.time()for measuring elapsed time is vulnerable to system clock changes (e.g., NTP sync). We should usetime.monotonic()instead. - Correctness (Redundant prompts): The
still_neededlambda should checknot task.done()to avoid prompting the user if the task has already completed while waiting for the lock.
async def _run_with_interactive_timeout(coro, timeout_sec: float, coordinator) -> object:
"""Run coro with an interactive overall-timeout supervisor.
A background thread counts down ``timeout_sec`` of *effective* elapsed time
(wall time minus time spent in prompts via the coordinator). When the deadline
is hit in non-interactive mode the task is cancelled immediately; in interactive
mode the user is asked whether to extend or abort.
``asyncio.CancelledError`` that results from an "abort" choice is caught here and
converted to a clean Mobly test-failure signal so the test runner never sees an
unhandled cancellation.
"""
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coro)
current_timeout = [float(timeout_sec)]
wall_start = time.monotonic()
exit_event = threading.Event()
def _supervisor():
while not exit_event.wait(timeout=5.0):
if task.done():
break
wall_elapsed = time.monotonic() - wall_start
effective = wall_elapsed - coordinator.total_prompt_seconds
if effective < current_timeout[0]:
continue
still_needed = lambda: not task.done() and (time.monotonic() - wall_start - coordinator.total_prompt_seconds) >= current_timeout[0]
extended = coordinator.ask_user(
description=f"Overall test timeout ({current_timeout[0]:.0f}s) reached",
elapsed_sec=effective,
extension_sec=current_timeout[0],
still_needed=still_needed,
)
if extended:
current_timeout[0] += current_timeout[0]
else:
loop.call_soon_threadsafe(task.cancel)
return
sv_thread = threading.Thread(target=_supervisor, daemon=True, name="timeout-supervisor")
sv_thread.start()
try:
return await task
except asyncio.CancelledError:
asserts.fail(
f"Test aborted by user after overall timeout ({timeout_sec:.0f}s)."
)
finally:
exit_event.set()
sv_thread.join(timeout=2.0)| @property | ||
| def is_interactive(self) -> bool: | ||
| return sys.stdin.isatty() |
There was a problem hiding this comment.
Defensive programming: sys.stdin can be None in certain environments (e.g., when run under some daemon managers or GUI wrappers). We should check if sys.stdin is not None before calling isatty() to prevent an AttributeError.
| @property | |
| def is_interactive(self) -> bool: | |
| return sys.stdin.isatty() | |
| @property | |
| def is_interactive(self) -> bool: | |
| return sys.stdin is not None and sys.stdin.isatty() |
| t_wait_start = time.time() | ||
| self._lock.acquire() | ||
| wait_time = time.time() - t_wait_start | ||
| self._add_prompt_time(wait_time) | ||
|
|
||
| try: | ||
| if still_needed is not None and not still_needed(): | ||
| # The timeout was resolved while we were waiting for the lock. | ||
| return True | ||
|
|
||
| prompt_start = time.time() | ||
| print( | ||
| f"\n⏱ Timeout: {description}\n" | ||
| f" Elapsed: {elapsed_sec:.0f}s — extend by {extension_sec:.0f}s? " | ||
| f"[y]es / [n]o (abort): ", | ||
| end="", | ||
| flush=True, | ||
| ) | ||
| answer = sys.stdin.readline().strip().lower() | ||
| self._add_prompt_time(time.time() - prompt_start) |
There was a problem hiding this comment.
Use time.monotonic() instead of time.time() for measuring elapsed time and intervals. time.time() is subject to system clock adjustments (e.g., NTP sync), which can cause incorrect calculations or flaky behavior.
| t_wait_start = time.time() | |
| self._lock.acquire() | |
| wait_time = time.time() - t_wait_start | |
| self._add_prompt_time(wait_time) | |
| try: | |
| if still_needed is not None and not still_needed(): | |
| # The timeout was resolved while we were waiting for the lock. | |
| return True | |
| prompt_start = time.time() | |
| print( | |
| f"\n⏱ Timeout: {description}\n" | |
| f" Elapsed: {elapsed_sec:.0f}s — extend by {extension_sec:.0f}s? " | |
| f"[y]es / [n]o (abort): ", | |
| end="", | |
| flush=True, | |
| ) | |
| answer = sys.stdin.readline().strip().lower() | |
| self._add_prompt_time(time.time() - prompt_start) | |
| t_wait_start = time.monotonic() | |
| self._lock.acquire() | |
| wait_time = time.monotonic() - t_wait_start | |
| self._add_prompt_time(wait_time) | |
| try: | |
| if still_needed is not None and not still_needed(): | |
| # The timeout was resolved while we were waiting for the lock. | |
| return True | |
| prompt_start = time.monotonic() | |
| print( | |
| f"\n⏱ Timeout: {description}\n" | |
| f" Elapsed: {elapsed_sec:.0f}s — extend by {extension_sec:.0f}s? " | |
| f"[y]es / [n]o (abort): ", | |
| end="", | |
| flush=True, | |
| ) | |
| answer = sys.stdin.readline().strip().lower() | |
| self._add_prompt_time(time.monotonic() - prompt_start) |
| reboot_deadline_sec = s5_reboot_timeout_sec | ||
| reboot_start = time.time() | ||
| attempt = 0 | ||
|
|
||
| while True: | ||
| await asyncio.sleep(poll_interval_sec) | ||
| attempt += 1 | ||
| try: | ||
| await controller.GetConnectedDevice( | ||
| requestor_node_id, allowPASE=False, timeoutMs=reconnect_timeout_ms) | ||
| reconnected = True | ||
| logger.info('%s: Step #5.6 - DUT reconnected after OTA reboot (attempt %s).', step_number_s5, attempt + 1) | ||
| logger.info(f'{step_number_s5}: Step #5.6 - DUT reconnected after OTA reboot (attempt {attempt}).') | ||
| break | ||
| except (TimeoutError, ChipDeviceCtrl.ChipStackError): | ||
| logger.info('%s: Step #5.6 - Waiting for DUT to come back online (attempt %s/%s)...', | ||
| step_number_s5, attempt + 1, reboot_timeout_sec // poll_interval_sec) | ||
| elapsed = time.time() - reboot_start |
There was a problem hiding this comment.
Use time.monotonic() instead of time.time() for measuring elapsed time and intervals. time.time() is subject to system clock adjustments (e.g., NTP sync), which can cause incorrect calculations or flaky behavior.
| reboot_deadline_sec = s5_reboot_timeout_sec | |
| reboot_start = time.time() | |
| attempt = 0 | |
| while True: | |
| await asyncio.sleep(poll_interval_sec) | |
| attempt += 1 | |
| try: | |
| await controller.GetConnectedDevice( | |
| requestor_node_id, allowPASE=False, timeoutMs=reconnect_timeout_ms) | |
| reconnected = True | |
| logger.info('%s: Step #5.6 - DUT reconnected after OTA reboot (attempt %s).', step_number_s5, attempt + 1) | |
| logger.info(f'{step_number_s5}: Step #5.6 - DUT reconnected after OTA reboot (attempt {attempt}).') | |
| break | |
| except (TimeoutError, ChipDeviceCtrl.ChipStackError): | |
| logger.info('%s: Step #5.6 - Waiting for DUT to come back online (attempt %s/%s)...', | |
| step_number_s5, attempt + 1, reboot_timeout_sec // poll_interval_sec) | |
| elapsed = time.time() - reboot_start | |
| reboot_deadline_sec = s5_reboot_timeout_sec | |
| reboot_start = time.monotonic() | |
| attempt = 0 | |
| while True: | |
| await asyncio.sleep(poll_interval_sec) | |
| attempt += 1 | |
| try: | |
| await controller.GetConnectedDevice( | |
| requestor_node_id, allowPASE=False, timeoutMs=reconnect_timeout_ms) | |
| reconnected = True | |
| logger.info(f'{step_number_s5}: Step #5.6 - DUT reconnected after OTA reboot (attempt {attempt}).') | |
| break | |
| except (TimeoutError, ChipDeviceCtrl.ChipStackError): | |
| elapsed = time.monotonic() - reboot_start |
| matchers_list = list(expected_matchers) | ||
| start_time = time.time() | ||
| elapsed = 0.0 | ||
| time_remaining = timeout_sec | ||
|
|
||
| report_matches: dict[int, bool] = {idx: False for idx, _ in enumerate(expected_matchers)} | ||
|
|
||
| for matcher in expected_matchers: | ||
| LOGGER.info("--> Matcher waiting: %s", matcher.description) | ||
| LOGGER.info("Waiting for %.1f seconds for all reports.", timeout_sec) | ||
|
|
||
| while time_remaining > 0: | ||
| while True: | ||
| elapsed = time.time() - start_time | ||
| time_remaining = timeout_sec - elapsed |
There was a problem hiding this comment.
Use time.monotonic() instead of time.time() for measuring elapsed time and intervals. time.time() is subject to system clock adjustments (e.g., NTP sync), which can cause incorrect calculations or flaky behavior.
| matchers_list = list(expected_matchers) | |
| start_time = time.time() | |
| elapsed = 0.0 | |
| time_remaining = timeout_sec | |
| report_matches: dict[int, bool] = {idx: False for idx, _ in enumerate(expected_matchers)} | |
| for matcher in expected_matchers: | |
| LOGGER.info("--> Matcher waiting: %s", matcher.description) | |
| LOGGER.info("Waiting for %.1f seconds for all reports.", timeout_sec) | |
| while time_remaining > 0: | |
| while True: | |
| elapsed = time.time() - start_time | |
| time_remaining = timeout_sec - elapsed | |
| matchers_list = list(expected_matchers) | |
| start_time = time.monotonic() | |
| report_matches: dict[int, bool] = {idx: False for idx, _ in enumerate(expected_matchers)} | |
| for matcher in expected_matchers: | |
| LOGGER.info("--> Matcher waiting: %s", matcher.description) | |
| LOGGER.info("Waiting for %.1f seconds for all reports.", timeout_sec) | |
| while True: | |
| elapsed = time.monotonic() - start_time | |
| time_remaining = timeout_sec - elapsed |
|
PR #72628: Size comparison from ff91326 to 8e99fac Full report (21 builds for bl602, bl616, bl702, bl702l, cc13x4_26x4, cc32xx, nrfconnect, psoc6, qpg, realtek, stm32)
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #72628 +/- ##
=======================================
Coverage 56.06% 56.06%
=======================================
Files 1640 1640
Lines 112575 112575
Branches 13353 13353
=======================================
Hits 63110 63110
Misses 49465 49465 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
PR #72628: Size comparison from ff91326 to f20b2fb Full report (35 builds for bl602, bl616, bl702, bl702l, cc13x4_26x4, cc32xx, efr32, esp32, nrfconnect, psoc6, qpg, realtek, stm32, telink)
|
|
PR #72628: Size comparison from ff91326 to e64c89b Full report (35 builds for bl602, bl616, bl702, bl702l, cc13x4_26x4, cc32xx, efr32, esp32, nrfconnect, psoc6, qpg, realtek, stm32, telink)
|
jtrejoespinoza-grid
left a comment
There was a problem hiding this comment.
Looks great. I think the implementation could be improved a bit but overall looks good.
| print( | ||
| f"\n⏱ Timeout: {description}\n" | ||
| f" Elapsed: {elapsed_sec:.0f}s — extend by {extension_sec:.0f}s? " | ||
| f"[y]es / [n]o (abort): ", |
There was a problem hiding this comment.
Looks a bit hard to read, using Mayus [Y]es/[N]o is easier to read also it does not matter as the code already strips and lower the text.
|
|
||
| prompt_start = time.time() | ||
| print( | ||
| f"\n⏱ Timeout: {description}\n" |
There was a problem hiding this comment.
Nit: not sure is we should use these Symbols like "Clock".
| self._prompt_coordinator = PromptCoordinator() | ||
| return self._prompt_coordinator | ||
|
|
||
| def make_timeout_callback(self, description: str, extension_sec: float = 600.0) -> Callable: |
There was a problem hiding this comment.
nit: I see this method is here because it uses self.prompt_coordinator.ask_user but this methods looks like it should be in the file: event_attribute_reporting.py as is especific to be used only with the methods in that file.
| # --int-arg ota_provider_port:5541 | ||
| # --timeout 2100 | ||
| # factory-reset: true | ||
| # quiet: false |
There was a problem hiding this comment.
Change to quiet: true. This to be able to see the real status in the Nigthly job and not just logs from the tests. If you open the logs Nightly at the test TC_SU_2_2 you will notice is not possible to verify without difficulty the real status of the test.
| # --int-arg ota_provider_port:5541 | ||
| # --timeout 2100 | ||
| # factory-reset: true | ||
| # quiet: false |
There was a problem hiding this comment.
| # quiet: true |
Update this to true to avoid having a lot of logs in the execution report that not allow us to see the real reason of what caused the test to fail.
| ) | ||
|
|
||
| subscription_attr_applying.await_all_expected_report_matches([matcher_applying_obj], timeout_sec=800.0) | ||
| subscription_attr_applying.await_all_expected_report_matches( |
There was a problem hiding this comment.
Just a comment here about the implementation: In this scenario the Download is on Progress and if the method reaches the end of the timeout it ask for more time to wait for the kApplying state as many times the tester think is needed. But the problem I notice is the test just informs the kApplying State has not been reached and wait more time but we do not know if the download was stuck at 10% or 20% or at 98%. Would not be better to check for the download to reach maybe 98% or 99% (UpdateProgess) and then wait for the kApplying State. In that way the descriptor will tell the user where the Download was when the timeout was reached. Probably in that way the test can also decrease also the extension_sec as it can be repeated multiple times if needed.
Summary
When TC_SU_2_2.py runs against a real device, OTA transfer, apply, and reboot durations are unpredictable. This PR replaces hardcoded waits with an extensible timeout system that prompts the operator in interactive sessions and fails fast in CI.
New: PromptCoordinator (matter/testing/prompt_coordinator.py)
Per-step timeout callbacks (event_attribute_reporting.py)
Overall test supervisor (decorators.py)
MatterBaseTest.make_timeout_callback() (matter_testing.py)
TC_SU_2_2.py changes:
Testing
overall supervisor — all passing