Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/qa.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: QA

on:
push:
pull_request:

jobs:
Expand Down
6 changes: 6 additions & 0 deletions internal/core/auth/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package auth

import (
"context"
"sync"

sdkauth "github.com/matdev83/go-llm-interactive-proxy/pkg/lipsdk/auth"
)
Expand All @@ -12,6 +13,7 @@ import (
// Event DTO additions remain subject to non-secret classification; see [EventSink] and
// [sdkauth.AuthDecisionEvent] / [sdkauth.SessionStartEvent] package docs.
type EventDispatcher struct {
mu sync.Mutex
sink EventSink
policy EventFailurePolicy
}
Expand All @@ -29,6 +31,8 @@ func (d *EventDispatcher) DispatchAuthDecision(ctx context.Context, ev sdkauth.A
}
ev2 := ev
ev2.ChallengeSummary = sdkauth.SanitizePublicChallengeSummary(ev.ChallengeSummary, "", sdkauth.PublicChallengeSummaryMaxRunes)
d.mu.Lock()
defer d.mu.Unlock()
err := d.sink.OnAuthDecision(ctx, ev2)
return d.handleSinkError(err)
}
Expand All @@ -38,6 +42,8 @@ func (d *EventDispatcher) DispatchSessionStart(ctx context.Context, ev sdkauth.S
if d == nil || d.sink == nil {
return nil
}
d.mu.Lock()
defer d.mu.Unlock()
err := d.sink.OnSessionStart(ctx, ev)
return d.handleSinkError(err)
}
Expand Down
25 changes: 25 additions & 0 deletions internal/core/auth/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auth
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -98,6 +99,30 @@ func TestEventDispatcher_failClosed_propagatesSinkError(t *testing.T) {
}
}

func TestEventDispatcher_serializesConcurrentSinkDelivery(t *testing.T) {
t.Parallel()
const calls = 64
sink := &fakeSink{}
d := NewEventDispatcher(sink, EventFailureBestEffort)
ctx := context.Background()

var wg sync.WaitGroup
wg.Add(calls)
for i := 0; i < calls; i++ {
go func() {
defer wg.Done()
if err := d.DispatchAuthDecision(ctx, sampleAuthEvent()); err != nil {
t.Errorf("DispatchAuthDecision: %v", err)
}
}()
}
wg.Wait()

if sink.authCalls != calls {
t.Fatalf("auth calls: got %d want %d", sink.authCalls, calls)
}
}

func sampleAuthEvent() sdkauth.AuthDecisionEvent {
return sdkauth.AuthDecisionEvent{
Time: time.Unix(1700000000, 0).UTC(),
Expand Down
4 changes: 3 additions & 1 deletion internal/core/auth/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ const (
)

// EventSink receives non-secret auth and session events. Implementations are wired at the
// composition root (e.g. structured logging). OnAuthDecision: do not treat [sdkauth.AuthDecisionEvent]
// composition root (e.g. structured logging). [EventDispatcher] serializes calls per dispatcher
// instance; sinks should still avoid long blocking work because delivery remains on request paths.
// OnAuthDecision: do not treat [sdkauth.AuthDecisionEvent]
// fields as proof of absence of secrets in upstream state; log only stable, operator-approved
// attributes (the default JSON sink logs [sdkauth.AuthDecisionEvent.PrincipalSafeClaims] keys only,
// not map values). New fields on event DTOs require explicit non-secret data classification before use;
Expand Down
5 changes: 5 additions & 0 deletions internal/core/runtime/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var _ lipsdk.ExecutorView = (*Executor)(nil)
// internal/core/runtime test binary links export_test.go, which assigns this hook in init.
var secureSessionTestPrepare = func(*Executor) {}

// secureSessionMu guards lazy initialization of Executor.SecureSession in tests.
var secureSessionMu sync.Mutex

// Executor orchestrates hooks, capability negotiation, routing, B2BUA, and backend attempts.
type Executor struct {
Store b2bua.Store
Expand Down Expand Up @@ -146,9 +149,11 @@ func (e *Executor) Execute(ctx context.Context, call *lipapi.Call) (_ lipapi.Eve
if e.RuntimeSnapshot != nil {
ctx = extensions.WithRequestRuntimeSnapshot(ctx, e.RuntimeSnapshot)
}
secureSessionMu.Lock()
if e.SecureSession == nil {
secureSessionTestPrepare(e)
}
secureSessionMu.Unlock()
if e.SecureSession == nil {
return nil, fmt.Errorf("executor: secure session manager is required")
}
Expand Down
Loading