Skip to content

Commit 4e34914

Browse files
authored
fix: Fix bad requests when retrying with streams in multi-part requests (#212)
1 parent 0aa5666 commit 4e34914

10 files changed

Lines changed: 747 additions & 17 deletions

File tree

nisystemlink/clients/artifact/_artifact_client.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,21 @@
1010
post,
1111
response_handler,
1212
)
13+
from nisystemlink.clients.core._uplink._multipart_retry import (
14+
retryable_multipart_request,
15+
)
1316
from nisystemlink.clients.core.helpers._iterator_file_like import IteratorFileLike
1417
from requests.models import Response
15-
from uplink import Part, Path
18+
from uplink import Part, Path, retry
1619

1720
from . import models
1821

1922

23+
def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike:
24+
return IteratorFileLike(response.iter_content(chunk_size=4096))
25+
26+
27+
@retry(when=retry.when.status(429), stop=retry.stop.after_attempt(5))
2028
class ArtifactClient(BaseClient):
2129
def __init__(self, configuration: core.HttpConfiguration | None = None):
2230
"""Initialize an instance.
@@ -35,9 +43,10 @@ def __init__(self, configuration: core.HttpConfiguration | None = None):
3543

3644
super().__init__(configuration, base_path="/ninbartifact/v1/")
3745

38-
@post("artifacts")
46+
@retryable_multipart_request()
47+
@post("artifacts", args=[Part("workspace"), Part("artifact")])
3948
def __upload_artifact(
40-
self, workspace: Part, artifact: Part
49+
self, workspace: str, artifact: BinaryIO
4150
) -> models.UploadArtifactResponse:
4251
"""Uploads an artifact using multipart/form-data headers to send the file payload in the HTTP body.
4352
@@ -49,6 +58,7 @@ def __upload_artifact(
4958
UploadArtifactResponse: The response containing the artifact ID.
5059
5160
"""
61+
...
5262

5363
def upload_artifact(
5464
self, workspace: str, artifact: BinaryIO
@@ -70,9 +80,6 @@ def upload_artifact(
7080

7181
return response
7282

73-
def _iter_content_filelike_wrapper(response: Response) -> IteratorFileLike:
74-
return IteratorFileLike(response.iter_content(chunk_size=4096))
75-
7683
@response_handler(_iter_content_filelike_wrapper)
7784
@get("artifacts/{id}")
7885
def download_artifact(self, id: Path) -> IteratorFileLike:
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
"""Helpers for multipart requests that need retry-safe stream handling.
2+
3+
The decorator exported by this module is intended for multipart requests that may
4+
be retried by Uplink. It preserves the caller's stream position on the initial
5+
send, rewinds seekable multipart parts only on retry attempts, and aborts the
6+
retry if any part cannot be rewound.
7+
8+
When a retry is aborted because a part cannot be rewound, the original response
9+
or exception that triggered the retry is surfaced back through Uplink's normal
10+
response and error handling pipeline instead of sending a malformed follow-up
11+
request.
12+
"""
13+
14+
import io
15+
from enum import auto, Enum
16+
from typing import Any, Callable, cast, TypeVar
17+
18+
from requests import Response
19+
from uplink import decorators
20+
from uplink.clients.io import transitions
21+
from uplink.clients.io.interfaces import RequestTemplate
22+
23+
F = TypeVar("F", bound=Callable[..., Any])
24+
25+
26+
class _RewindResult(Enum):
27+
REWOUND = auto()
28+
FAILED = auto()
29+
NOT_NEEDED = auto()
30+
31+
32+
def _rewind_retryable_part(part: object) -> _RewindResult:
33+
"""Rewind the first seekable multipart payload contained in ``part``.
34+
35+
Returns ``_RewindResult.REWOUND`` when a multipart payload was successfully
36+
rewound to the start of the stream. Returns
37+
``_RewindResult.FAILED`` when a stream payload appears to need rewinding but
38+
rejects it. Returns ``_RewindResult.NOT_NEEDED`` when the part contains no
39+
stream payload that requires rewinding, such as simple string fields.
40+
"""
41+
if hasattr(part, "seek"):
42+
seekable = getattr(part, "seekable", None)
43+
if callable(seekable):
44+
try:
45+
if not cast(Any, seekable)():
46+
return _RewindResult.FAILED
47+
except (OSError, io.UnsupportedOperation):
48+
return _RewindResult.FAILED
49+
50+
try:
51+
cast(Any, part).seek(0)
52+
except (OSError, io.UnsupportedOperation):
53+
return _RewindResult.FAILED
54+
return _RewindResult.REWOUND
55+
56+
if isinstance(part, tuple):
57+
for item in part:
58+
rewind_result = _rewind_retryable_part(item)
59+
if rewind_result is not _RewindResult.NOT_NEEDED:
60+
return rewind_result
61+
62+
return _RewindResult.NOT_NEEDED
63+
64+
65+
def _get_saved_retry_action(
66+
response: Response | None,
67+
exception_info: tuple[type[BaseException], BaseException, Any] | None,
68+
) -> Any:
69+
"""Return the original retry-triggering failure as an Uplink transition."""
70+
if response is not None:
71+
return transitions.finish(response)
72+
73+
if exception_info is not None:
74+
return transitions.fail(*exception_info)
75+
76+
return None
77+
78+
79+
class _RetryableMultipartRequestTemplate(RequestTemplate):
80+
"""Track multipart retry state and rewind streams only for retry sends.
81+
82+
The first request attempt is left untouched so callers can intentionally
83+
provide streams positioned away from offset 0. On later attempts, each file
84+
part must be rewound to the beginning before the request is sent again.
85+
86+
If rewinding fails for any part, the retry is cancelled and the original
87+
response or exception that caused the retry is returned to Uplink so normal
88+
error handling can surface it to the caller.
89+
"""
90+
91+
def __init__(self) -> None:
92+
self._attempted_request_ids: set[int] = set()
93+
self._responses_by_request_id: dict[int, Response] = {}
94+
self._exceptions_by_request_id: dict[
95+
int, tuple[type[BaseException], BaseException, Any]
96+
] = {}
97+
98+
def _clear_request_state(self, request_id: int) -> None:
99+
self._attempted_request_ids.discard(request_id)
100+
self._responses_by_request_id.pop(request_id, None)
101+
self._exceptions_by_request_id.pop(request_id, None)
102+
103+
def before_request(self, request: tuple[str, str, dict[str, Any]]) -> Any:
104+
_, _, extras = request
105+
request_id = id(request)
106+
if request_id not in self._attempted_request_ids:
107+
self._attempted_request_ids.add(request_id)
108+
return None
109+
110+
for part in extras.get("files", {}).values():
111+
if _rewind_retryable_part(part) is _RewindResult.FAILED:
112+
retry_action = _get_saved_retry_action(
113+
self._responses_by_request_id.get(request_id),
114+
self._exceptions_by_request_id.get(request_id),
115+
)
116+
self._clear_request_state(request_id)
117+
return retry_action
118+
return None
119+
120+
def after_response(
121+
self, request: tuple[str, str, dict[str, Any]], response: Response
122+
) -> None:
123+
request_id = id(request)
124+
self._responses_by_request_id[request_id] = response
125+
self._exceptions_by_request_id.pop(request_id, None)
126+
return None
127+
128+
def after_exception(
129+
self,
130+
request: tuple[str, str, dict[str, Any]],
131+
exc_type: type[BaseException],
132+
exc_val: BaseException,
133+
exc_tb: Any,
134+
) -> None:
135+
request_id = id(request)
136+
self._exceptions_by_request_id[request_id] = (exc_type, exc_val, exc_tb)
137+
self._responses_by_request_id.pop(request_id, None)
138+
return None
139+
140+
141+
class _RetryableMultipartCleanupTemplate(RequestTemplate):
142+
def __init__(self, retry_template: _RetryableMultipartRequestTemplate) -> None:
143+
self._retry_template = retry_template
144+
145+
def after_response(
146+
self, request: tuple[str, str, dict[str, Any]], response: Response
147+
) -> None:
148+
self._retry_template._clear_request_state(id(request))
149+
return None
150+
151+
def after_exception(
152+
self,
153+
request: tuple[str, str, dict[str, Any]],
154+
exc_type: type[BaseException],
155+
exc_val: BaseException,
156+
exc_tb: Any,
157+
) -> None:
158+
self._retry_template._clear_request_state(id(request))
159+
return None
160+
161+
162+
class _RetryableMultipartRequest(decorators.MethodAnnotation):
163+
def modify_request(self, request_builder: Any) -> None:
164+
retryable_template = _RetryableMultipartRequestTemplate()
165+
# Insert ahead of Uplink's retry template so this helper can see the
166+
# original retry-triggering response/exception and short-circuit future
167+
# attempts when a multipart stream cannot be rewound safely.
168+
request_builder._request_templates.insert(0, retryable_template)
169+
request_builder._request_templates.append(
170+
_RetryableMultipartCleanupTemplate(retryable_template)
171+
)
172+
173+
174+
def retryable_multipart_request() -> Callable[[F], F]:
175+
"""Create a decorator for multipart requests with retry-safe stream handling.
176+
177+
Behavior:
178+
- The initial send preserves the caller-provided stream position.
179+
- Retry attempts rewind seekable multipart payloads back to offset 0.
180+
- Multipart fields that do not contain streams, such as simple strings, are
181+
left alone and do not block retries.
182+
- If a retry attempt cannot rewind a payload, the retry is cancelled and the
183+
original retry-triggering response or exception is surfaced.
184+
"""
185+
186+
def decorator(func: F) -> F:
187+
return _RetryableMultipartRequest()(func) # type: ignore[return-value]
188+
189+
return decorator

nisystemlink/clients/feeds/_feeds_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
from nisystemlink.clients import core
66
from nisystemlink.clients.core._uplink._base_client import BaseClient
77
from nisystemlink.clients.core._uplink._methods import delete, get, post
8+
from nisystemlink.clients.core._uplink._multipart_retry import (
9+
retryable_multipart_request,
10+
)
811
from uplink import Part, Path, Query, retry
912

1013
from . import models
@@ -90,6 +93,7 @@ def query_feeds(
9093

9194
return response
9295

96+
@retryable_multipart_request()
9397
@post(
9498
"feeds/{feedId}/packages",
9599
args=[Path(name="feedId"), Part(), Query(name="shouldOverwrite")],

nisystemlink/clients/file/_file_client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
post,
1515
response_handler,
1616
)
17+
from nisystemlink.clients.core._uplink._multipart_retry import (
18+
retryable_multipart_request,
19+
)
1720
from nisystemlink.clients.core.helpers import IteratorFileLike
1821
from requests.models import Response
1922
from uplink import Body, Field, params, Part, Path, Query, retry
@@ -331,6 +334,7 @@ def download_file(self, id: str) -> IteratorFileLike:
331334
"""
332335

333336
@response_handler(_file_uri_response_handler)
337+
@retryable_multipart_request()
334338
@post("service-groups/Default/upload-files")
335339
def __upload_file(
336340
self,
@@ -433,6 +437,7 @@ def start_upload_session(
433437
],
434438
)
435439
@response_handler(lambda response: None)
440+
@retryable_multipart_request()
436441
def append_to_upload_session(
437442
self,
438443
session_id: str,

nisystemlink/clients/notebook/_notebook_client.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import io
2-
from typing import List
2+
from typing import BinaryIO, List
33

44
from nisystemlink.clients import core
55
from nisystemlink.clients.core._api_error import ApiError
@@ -14,6 +14,9 @@
1414
put,
1515
response_handler,
1616
)
17+
from nisystemlink.clients.core._uplink._multipart_retry import (
18+
retryable_multipart_request,
19+
)
1720
from nisystemlink.clients.core.helpers._iterator_file_like import IteratorFileLike
1821
from uplink import Part, Path, retry
1922

@@ -54,12 +57,16 @@ def get_notebook(self, id: str) -> models.NotebookMetadata:
5457
"""
5558
...
5659

57-
@put("ninotebook/v1/notebook/{id}")
60+
@retryable_multipart_request()
61+
@put(
62+
"ninotebook/v1/notebook/{id}",
63+
args=[Path("id"), Part("metadata"), Part("content")],
64+
)
5865
def __update_notebook(
5966
self,
60-
id: Path,
61-
metadata: Part = None,
62-
content: Part = None,
67+
id: str,
68+
metadata: io.BytesIO | None = None,
69+
content: BinaryIO | None = None,
6370
) -> models.NotebookMetadata:
6471
"""Updates a notebook metadata by ID.
6572
@@ -81,7 +88,7 @@ def update_notebook(
8188
self,
8289
id: str,
8390
metadata: models.NotebookMetadata | None = None,
84-
content: io.BufferedReader | None = None,
91+
content: BinaryIO | None = None,
8592
) -> models.NotebookMetadata:
8693
"""Updates a notebook metadata by ID.
8794
@@ -121,11 +128,15 @@ def delete_notebook(self, id: str) -> None:
121128
"""
122129
...
123130

124-
@post("ninotebook/v1/notebook")
131+
@retryable_multipart_request()
132+
@post(
133+
"ninotebook/v1/notebook",
134+
args=[Part("metadata"), Part("content")],
135+
)
125136
def __create_notebook(
126137
self,
127-
metadata: Part,
128-
content: Part,
138+
metadata: io.BytesIO,
139+
content: BinaryIO,
129140
) -> models.NotebookMetadata:
130141
"""Creates a new notebook.
131142
@@ -145,7 +156,7 @@ def __create_notebook(
145156
def create_notebook(
146157
self,
147158
metadata: models.NotebookMetadata,
148-
content: io.BufferedReader,
159+
content: BinaryIO,
149160
) -> models.NotebookMetadata:
150161
"""Creates a new notebook.
151162

0 commit comments

Comments
 (0)