Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion aiostream/stream/advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from . import combine

from ..core import Streamer, pipable_operator
from ..core import Streamer, pipable_operator, streamcontext
from ..manager import StreamerManager


Expand Down Expand Up @@ -40,11 +40,22 @@ async def base_combine(
The items can either be generated in order or as soon as they're received,
depending on the ``ordered`` argument.
"""
# Task limit is never provided in switch mode (should always be 2)
if switch:
assert task_limit is None

# Task limit
if task_limit is not None and not task_limit > 0:
raise ValueError("The task limit must be None or greater than 0")

# Sequential case
if task_limit == 1:
async with streamcontext(source) as mainstreamer:
async for substream in mainstreamer:
async with streamcontext(substream) as substreamer:
async for item in substreamer:
yield item

# Safe context
async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
main_streamer: Streamer[AsyncIterable[T] | T] | None = (
Expand Down
12 changes: 8 additions & 4 deletions aiostream/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,15 @@ def event_loop_policy() -> TimeTrackingTestLoopPolicy:


@pytest.fixture # type: ignore[misc]
def assert_cleanup(
event_loop: TimeTrackingTestLoop,
) -> Callable[[], ContextManager[TimeTrackingTestLoop]]:
def assert_cleanup() -> Callable[[], ContextManager[TimeTrackingTestLoop]]:
"""Fixture to assert cleanup of resources."""
return event_loop.assert_cleanup

def _assert_cleanup() -> ContextManager[TimeTrackingTestLoop]:
loop = asyncio.get_running_loop()
assert isinstance(loop, TimeTrackingTestLoop)
return loop.assert_cleanup()

return _assert_cleanup


class BaseEventLoopWithInternals(asyncio.BaseEventLoop):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Homepage = "https://github.com/vxgmichel/aiostream"
[tool.pytest.ini_options]
addopts = "--strict-markers --cov aiostream"
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "function"

[tool.pyright]
strict = [
Expand Down
Loading