add support for streaming with TaskGroups #117

@graingert

Code of Conduct

  • I agree to follow Django's Code of Conduct

Feature Description

see https://forum.djangoproject.com/t/streamingresponse-driven-by-a-taskgroup/40320/4

I'd like to be able to write code that combines multiple streams of data:

async def news_and_weather(request: HttpRequest) -> StreamingHttpResponse:
    async def gen() -> AsyncGenerator[bytes]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                async for msg in rx:
                    yield msg  # yield in async generator!! illegal inside TaskGroup!
    return StreamingHttpResponse(gen())

Problem

However, this doesn't work: the yield sits inside a plain async generator rather than a context manager, and calling aclosing() on that async generator is not sufficient to let the TaskGroup cancel itself and catch the resulting cancellation error. What I'd like to be able to write instead is something like:

from typing import Protocol

from useful_types import SupportsAnext

class AsyncIteratorBytesResource(Protocol):
    """
    All the machinery needed to safely run an AsyncGenerator[bytes].

    (For django-stubs) this allows AsyncGenerator[bytes] but is less strict,
    so it would also allow an anyio MemoryObjectReceiveStream[bytes].
    """

    # __aiter__ is a plain (non-async) method; only __anext__ and aclose are awaited.
    def __aiter__(self) -> SupportsAnext[bytes]: ...
    async def aclose(self) -> object: ...


async def news_and_weather(request: HttpRequest) -> StreamingAcmgrHttpResponse:
    @contextlib.asynccontextmanager
    async def acmgr_gen() -> AsyncGenerator[AsyncIteratorBytesResource]:
        async def push(ws_url: str, tx: MemoryObjectSendStream) -> None:
            async with tx, connect_ws(ws_url) as conn:
                async for msg in conn:
                    await tx.send(msg)

        async with anyio.create_task_group() as tg:
            tx, rx = anyio.create_memory_object_stream[bytes]()
            with tx, rx:
                tg.start_soon(push, "ws://example.com/news", tx.clone())
                tg.start_soon(push, "ws://example.com/weather", tx.clone())
                tx.close()
                yield rx  # yield inside asynccontextmanager, permitted inside TaskGroup

    return StreamingAcmgrHttpResponse(acmgr_gen())

Request or proposal

proposal

Additional Details

No response

Implementation Suggestions

https://github.com/django/django/pull/19364/changes

Metadata

Labels: Async, Django Core (this idea is suitable for inclusion in Django itself), Views

Status: In progress