About granian's behaviour when an external interrupt such as a keyboard interrupt is received #762
-
|
Hello, I have a question about how granian propagates interrupts to the app. My FastAPI lifespan function and server start function (factory function) are shown below.
def _create_target_loader(target: str, *, config_init_func_target: str, config_file: pathlib.Path | None) -> fastapi.FastAPI:
config_init_func: typing.Callable[[pathlib.Path | None], tuple[SERVER_CONFIG, pathlib.Path | None]] = _dotted_path_to_func(
config_init_func_target
)
app_factory_func: typing.Callable[[SERVER_CONFIG], fastapi.FastAPI] = _dotted_path_to_func(target)
config, config_file = config_init_func(config_file)
return app_factory_func(config)
def create_server(
    *,
    base_logger: Logger,
    logger: FilteringBoundLogger,
    package_root: pathlib.Path,
    project_root: pathlib.Path,
    app_factory_func_target: str,
    config_init_func_target: str,
    process_name_literal: list[str] | None = None,
    **kwargs,
) -> Callable[[], None]:
    """Build a zero-argument entry point that configures and runs a Granian server.

    Args:
        base_logger: Logger whose ``level`` is set from the loaded config and
            which is then ``configure()``-d.
        logger: Bound logger used for the progress messages emitted here.
        package_root: Passed to Granian as ``working_dir``; also watched for
            reloads when ``APP_ENV`` starts with ``"dev"``.
        project_root: Directory whose ``config.toml`` is used in dev mode.
        app_factory_func_target: Application target handed to Granian
            (served with ``factory=True``).
        config_init_func_target: Dotted path resolved with
            ``_dotted_path_to_func``; called with the config file path (or
            ``None``) and expected to return ``(config, config_file)``.
        process_name_literal: Parts joined with ``"-"`` to form the default
            process name. Defaults to ``["generic", "server"]``. The list is
            copied and never mutated.
        **kwargs: Extra Granian options. ``reload``, ``respawn_failed_workers``,
            ``workers_kill_timeout``, ``reload_ignore_patterns``,
            ``reload_ignore_worker_failure`` and ``process_name`` are consumed
            here; everything else is forwarded to ``Granian`` unchanged.

    Returns:
        A callable that starts the server and blocks until it stops.
    """
    config_init_func: Callable[[pathlib.Path | None], tuple[SERVER_CONFIG, pathlib.Path | None]] = _dotted_path_to_func(
        config_init_func_target
    )
    # Copy so that neither the caller's list nor a shared default is ever mutated.
    base_name_parts: list[str] = (
        ["generic", "server"] if process_name_literal is None else list(process_name_literal)
    )

    def server() -> None:
        # Per-call copy: the pops below must not drain the options for a later call.
        server_kwargs: dict[str, object] = dict(kwargs)

        app_env = os.environ.get("APP_ENV", "prod")
        os.environ["APP_ENV"] = app_env
        logger.debug(f"Set environment variable APP_ENV to {app_env}")

        if dev := app_env.startswith("dev"):
            config_file = project_root.joinpath("config.toml").expanduser()
        elif len(sys.argv) == 2:
            config_file = pathlib.Path(sys.argv[1]).expanduser()
            logger.debug(f"Using configuration from {config_file}")
        else:
            config_file = None
            logger.debug("Using default configuration")

        config, config_file = config_init_func(config_file)
        logger.debug(f"Set log level to {str(config.log_level)}")
        base_logger.level = config.log_level
        logger.debug("Set up logger")
        base_logger.configure()

        reload_paths: list[pathlib.Path] = []
        if config_file is not None:
            reload_paths.append(config_file)

        reload = bool(server_kwargs.pop("reload", True))
        name_parts = base_name_parts
        if dev:
            # Dev mode always reloads and additionally watches the package sources.
            reload = True
            reload_paths.append(package_root)
            name_parts = [*base_name_parts, "dev"]

        granian_server = Granian(
            app_factory_func_target,
            address=config._address,
            port=config._port,
            uds=config._uds,
            # Octal file mode (rw-rw----); the decimal literal 660 is a different mode.
            uds_permissions=0o660,
            interface=Interfaces.ASGI,
            workers=config.num_workers,
            runtime_mode=RuntimeModes.mt,
            loop=Loops.uvloop,
            http=HTTPModes.auto,
            log_level=LogLevels(str(config.log_level).lower()),
            log_access=dev,
            log_access_format="%(addr)s - '%(method)s %(path)s %(protocol)s' %(status)d %(dt_ms).3f",
            respawn_failed_workers=bool(server_kwargs.pop("respawn_failed_workers", True)),
            workers_kill_timeout=int(server_kwargs.pop("workers_kill_timeout", 60)),
            factory=True,
            working_dir=package_root,
            reload=reload,
            reload_paths=reload_paths,
            reload_ignore_patterns=list(server_kwargs.pop("reload_ignore_patterns", DEFAULT_RELOAD_IGNORE_PATTERN)),
            reload_ignore_worker_failure=bool(server_kwargs.pop("reload_ignore_worker_failure", True)),
            process_name=str(server_kwargs.pop("process_name", "-".join(name_parts))),
            **server_kwargs,
        )
        target_loader: functools.partial[fastapi.FastAPI] = functools.partial(
            _create_target_loader, config_init_func_target=config_init_func_target, config_file=config_file
        )
        try:
            logger.info(f"Start server at {str(config.service_url)}")
            granian_server.serve(target_loader=target_loader)
        # Only process-level interrupts are caught here. anyio.get_cancelled_exc_class()
        # needs a running event loop, which this synchronous function does not have;
        # evaluating it in the except clause would raise and mask the real exception.
        except (KeyboardInterrupt, SystemExit) as e:
            logger.warning(f"Server is terminated by {get_name(e)}", exc_info=e)
        except Exception as e:
            # Deliberate best-effort: log the failure and fall through to shutdown.
            logger.error("Failed to start server", exc_info=e)
        finally:
            granian_server.shutdown()

    return server
