Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .copier-answers.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Changes here will be overwritten by Copier
_commit: 222d96b
_commit: 4d4d95a
_src_path: https://github.com/python-project-templates/base.git
add_docs: false
add_extension: js
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
- name: Make dist
run: make dist

- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: dist
path: dist
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
run: make develop-js

- name: Download wheels
uses: actions/download-artifact@v7
uses: actions/download-artifact@v8
with:
name: dist

Expand All @@ -98,7 +98,7 @@ jobs:
run: make coverage

- name: Upload test results (Python)
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: test-results-${{ matrix.os }}-${{ matrix.python-version }}
path: '**/junit.xml'
Expand Down Expand Up @@ -132,7 +132,7 @@ jobs:
version: 3.11

- name: Download wheels
uses: actions/download-artifact@v7
uses: actions/download-artifact@v8
with:
name: dist

Expand Down
57 changes: 57 additions & 0 deletions csp_gateway/testing/mock_validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
def mock_api_key_validator_valid(api_key: str, settings, module) -> dict:
"""A mock validator that accepts specific API keys and returns an identity dict."""
valid_keys = {
"valid_key_1": {"user": "alice", "role": "admin"},
"valid_key_2": {"user": "bob", "role": "viewer"},
}
return valid_keys.get(api_key)


def mock_api_key_validator_invalid(api_key: str, settings, module) -> dict:
"""A mock validator that always returns None (invalid key)."""
return None


def mock_api_key_validator_raises(api_key: str, settings, module) -> dict:
"""A mock validator that raises an exception."""
raise ValueError("External validation service error")


def mock_api_key_validator_admin(api_key: str, settings, module) -> dict:
"""A mock validator that only accepts admin_key."""
if api_key == "admin_key":
return {"user": "admin", "role": "superadmin"}
return None


def mock_api_key_validator_by_user(api_key: str, settings, module) -> dict:
"""A mock validator that maps user-specific API keys to identities."""
users = {
"alice_key": {"user": "alice", "role": "admin"},
"bob_key": {"user": "bob", "role": "viewer"},
"charlie_key": {"user": "charlie", "role": "viewer"},
}
return users.get(api_key)


def mock_simple_auth_validator_valid(username: str, password: str, settings, module) -> dict:
"""A mock validator that accepts specific credentials."""
valid_users = {
("alice", "alicepass"): {"user": "alice", "role": "admin"},
("bob", "bobpass"): {"user": "bob", "role": "viewer"},
}
return valid_users.get((username, password))


def mock_simple_auth_validator_invalid(username: str, password: str, settings, module) -> dict:
"""A mock validator that always returns None (invalid credentials)."""
return None


def mock_simple_auth_validator_raises(username: str, password: str, settings, module) -> dict:
"""A mock validator that raises an exception."""
raise ValueError("External validation service error")


# Non-callable constant for testing that external_validator must be callable
NON_CALLABLE = "I am not callable"
60 changes: 15 additions & 45 deletions csp_gateway/tests/server/web/test_api_key_external.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Tests for MountExternalAPIKeyMiddleware."""

import pytest
from ccflow import PyObjectPath
from fastapi.testclient import TestClient
from pydantic import ValidationError

Expand All @@ -16,22 +15,11 @@
)
from csp_gateway.server.middleware.api_key import MountAPIKeyMiddleware
from csp_gateway.server.middleware.api_key_external import MountExternalAPIKeyMiddleware


def mock_validator_valid(api_key: str, settings, module) -> dict:
"""A mock validator that accepts specific API keys and returns an identity dict."""
valid_keys = {"valid_key_1": {"user": "alice", "role": "admin"}, "valid_key_2": {"user": "bob", "role": "viewer"}}
return valid_keys.get(api_key)


def mock_validator_invalid(api_key: str, settings, module) -> dict:
"""A mock validator that always returns None (invalid key)."""
return None


def mock_validator_raises(api_key: str, settings, module) -> dict:
"""A mock validator that raises an exception."""
raise ValueError("External validation service error")
from csp_gateway.testing.mock_validators import (
mock_api_key_validator_admin,
mock_api_key_validator_invalid,
mock_api_key_validator_valid,
)


class TestMountExternalAPIKeyMiddleware:
Expand All @@ -40,13 +28,11 @@ class TestMountExternalAPIKeyMiddleware:
@pytest.fixture(scope="class")
def external_key_gateway(self, free_port):
"""Create a gateway with external API key validation."""
# Use a PyObjectPath pointing to our mock validator
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_valid")
gateway = Gateway(
modules=[
ExampleModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(external_validator=validator_path),
MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_valid),
],
channels=ExampleGatewayChannels(),
settings=GatewaySettings(PORT=free_port),
Expand Down Expand Up @@ -90,12 +76,11 @@ class TestExternalAPIKeyLogin:
@pytest.fixture(scope="class")
def login_gateway(self, free_port):
"""Create a gateway for login testing."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_valid")
gateway = Gateway(
modules=[
ExampleModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(external_validator=validator_path),
MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_valid),
],
channels=ExampleGatewayChannels(),
settings=GatewaySettings(PORT=free_port),
Expand Down Expand Up @@ -131,8 +116,7 @@ class TestExternalAPIKeyIdentityStore:

