Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions csp_gateway/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from copy import deepcopy
from enum import Enum
from functools import lru_cache
from importlib import import_module
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast

Expand Down Expand Up @@ -305,9 +306,7 @@ def as_struct(self) -> Union[Any, List[Any]]:
# Import the class dynamically
try:
module_path, class_name = self.type_name.rsplit(".", 1)
import importlib

module = importlib.import_module(module_path)
module = import_module(module_path)
struct_class = getattr(module, class_name)
except (ValueError, ImportError, AttributeError) as e:
raise ValueError(f"Cannot import type '{self.type_name}': {e}") from e
Expand Down
36 changes: 1 addition & 35 deletions csp_gateway/client/csp_stream.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,4 @@
"""CSP integration for streaming data from the Gateway client over websockets.

This module provides the adapter infrastructure for streaming data from a GatewayServer
via websockets in CSP graphs. The main entry point is the `stream_csp` method on
`GatewayClient`.

Example:
>>> import csp
>>> from csp_gateway.client import GatewayClient
>>>
>>> @csp.graph
>>> def my_graph():
... client = GatewayClient(host="localhost", port=8000)
...
... # Subscribe to channels dynamically
... subscribe = csp.curve(str, [(timedelta(seconds=0), "channel1"), (timedelta(seconds=1), "channel2")])
... unsubscribe = csp.curve(str, [(timedelta(seconds=5), "channel1")])
...
... # Returns a DynamicBasket where each key is a channel name
... basket = client.stream_csp(subscribe=subscribe, unsubscribe=unsubscribe)
...
... # Access data for specific channels
... csp.print("basket", basket)
...
... # Bidirectional: send data back to the server
... outgoing = csp.const({"action": "send", "channel": "my_channel", "data": {"value": 42}})
... basket = client.stream_csp(subscribe=subscribe, data=outgoing)
"""

import asyncio
import logging
import threading
import time
Expand Down Expand Up @@ -234,8 +206,6 @@ def process_next_sim_timeslice(self, now):

def _run(self):
"""Main streaming thread."""
import asyncio

self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

Expand All @@ -250,8 +220,6 @@ def _run(self):

async def _connect_and_stream(self):
"""Async method to connect and stream data with retry logic."""
import asyncio

try:
from aiohttp import ClientConnectorError, ClientSession
except ImportError:
Expand Down Expand Up @@ -348,8 +316,6 @@ async def _receive_messages(self, ws):

async def _send_subscriptions(self, ws):
"""Send subscription/unsubscription requests and outgoing data from the pending queues."""
import asyncio

while self._running:
try:
# Check for pending subscribe requests
Expand Down
3 changes: 1 addition & 2 deletions csp_gateway/server/middleware/simple.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Simple Authentication Middleware."""

import base64
import logging
import platform
from datetime import timedelta
Expand Down Expand Up @@ -294,8 +295,6 @@ async def get_identity_from_credentials(
Returns:
Identity dict if valid credentials found, None otherwise.
"""
import base64

cookies = cookies or {}
headers = headers or {}

Expand Down
3 changes: 1 addition & 2 deletions csp_gateway/tests/client/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import inspect
import logging
from unittest.mock import patch

Expand Down Expand Up @@ -357,8 +358,6 @@ async def quick_generator():

def test_stream_method_signature_has_timeout():
"""Test that stream() method accepts timeout parameter."""
import inspect

