Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion framework/py/flwr/cli/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
from flwr.proto.control_pb2 import StreamLogsRequest # pylint: disable=E0611
from flwr.proto.control_pb2_grpc import ControlStub

from .utils import flwr_cli_grpc_exc_handler, init_channel_from_connection
from .utils import (
flwr_cli_grpc_exc_handler,
init_channel_from_connection,
wait_for_control_api_channel,
)


class AllLogsRetrieved(BaseException):
Expand Down Expand Up @@ -218,6 +222,7 @@ def _log_with_control_api(
If True, stream logs continuously; if False, print once.
"""
channel = init_channel_from_connection(superlink_connection)
wait_for_control_api_channel(channel)

if stream:
start_stream(run_id, channel, CONN_REFRESH_PERIOD)
Expand Down
2 changes: 2 additions & 0 deletions framework/py/flwr/cli/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
flwr_cli_grpc_exc_handler,
init_channel_from_connection,
print_json_to_stdout,
wait_for_control_api_channel,
)


Expand Down Expand Up @@ -114,6 +115,7 @@ def ls( # pylint: disable=too-many-locals, too-many-branches, R0913, R0917

try:
channel = init_channel_from_connection(superlink_connection)
wait_for_control_api_channel(channel)
stub = ControlStub(channel)

# Display information about a specific run ID
Expand Down
2 changes: 2 additions & 0 deletions framework/py/flwr/cli/run/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
flwr_cli_grpc_exc_handler,
init_channel_from_connection,
print_json_to_stdout,
wait_for_control_api_channel,
)

CONN_REFRESH_PERIOD = 60 # Connection refresh period for log streaming (seconds)
Expand Down Expand Up @@ -201,6 +202,7 @@ def _run_with_control_api(
app_spec=app_spec or "",
)
with flwr_cli_grpc_exc_handler():
wait_for_control_api_channel(channel)
res = stub.StartRun(req)

if res.HasField("note"):
Expand Down
2 changes: 2 additions & 0 deletions framework/py/flwr/cli/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
flwr_cli_grpc_exc_handler,
init_channel_from_connection,
print_json_to_stdout,
wait_for_control_api_channel,
)


Expand Down Expand Up @@ -82,6 +83,7 @@ def stop( # pylint: disable=R0914

try:
channel = init_channel_from_connection(superlink_connection)
wait_for_control_api_channel(channel)
stub = ControlStub(channel) # pylint: disable=unused-variable # noqa: F841

typer.secho(f"✋ Stopping run ID {run_id}...", fg=typer.colors.GREEN)
Expand Down
32 changes: 28 additions & 4 deletions framework/py/flwr/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import re
import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
from contextlib import contextmanager
from io import StringIO
Expand Down Expand Up @@ -69,6 +70,13 @@
from .flower_config import read_superlink_connection
from .local_superlink import ensure_local_superlink

SUPERLINK_UNAVAILABLE_MESSAGE = (
"Connection to the SuperLink is unavailable. Please check your network "
"connection and 'address' in the SuperLink connection configuration."
)
CONTROL_API_READY_TIMEOUT_SECONDS = 30
CONTROL_API_READY_CHECK_INTERVAL_SECONDS = 1


def print_json_to_stdout(data: str | Any) -> None:
"""Print JSON data to stdout, bypassing any output redirection.
Expand Down Expand Up @@ -396,6 +404,25 @@ def cli_output_control_stub(
channel.close()


def wait_for_control_api_channel(
channel: grpc.Channel,
timeout: float = CONTROL_API_READY_TIMEOUT_SECONDS,
check_interval: float = CONTROL_API_READY_CHECK_INTERVAL_SECONDS,
) -> None:
"""Wait for the Control API channel to become ready before sending an RPC."""
deadline = time.monotonic() + timeout
future = grpc.channel_ready_future(channel)
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise click.ClickException(SUPERLINK_UNAVAILABLE_MESSAGE)
try:
future.result(timeout=min(check_interval, remaining))
return
except grpc.FutureTimeoutError:
continue


@contextmanager # docsig: disable=SIG503
def flwr_cli_grpc_exc_handler( # pylint: disable=too-many-branches
custom_handler: Callable[[grpc.RpcError], None] | None = None,
Expand Down Expand Up @@ -450,10 +477,7 @@ def flwr_cli_grpc_exc_handler( # pylint: disable=too-many-branches
msg = "Permission denied." if details == "" else f"{details}"
raise click.ClickException(msg) from None
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise click.ClickException(
"Connection to the SuperLink is unavailable. Please check your network "
"connection and 'address' in the SuperLink connection configuration."
) from None
raise click.ClickException(SUPERLINK_UNAVAILABLE_MESSAGE) from None
if e.code() == grpc.StatusCode.NOT_FOUND:
if details == RUN_ID_NOT_FOUND_MESSAGE:
raise click.ClickException("Run ID not found.") from None
Expand Down
22 changes: 22 additions & 0 deletions framework/py/flwr/cli/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
init_channel_from_connection,
load_gitignore_patterns,
validate_federation_name,
wait_for_control_api_channel,
)


Expand Down Expand Up @@ -181,6 +182,27 @@ def test_load_gitignore_patterns_with_pathspec() -> None:
assert spec.match_file("good.py") is False


def test_wait_for_control_api_channel_retries_until_ready() -> None:
"""Test that Control API readiness waits through transient unavailability."""
future = Mock()
future.result.side_effect = [grpc.FutureTimeoutError(), None]

with patch("flwr.cli.utils.grpc.channel_ready_future", return_value=future):
wait_for_control_api_channel(Mock(), timeout=1, check_interval=0.01)

assert future.result.call_count == 2


def test_wait_for_control_api_channel_fails_after_timeout() -> None:
"""Test that Control API readiness fails after the timeout expires."""
future = Mock()
future.result.side_effect = grpc.FutureTimeoutError()

with patch("flwr.cli.utils.grpc.channel_ready_future", return_value=future):
with pytest.raises(click.ClickException, match="SuperLink is unavailable"):
wait_for_control_api_channel(Mock(), timeout=0.01, check_interval=0.01)


def test_get_executed_command_single() -> None:
"""Test get_executed_command with a two-word command (e.g., flwr ls)."""
root_group = click.Group("flwr")
Expand Down
Loading