def test_identity_stored_on_login(self):
"""Test that identity is stored in _identity_store after validation."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_valid")
middleware = MountExternalAPIKeyMiddleware(external_validator=validator_path)
middleware = MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_valid)

# Clear any existing identity store
middleware._identity_store = {}
Expand All @@ -143,8 +127,7 @@ def test_identity_stored_on_login(self):

def test_invalid_key_returns_none(self):
"""Test that invalid key returns None from validator."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_invalid")
middleware = MountExternalAPIKeyMiddleware(external_validator=validator_path)
middleware = MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_invalid)

identity = middleware._invoke_external("any_key", None, None)
assert identity is None
Expand All @@ -155,38 +138,33 @@ class TestExternalValidatorConfiguration:

def test_invalid_python_path_raises(self):
"""Test that invalid python path raises ValidationError."""
with pytest.raises(ValidationError, match="Invalid python path"):
with pytest.raises(ValidationError, match="ensure this value contains valid import path or importable object|Invalid python path"):
MountExternalAPIKeyMiddleware(external_validator="not_a_valid_path")

def test_validator_must_be_callable(self):
"""Test that external_validator must point to a callable object."""
# This path points to a non-callable (a string constant)
with pytest.raises(ValueError, match="external_validator must point to a callable object"):
MountExternalAPIKeyMiddleware(external_validator=PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:NON_CALLABLE"))
MountExternalAPIKeyMiddleware(external_validator="csp_gateway.testing.mock_validators.NON_CALLABLE")

def test_none_validator_allowed(self):
"""Test that None is allowed for external_validator."""
middleware = MountExternalAPIKeyMiddleware(external_validator=None)
assert middleware.external_validator is None


# Non-callable constant for testing
NON_CALLABLE = "I am not callable"


class TestScopeMatching:
"""Test scope-based authentication filtering."""

@pytest.fixture(scope="class")
def scoped_gateway(self, free_port):
"""Create a gateway with scoped API key validation (only /api/*)."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_valid")
gateway = Gateway(
modules=[
ExampleModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(
external_validator=validator_path,
external_validator=mock_api_key_validator_valid,
scope="/api/*",
),
],
Expand Down Expand Up @@ -225,8 +203,8 @@ class TestMultipleScopedMiddlewares:
@pytest.fixture(scope="class")
def multi_scope_gateway(self, free_port):
"""Create a gateway with two middlewares with different scopes and keys."""
validator_path_1 = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_valid")
validator_path_2 = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_admin")
validator_path_1 = mock_api_key_validator_valid
validator_path_2 = mock_api_key_validator_admin
gateway = Gateway(
modules=[
ExampleModule(),
Expand Down Expand Up @@ -274,13 +252,12 @@ class TestListScope:
@pytest.fixture(scope="class")
def list_scope_gateway(self, free_port):
"""Create a gateway with a list of scope patterns."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_api_key_external:mock_validator_valid")
gateway = Gateway(
modules=[
ExampleModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(
external_validator=validator_path,
external_validator=mock_api_key_validator_valid,
scope=["/api/v1/*", "/api/v2/*"],
),
],
Expand Down Expand Up @@ -353,13 +330,6 @@ def test_skip_if_out_of_scope(self):
assert middleware._skip_if_out_of_scope(request_out_of_scope) is True


def mock_validator_admin(api_key: str, settings, module) -> dict:
"""A mock validator that only accepts admin_key."""
if api_key == "admin_key":
return {"user": "admin", "role": "superadmin"}
return None


class TestMountAPIKeyMiddlewareScope:
"""Test MountAPIKeyMiddleware with scope configuration."""

Expand Down
24 changes: 5 additions & 19 deletions csp_gateway/tests/server/web/test_auth_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import csp
import pytest
from ccflow import PyObjectPath
from csp import ts
from fastapi.testclient import TestClient

Expand All @@ -19,6 +18,7 @@
)
from csp_gateway.server.middleware.api_key_external import MountExternalAPIKeyMiddleware
from csp_gateway.server.middleware.auth_filter import AuthFilterMiddleware
from csp_gateway.testing.mock_validators import mock_api_key_validator_by_user