# Check AsyncClient.stream signature
sig = inspect.signature(AsyncClient.stream)
params = list(sig.parameters.keys())
Expand Down
18 changes: 8 additions & 10 deletions csp_gateway/tests/client/test_csp_stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for the CSP streaming client module."""

import inspect
import multiprocessing
import os
import signal
Expand Down Expand Up @@ -64,7 +65,6 @@ def test_create_stream_csp_graph():

# The wrapped function has the original parameters
assert hasattr(stream_csp, "__wrapped__")
import inspect

sig = inspect.signature(stream_csp.__wrapped__)
params = list(sig.parameters.keys())
Expand Down Expand Up @@ -111,8 +111,6 @@ def test_gateway_client_stream_csp_signature():
"""Test that stream_csp method has the expected signature."""
client = GatewayClient(host="localhost", port=8000)

import inspect

sig = inspect.signature(client.stream_csp)
params = list(sig.parameters.keys())
assert "subscribe" in params
Expand Down Expand Up @@ -412,15 +410,15 @@ def _run_gateway_for_csp_stream(port_str):

def _wait_for_server(url: str, timeout: int = 30):
"""Wait for the server to be ready."""
import requests
import httpx

start = time.time()
while time.time() - start < timeout:
try:
resp = requests.get(url, timeout=1)
resp = httpx.get(url, timeout=1, follow_redirects=True)
resp.raise_for_status()
return True
except (requests.HTTPError, requests.Timeout, requests.ConnectionError):
except (httpx.HTTPStatusError, httpx.TransportError):
time.sleep(0.5)
return False

Expand Down Expand Up @@ -483,10 +481,10 @@ def test_graph() -> csp.Outputs(received=csp.ts[dict]):

finally:
# Shutdown the gateway
import requests
import httpx

try:
requests.post(shutdown_url, timeout=1)
httpx.post(shutdown_url, timeout=1, follow_redirects=True)
except Exception:
pass
p.join(timeout=10)
Expand Down Expand Up @@ -559,10 +557,10 @@ def test_graph() -> csp.Outputs(received=csp.ts[dict]):

finally:
# Shutdown the gateway
import requests
import httpx

try:
requests.post(shutdown_url, timeout=1)
httpx.post(shutdown_url, timeout=1, follow_redirects=True)
except Exception:
pass
p.join(timeout=10)
Expand Down
12 changes: 6 additions & 6 deletions csp_gateway/tests/server/gateway/test_gateway_start_stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from unittest.mock import patch

import csp.impl.error_handling
import httpx
import pytest
import requests
from fastapi.testclient import TestClient

from csp_gateway import Gateway, GatewaySettings, MountControls, MountRestRoutes
Expand Down Expand Up @@ -179,10 +179,10 @@ def test_signal_with_shutdown(signal_val, free_port):
assert p.exitcode == 0
try:
time.sleep(REQUEST_RETRY_TIMEOUT)
resp = requests.get(url, timeout=1)
resp = httpx.get(url, timeout=1, follow_redirects=True)
resp.raise_for_status()
break
except (requests.HTTPError, requests.Timeout, requests.ConnectionError):
except (httpx.HTTPStatusError, httpx.TransportError):
pass

print("Server is up")
Expand Down Expand Up @@ -225,16 +225,16 @@ def test_shutdown_with_big_red_button(free_port):
assert p.exitcode == 0
try:
time.sleep(REQUEST_RETRY_TIMEOUT)
resp = requests.get(state_url, timeout=1)
resp = httpx.get(state_url, timeout=1, follow_redirects=True)
resp.raise_for_status()
break
except (requests.HTTPError, requests.Timeout, requests.ConnectionError):
except (httpx.HTTPStatusError, httpx.TransportError):
print("Server not up yet")
continue
print("Server is up")
# Send signal to invoke shutdown
print("Sending Shutdown Request")
resp = requests.post(shutdown_url, timeout=1)
resp = httpx.post(shutdown_url, timeout=1, follow_redirects=True)
# Wait for gateway to react to signal
p.join(AFTER_SHUTDOWN_WAIT_TIME)
# Check if gateway shutdown with proper exit status
Expand Down
11 changes: 3 additions & 8 deletions csp_gateway/tests/server/web/test_auth_filter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""Tests for AuthFilterMiddleware."""

import json
import socket
import time
from datetime import timedelta

import csp
Expand Down Expand Up @@ -259,8 +262,6 @@ class MockWebSocket:
message = '{"channel": "test", "data": [{"user": "alice", "data": "visible"}, {"user": "bob", "data": "hidden"}]}'
filtered = await middleware.filter_websocket_data(message, ws)

import json

parsed = json.loads(filtered)
assert len(parsed["data"]) == 1
assert parsed["data"][0]["user"] == "alice"
Expand Down Expand Up @@ -463,8 +464,6 @@ def test_auth_required_with_cache(self, cached_client: TestClient):

def test_cached_last_returns_user_data(self, cached_client: TestClient):
"""Test that /last returns only data for the authenticated user."""
import time

# Wait for some data to be generated
time.sleep(0.5)

Expand Down Expand Up @@ -555,8 +554,6 @@ class TestSendValidationIntegration:

@pytest.fixture(scope="class")
def free_port(self):
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
Expand Down Expand Up @@ -665,8 +662,6 @@ class TestNextFilteringIntegration:

@pytest.fixture(scope="class")
def free_port(self):
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
Expand Down