Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions aiostream/stream/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,15 @@ def map(
sequence is exhausted. The function can either be synchronous or
asynchronous (coroutine function).

The results can either be returned in or out of order, depending on
the corresponding ``ordered`` argument. This argument is ignored if the
provided function is synchronous.
If the function is asynchronous, the results can either be returned in
or out of order, depending on the corresponding ``ordered`` argument.
A ``ValueError`` is raised if this argument is provided and the provided
function is synchronous.

The coroutines run concurrently but their amount can be limited using
the ``task_limit`` argument. A value of ``1`` will cause the coroutines
to run sequentially. This argument is ignored if the provided function
is synchronous.
If the function is asynchronous, the coroutines run concurrently but their
amount can be limited using the ``task_limit`` argument. A value of ``1``
will cause the coroutines to run sequentially. A ``ValueError`` is raised
if this argument is provided and the provided function is synchronous.

If more than one sequence is provided, they're also awaited concurrently,
so that their waiting times don't add up.
Expand All @@ -222,6 +223,17 @@ def map(
return amap.raw(
source, func, *more_sources, ordered=ordered, task_limit=task_limit
)

if not ordered:
raise ValueError(
"The 'ordered' argument can only be used when the provided function is asynchronous."
)

if task_limit is not None:
raise ValueError(
"The 'task_limit' argument can only be used when the provided function is asynchronous."
)

sync_func = cast("SmapCallable[T, U]", func)
return smap.raw(source, sync_func, *more_sources)

Expand Down
6 changes: 5 additions & 1 deletion tests/test_combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,13 @@ def not_a_coro_function(arg: int, *_) -> Awaitable[int]:
await assert_run(ys, [1, 2, 3, 4, 5])
assert loop.steps == [1, 1, 1, 1, 1]

# Invalid argument
# Invalid arguments
with pytest.raises(ValueError):
await (stream.range(1, 4) | pipe.map(sleep_only, task_limit=0))
with pytest.raises(ValueError):
await (stream.range(1, 4) | pipe.map(square_target, task_limit=3))
with pytest.raises(ValueError):
await (stream.range(1, 4) | pipe.map(square_target, ordered=False))

# Break
with assert_cleanup() as loop:
Expand Down
Loading