# Test struct with a "user" field for filtering
Expand Down Expand Up @@ -49,16 +49,6 @@ def connect(self, channels: UserDataChannels):
channels.set_channel("user_data", data)


def mock_validator(api_key: str, settings, module) -> dict:
"""Mock validator that returns identity with user field."""
users = {
"alice_key": {"user": "alice", "role": "admin"},
"bob_key": {"user": "bob", "role": "viewer"},
"charlie_key": {"user": "charlie", "role": "viewer"},
}
return users.get(api_key)


class TestAuthFilterMiddleware:
"""Test AuthFilterMiddleware basic functionality."""

Expand Down Expand Up @@ -140,12 +130,11 @@ class TestAuthFilterMiddlewareIntegration:
@pytest.fixture(scope="class")
def filter_gateway(self, free_port):
"""Create a gateway with auth filtering enabled."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_auth_filter:mock_validator")
gateway = Gateway(
modules=[
UserDataModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(external_validator=validator_path),
MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_by_user),
AuthFilterMiddleware(filter_fields=["user"]),
],
channels=UserDataChannels(),
Expand Down Expand Up @@ -442,12 +431,11 @@ class TestIdentityCacheIntegration:
@pytest.fixture(scope="class")
def cached_gateway(self, free_port):
"""Create a gateway with identity cache enabled."""
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_auth_filter:mock_validator")
gateway = Gateway(
modules=[
UserDataModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(external_validator=validator_path),
MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_by_user),
AuthFilterMiddleware(
filter_fields=["user"],
identity_cache_channels=ChannelSelection(include=["user_data"]),
Expand Down Expand Up @@ -575,12 +563,11 @@ def free_port(self):

@pytest.fixture(scope="class")
def send_validated_gateway(self, free_port):
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_auth_filter:mock_validator")
gateway = Gateway(
modules=[
UserDataModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(external_validator=validator_path),
MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_by_user),
AuthFilterMiddleware(
filter_fields=["user"],
send_validation_channels=ChannelSelection(include=["user_data"]),
Expand Down Expand Up @@ -686,12 +673,11 @@ def free_port(self):

@pytest.fixture(scope="class")
def next_filtered_gateway(self, free_port):
validator_path = PyObjectPath("csp_gateway.tests.server.web.test_auth_filter:mock_validator")
gateway = Gateway(
modules=[
UserDataModule(),
MountRestRoutes(force_mount_all=True),
MountExternalAPIKeyMiddleware(external_validator=validator_path),
MountExternalAPIKeyMiddleware(external_validator=mock_api_key_validator_by_user),
AuthFilterMiddleware(
filter_fields=["user"],
next_filter_channels=ChannelSelection(include=["user_data"]),
Expand Down
Loading