def create_lifecycle(
*,
base_logger: Logger,
logger: FilteringBoundLogger,
startup_hook: Callable[[fastapi.FastAPI, SERVER_CONFIG], None]
| Callable[[fastapi.FastAPI, SERVER_CONFIG], Awaitable[None]],
shutdown_hook: Callable[[fastapi.FastAPI, SERVER_CONFIG], None]
| Callable[[fastapi.FastAPI, SERVER_CONFIG], Awaitable[None]],
) -> Callable[[fastapi.FastAPI, SERVER_CONFIG], typing.AsyncContextManager[None]]:
if not callable(startup_hook):
raise TypeError("startup_hook must be callable")
if not callable(shutdown_hook):
raise TypeError("shutdown_hook must be callable")
@contextlib.asynccontextmanager
async def lifecycle(app: fastapi.FastAPI, config: SERVER_CONFIG):
try:
logger.info(f"Set log level to {str(config.log_level)}")
base_logger.level = config.log_level
logger.info("Set up logger")
base_logger.configure()
app.state.config = config
await logger.ainfo("Server is starting")
if inspect.iscoroutinefunction(startup_hook):
await startup_hook(app, app.state.config)
else:
startup_hook(app, app.state.config)
await logger.ainfo("Server is ready")
yield
except (KeyboardInterrupt, SystemExit, anyio.get_cancelled_exc_class()) as e:
with contextlib.suppress(Exception):
await logger.awarning(f"Server is terminated by {get_name(e)}", exc_info=e)
raise
except Exception as e:
with contextlib.suppress(Exception):
await logger.aerror("Server failed due to an unexpected error", exc_info=e)
raise
finally:
with anyio.CancelScope(shield=True):
try:
await logger.ainfo("Server is shutting down")
if inspect.iscoroutinefunction(shutdown_hook):
await shutdown_hook(app, app.state.config)
else:
shutdown_hook(app, app.state.config)
gc.collect()
await logger.ainfo("Server is stopped")
except Exception as e:
with contextlib.suppress(Exception):
await logger.aerror("Error occurred during server shutdown", exc_info=e)
return lifecycle
And I found that when I successfully shut down the server with Ctrl-C, the log shows:
2025-12-03T05:57:51.923948Z [INFO ] yolo_utils.server.App: Server is ready app_env=prod
2025-12-03T05:57:51.924390Z [INFO ] _granian.workers: Started worker-1 func_name=None lineno=1175 pathname=src/workers.rs thread=8557571136 process=95751 app_env=prod
^C[INFO] Shutting down granian
2025-12-03T05:57:59.872433Z [INFO ] _granian.workers: Stopping worker-1 func_name=None lineno=1175 pathname=src/workers.rs thread=6220361728 process=95751 app_env=prod
2025-12-03T05:57:59.880689Z [INFO ] yolo_utils.server.App: Server is shutting down app_env=prod
2025-12-03T05:57:59.881094Z [INFO ] yolo_utils.server.App: Clean up pipeline app_env=prod
2025-12-03T05:57:59.980661Z [INFO ] yolo_utils.server.App: Server is stopped app_env=prod
There is also another problem: if I start some background tasks inside startup_hook using an anyio task group, they always prevent the server from shutting down after the finally block finishes, which results in a force kill. An example is shown in the code below.
# what is inside the startup hook function
app.state._background_tasks = anyio.create_task_group()
await app.state._background_tasks.__aenter__()
app.state._background_tasks.start_soon(capture_task, app, name="Frame capturing")
app.state._background_tasks.start_soon(detection_task, app, name="License detection")
app.state._background_tasks.start_soon(recognition_task, app, name="License recognition")
app.state._background_tasks.start_soon(recording_task, app, name="Activity recording")
# what is inside the shutdown hook function
await app.state._background_tasks.__aexit__(None, None, None)
# and the task's inner structure
try:
while True:
try:
# do something
except HTTPException:
try:
await check_service_status(
"Stream Service", service_url=app.state.config.stream_service_url, max_attempt=10
)
except* Exception as e:
await LOGGER.aerror("Failed to reconnect to stream service", exc_info=e)
pass
except Exception as e:
await LOGGER.aerror("Unexpected error in capture task", exc_info=e)
raise
await anyio.sleep(DELAY)
except (KeyboardInterrupt, SystemExit, anyio.get_cancelled_exc_class()) as e:
await LOGGER.awarning("Capture task is cancelled by user", exc_info=e)
raise
And my environment is:
# Python 3.12
❯ uv pip list | rg -i "fastapi|granian|anyio"
anyio 4.12.0
fastapi 0.123.5
granian 2.6.0
I'm not sure if this is caused by how I handle the exception or by something granian does not support. |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
|
Hello @gi0baro, I’m sorry to bother you but I’m really struggling with this. I’m not sure if it’s a bug or if I’m doing something wrong. Every time I try to terminate my service the main process is killed but left with dangling workers. I need to manually kill the worker or it won’t stop. This also happens when reloading is enabled causing a force kill. |
Beta Was this translation helpful? Give feedback.
-
Granian doesn't propagate signals to the application.
Granian shuts down connections gracefully. If you start any other task, you need to handle client disconnections (there's a specific ASGI event for that) and cancel those tasks accordingly. |
Beta Was this translation helpful? Give feedback.
Granian doesn't propagate signals to the application.
If you want to manage the signals yourself you need to use the embedded server.
Granian shuts down connections gracefully. If you start any other task, you need to handle client disconnections (there's a specific ASGI eve…