Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions aiostream/stream/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ async def zip(
if strict:
coros = (anext(streamer, STOP_SENTINEL) for streamer in streamers)
_items = await asyncio.gather(*coros)
if all(item == STOP_SENTINEL for item in _items):
if all(isinstance(item, _StopSentinelType) for item in _items):
break
elif any(item == STOP_SENTINEL for item in _items):
elif any(isinstance(item, _StopSentinelType) for item in _items):
raise ValueError("The provided sources have different lengths")
# This holds because we've ruled out STOP_SENTINEL above:
items = cast("list[T]", _items)
Expand Down
24 changes: 24 additions & 0 deletions tests/test_combine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import partial
from typing import Awaitable
import pytest
import asyncio
Expand All @@ -22,6 +23,29 @@ async def test_chain(assert_run, assert_cleanup):
await assert_run(xs, [])


class ThrowsInEquality:

def __eq__(self, other):
raise Exception("We should not reach this!")


@pytest.fixture(
params=[stream.range, partial(stream.repeat, ThrowsInEquality())],
ids=["range", "iterator_of_items_that_throw"],
)
def make_range(request):
return request.param


@pytest.mark.asyncio
async def test_zip_strict(make_range):
xs = make_range(3)
ys = make_range(2)
zs = stream.zip(xs, ys, strict=True)
with pytest.raises(ValueError, match="The provided sources have different lengths"):
await zs


@pytest.mark.asyncio
async def test_zip(assert_run):
xs = stream.range(5) | add_resource.pipe(1.0)
Expand Down
Loading