Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/qa.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: QA

on:
push:
pull_request:

jobs:
Expand Down
3 changes: 0 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,3 @@ linters:
formatters:
enable:
- gofumpt

issues:
exclude-use-default: false
6 changes: 6 additions & 0 deletions internal/core/auth/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package auth

import (
"context"
"sync"

sdkauth "github.com/matdev83/go-llm-interactive-proxy/pkg/lipsdk/auth"
)
Expand All @@ -12,6 +13,7 @@ import (
// Event DTO additions remain subject to non-secret classification; see [EventSink] and
// [sdkauth.AuthDecisionEvent] / [sdkauth.SessionStartEvent] package docs.
type EventDispatcher struct {
mu sync.Mutex
sink EventSink
policy EventFailurePolicy
}
Expand All @@ -29,6 +31,8 @@ func (d *EventDispatcher) DispatchAuthDecision(ctx context.Context, ev sdkauth.A
}
ev2 := ev
ev2.ChallengeSummary = sdkauth.SanitizePublicChallengeSummary(ev.ChallengeSummary, "", sdkauth.PublicChallengeSummaryMaxRunes)
d.mu.Lock()
defer d.mu.Unlock()
err := d.sink.OnAuthDecision(ctx, ev2)
return d.handleSinkError(err)
}
Expand All @@ -38,6 +42,8 @@ func (d *EventDispatcher) DispatchSessionStart(ctx context.Context, ev sdkauth.S
if d == nil || d.sink == nil {
return nil
}
d.mu.Lock()
defer d.mu.Unlock()
err := d.sink.OnSessionStart(ctx, ev)
return d.handleSinkError(err)
}
Expand Down
25 changes: 25 additions & 0 deletions internal/core/auth/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auth
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -98,6 +99,30 @@ func TestEventDispatcher_failClosed_propagatesSinkError(t *testing.T) {
}
}

func TestEventDispatcher_serializesConcurrentSinkDelivery(t *testing.T) {
t.Parallel()
const calls = 64
sink := &fakeSink{}
d := NewEventDispatcher(sink, EventFailureBestEffort)
ctx := context.Background()

var wg sync.WaitGroup
wg.Add(calls)
for range calls {
go func() {
defer wg.Done()
if err := d.DispatchAuthDecision(ctx, sampleAuthEvent()); err != nil {
t.Errorf("DispatchAuthDecision: %v", err)
}
}()
}
wg.Wait()

if sink.authCalls != calls {
t.Fatalf("auth calls: got %d want %d", sink.authCalls, calls)
}
}

