-
Notifications
You must be signed in to change notification settings - Fork 47
Refactor SSE client with timeout support and listen_until_clear() #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: server
Are you sure you want to change the base?
Changes from 3 commits
3d69daf
9d86a8a
cf640e1
4094abf
3c77cca
de45573
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| from datetime import datetime | ||
| from typing import Optional | ||
| import os | ||
| import json | ||
|
|
@@ -87,33 +88,57 @@ def block(self, delay=0.005, base=2, max_attempts=16): | |
| raise StopIteration | ||
| return meta | ||
|
|
||
| def listen(self): | ||
| """ | ||
| Listens to server side events on the status of a request. | ||
| def listen(self, timeout: Optional[float] = 300): | ||
| """ | ||
| Listens to server-side events on the status of a request. | ||
|
|
||
| Yields: | ||
| dict[str, Any]: Parsed JSON messages from the event stream. | ||
| """ | ||
| url = self.server.base + f"/status_stream/{quote(self.status_loc)}" | ||
| with EventSource(url, timeout=timeout) as event_source: | ||
| for event in event_source: | ||
| if event.data: | ||
| msg = json.loads(event.data) | ||
| yield msg | ||
|
|
||
| def on_error(): | ||
| raise Exception("error") | ||
| print("event stream closed") | ||
|
|
||
| with EventSource( | ||
| url, | ||
| timeout=30, | ||
| on_error=on_error, | ||
| ) as event_source: | ||
| try: | ||
| print("listening...") | ||
| for event in event_source: | ||
| if event.data: | ||
| msg = json.loads(event.data) | ||
| print("msg: ", msg) | ||
| if msg['status'] == "pathClear": | ||
| return | ||
| def listen_until_clear(self, timeout: Optional[float] = 300): | ||
| """ | ||
| Listens to server side events until a 'pathClear' status is received. | ||
|
|
||
| except Exception as e: | ||
| print("error: ", e) | ||
| Args: | ||
| timeout (float): The maximum amount of time to listen. | ||
|
|
||
| Returns: | ||
| metadata: Any | ||
|
|
||
| Raises: | ||
| TimeoutError: If the timeout is reached before the path is clear. | ||
| """ | ||
| print("Listening status ...") | ||
| start_time = monotonic() | ||
|
|
||
| try: | ||
| for msg in self.listen(timeout=timeout): | ||
| if timeout and (monotonic() - start_time) > timeout: | ||
| raise TimeoutError(f"listen_until_clear timed out after {timeout} seconds") | ||
|
|
||
| status = msg.get("status") | ||
| if status == "pathClear": | ||
| print("path cleared") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above about status chatter |
||
| return "" | ||
| elif status == "pathForbiddenTemporary": | ||
| print("waiting for path to clear") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above about status chatter |
||
| continue | ||
| else: | ||
| return msg | ||
| except requests.exceptions.ReadTimeout: # timeout raised by requests module | ||
| raise TimeoutError(f"Timeout of {timeout}s waiting for event from server.") from None | ||
| except Exception as e: # unkown exceptions | ||
| print(f"An unexpected error occurred: {e}") | ||
| raise | ||
|
|
||
|
|
||
| def __str__(self): | ||
|
|
@@ -433,7 +458,7 @@ def __enter__(self): | |
|
|
||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| if "time" in self.finalization: print(f"{self.ns.format("*")} time {monotonic() - self.t0:.6f} s") | ||
| if "clear" in self.finalization: self.clear().listen() | ||
| if "clear" in self.finalization: self.clear().listen_until_clear() | ||
| if "spin_down" in self.finalization: self.spin_down() | ||
| if "stop" in self.finalization: self.stop() | ||
|
|
||
|
|
@@ -595,7 +620,7 @@ def _main(): | |
|
|
||
| print("data", ins.download_().data) | ||
|
|
||
| ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen() | ||
| ins.sexpr_import_("https://raw.githubusercontent.com/trueagi-io/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen_until_clear() | ||
|
|
||
| print("data", ins.download_().data) | ||
|
|
||
|
|
@@ -607,23 +632,32 @@ def _main_mm2(): | |
| # smoke test | ||
| with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: | ||
| server.upload_("(data (foo 1))\n(data (foo 2))\n(_exec 0 (, (data (foo $x))) (, (data (bar $x))))") | ||
| server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen() | ||
| server.exec(thread_id="test").listen() | ||
| print("data", server.download_().data) | ||
|
|
||
| for i, item in enumerate(server.history): | ||
| print(i, str(item)) | ||
| server.transform(("(_exec $priority $p $t)",), ("(exec (test $priority) $p $t)",)).listen_until_clear(5) | ||
| server.exec(thread_id="test").listen_until_clear(5) | ||
| # print("data", server.download_().data) | ||
| # | ||
| # for i, item in enumerate(server.history): | ||
| # print(i, str(item)) | ||
|
|
||
| def test_sse_status(): | ||
| with ManagedMORK.connect("../target/debug/mork_server").and_log_stdout().and_log_stderr().and_terminate() as server: | ||
| server.sexpr_import_(f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/simpsons.metta").listen() | ||
| DATASETS = ( | ||
| "royal92", | ||
| "lordOfTheRings", | ||
| "adameve", | ||
| "simpsons", | ||
| ) | ||
| for dataset in DATASETS: | ||
| server.sexpr_import_( | ||
| f"https://raw.githubusercontent.com/Adam-Vandervorst/metta-examples/refs/heads/main/aunt-kg/{dataset}.metta" | ||
| ).listen_until_clear() | ||
|
|
||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| # _main() | ||
| _main_mm2() | ||
| # test_sse_status() | ||
| # _main_mm2() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's restore the functionality of the Perhaps we want to move all tests outside of |
||
| test_sse_status() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if we keep this test in this file, it probably ought to start with '_' so it doesn't look like a public API.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i will resolve the above comments |
||
|
|
||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to be this chatty, by default. It's handy for debugging, and maybe if we support a "verbose" switch. But otherwise we probably want operations at this level to be silent on the client side. Just my opinion.