func sampleAuthEvent() sdkauth.AuthDecisionEvent {
return sdkauth.AuthDecisionEvent{
Time: time.Unix(1700000000, 0).UTC(),
Expand Down
4 changes: 3 additions & 1 deletion internal/core/auth/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ const (
)

// EventSink receives non-secret auth and session events. Implementations are wired at the
// composition root (e.g. structured logging). OnAuthDecision: do not treat [sdkauth.AuthDecisionEvent]
// composition root (e.g. structured logging). [EventDispatcher] serializes calls per dispatcher
// instance; sinks should still avoid long blocking work because delivery remains on request paths.
// OnAuthDecision: do not treat [sdkauth.AuthDecisionEvent]
// fields as proof of absence of secrets in upstream state; log only stable, operator-approved
// attributes (the default JSON sink logs [sdkauth.AuthDecisionEvent.PrincipalSafeClaims] keys only,
// not map values). New fields on event DTOs require explicit non-secret data classification before use;
Expand Down
4 changes: 2 additions & 2 deletions internal/core/modelcatalog/eligibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
type EligibilityDecision struct {
IsEligible bool `json:"eligible"`
Reason EligibilityReason `json:"reason,omitempty"`
Facts EffectiveFacts `json:"facts,omitempty"`
Estimate SizeEstimate `json:"estimate,omitempty"`
Facts EffectiveFacts `json:"facts,omitzero"`
Estimate SizeEstimate `json:"estimate,omitzero"`
}

// EligibilityResolverImpl decides context-limit eligibility from already-resolved [EffectiveFacts]
Expand Down
5 changes: 5 additions & 0 deletions internal/core/runtime/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var _ lipsdk.ExecutorView = (*Executor)(nil)
// internal/core/runtime test binary links export_test.go, which assigns this hook in init.
var secureSessionTestPrepare = func(*Executor) {}

// secureSessionMu guards lazy initialization of Executor.SecureSession in tests.
var secureSessionMu sync.Mutex
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

// Executor orchestrates hooks, capability negotiation, routing, B2BUA, and backend attempts.
type Executor struct {
Store b2bua.Store
Expand Down Expand Up @@ -146,9 +149,11 @@ func (e *Executor) Execute(ctx context.Context, call *lipapi.Call) (_ lipapi.Eve
if e.RuntimeSnapshot != nil {
ctx = extensions.WithRequestRuntimeSnapshot(ctx, e.RuntimeSnapshot)
}
secureSessionMu.Lock()
if e.SecureSession == nil {
secureSessionTestPrepare(e)
}
secureSessionMu.Unlock()
if e.SecureSession == nil {
return nil, fmt.Errorf("executor: secure session manager is required")
}
Expand Down
12 changes: 8 additions & 4 deletions internal/core/stream/recv_context_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type sdkLikeRecvBlocker struct {
mu sync.Mutex
ch chan struct{}
recvStarted chan struct{}
startedOnce sync.Once
}

func newSDKLikeRecvBlocker() *sdkLikeRecvBlocker {
Expand All @@ -29,11 +30,14 @@ func (s *sdkLikeRecvBlocker) Recv(ctx context.Context) (lipapi.Event, error) {
if ctx == nil {
return lipapi.Event{}, lipapi.ErrNilContext
}
select {
case s.recvStarted <- struct{}{}:
default:
s.startedOnce.Do(func() { close(s.recvStarted) })
s.mu.Lock()
ch := s.ch
s.mu.Unlock()
if ch == nil {
return lipapi.Event{}, context.Canceled
}
<-s.ch
<-ch
return lipapi.Event{}, context.Canceled
}

Expand Down
2 changes: 1 addition & 1 deletion internal/stdhttp/modelcatalog_diag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestModelCatalogDiagnostics_mount_stackHTTPHandler_wiring(t *testing.T) {
}
built := &runtimebundle.Built{CatalogRuntime: rt}
mux := http.NewServeMux()
mountModelCatalogDiagnostics(mux, cfg, testkit.DiscardLogger(), built, context.Background())
mountModelCatalogDiagnostics(context.Background(), mux, cfg, testkit.DiscardLogger(), built)
outer := stackHTTPHandler(stackHTTPInput{
Cfg: cfg, Log: testkit.DiscardLogger(), Built: built, TraceGen: diag.NewTraceIDGenerator(), Inner: mux, HTTPProm: nil,
})
Expand Down
4 changes: 2 additions & 2 deletions internal/stdhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func RunWithRuntime(
mux.Handle("GET "+base, dh)
log.InfoContext(logCtx, "secure-session diagnostics mounted", "path", base)
}
mountModelCatalogDiagnostics(mux, cfg, log, built, logCtx)
mountModelCatalogDiagnostics(logCtx, mux, cfg, log, built)
maxBody := cfg.Server.EffectiveMaxRequestBodyBytes()
var trafficPorts traffic.PortBundle
if built.RuntimeSnapshot != nil {
Expand Down Expand Up @@ -327,7 +327,7 @@ func RunWithRuntime(
}
}

func mountModelCatalogDiagnostics(mux *http.ServeMux, cfg *config.Config, log *slog.Logger, built *runtimebundle.Built, logCtx context.Context) {
func mountModelCatalogDiagnostics(logCtx context.Context, mux *http.ServeMux, cfg *config.Config, log *slog.Logger, built *runtimebundle.Built) {
if mux == nil || cfg == nil {
return
}
Expand Down