Skip to content

Commit 7f19a60

Browse files
authored
Merge pull request #9 from matdev83/codex/fix-stream-race-ci
Fix race detector CI failures
2 parents c307d5f + c1a94d3 commit 7f19a60

10 files changed

Lines changed: 51 additions & 14 deletions

File tree

.github/workflows/qa.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
name: QA
22

33
on:
4-
push:
54
pull_request:
65

76
jobs:

.golangci.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,3 @@ linters:
3535
formatters:
3636
enable:
3737
- gofumpt
38-
39-
issues:
40-
exclude-use-default: false

internal/core/auth/events.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package auth
22

33
import (
44
"context"
5+
"sync"
56

67
sdkauth "github.com/matdev83/go-llm-interactive-proxy/pkg/lipsdk/auth"
78
)
@@ -12,6 +13,7 @@ import (
1213
// Event DTO additions remain subject to non-secret classification; see [EventSink] and
1314
// [sdkauth.AuthDecisionEvent] / [sdkauth.SessionStartEvent] package docs.
1415
type EventDispatcher struct {
16+
mu sync.Mutex
1517
sink EventSink
1618
policy EventFailurePolicy
1719
}
@@ -29,6 +31,8 @@ func (d *EventDispatcher) DispatchAuthDecision(ctx context.Context, ev sdkauth.A
2931
}
3032
ev2 := ev
3133
ev2.ChallengeSummary = sdkauth.SanitizePublicChallengeSummary(ev.ChallengeSummary, "", sdkauth.PublicChallengeSummaryMaxRunes)
34+
d.mu.Lock()
35+
defer d.mu.Unlock()
3236
err := d.sink.OnAuthDecision(ctx, ev2)
3337
return d.handleSinkError(err)
3438
}
@@ -38,6 +42,8 @@ func (d *EventDispatcher) DispatchSessionStart(ctx context.Context, ev sdkauth.S
3842
if d == nil || d.sink == nil {
3943
return nil
4044
}
45+
d.mu.Lock()
46+
defer d.mu.Unlock()
4147
err := d.sink.OnSessionStart(ctx, ev)
4248
return d.handleSinkError(err)
4349
}

internal/core/auth/events_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package auth
33
import (
44
"context"
55
"errors"
6+
"sync"
67
"testing"
78
"time"
89

@@ -98,6 +99,30 @@ func TestEventDispatcher_failClosed_propagatesSinkError(t *testing.T) {
9899
}
99100
}
100101

102+
func TestEventDispatcher_serializesConcurrentSinkDelivery(t *testing.T) {
103+
t.Parallel()
104+
const calls = 64
105+
sink := &fakeSink{}
106+
d := NewEventDispatcher(sink, EventFailureBestEffort)
107+
ctx := context.Background()
108+
109+
var wg sync.WaitGroup
110+
wg.Add(calls)
111+
for range calls {
112+
go func() {
113+
defer wg.Done()
114+
if err := d.DispatchAuthDecision(ctx, sampleAuthEvent()); err != nil {
115+
t.Errorf("DispatchAuthDecision: %v", err)
116+
}
117+
}()
118+
}
119+
wg.Wait()
120+
121+
if sink.authCalls != calls {
122+
t.Fatalf("auth calls: got %d want %d", sink.authCalls, calls)
123+
}
124+
}
125+
101126
func sampleAuthEvent() sdkauth.AuthDecisionEvent {
102127
return sdkauth.AuthDecisionEvent{
103128
Time: time.Unix(1700000000, 0).UTC(),

internal/core/auth/ports.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ const (
4545
)
4646

4747
// EventSink receives non-secret auth and session events. Implementations are wired at the
48-
// composition root (e.g. structured logging). OnAuthDecision: do not treat [sdkauth.AuthDecisionEvent]
48+
// composition root (e.g. structured logging). [EventDispatcher] serializes calls per dispatcher
49+
// instance; sinks should still avoid long blocking work because delivery remains on request paths.
50+
// OnAuthDecision: do not treat [sdkauth.AuthDecisionEvent]
4951
// fields as proof of absence of secrets in upstream state; log only stable, operator-approved
5052
// attributes (the default JSON sink logs [sdkauth.AuthDecisionEvent.PrincipalSafeClaims] keys only,
5153
// not map values). New fields on event DTOs require explicit non-secret data classification before use;

internal/core/modelcatalog/eligibility.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
type EligibilityDecision struct {
1212
IsEligible bool `json:"eligible"`
1313
Reason EligibilityReason `json:"reason,omitempty"`
14-
Facts EffectiveFacts `json:"facts,omitempty"`
15-
Estimate SizeEstimate `json:"estimate,omitempty"`
14+
Facts EffectiveFacts `json:"facts,omitzero"`
15+
Estimate SizeEstimate `json:"estimate,omitzero"`
1616
}
1717

1818
// EligibilityResolverImpl decides context-limit eligibility from already-resolved [EffectiveFacts]

internal/core/runtime/executor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ type Executor struct {
7777
ExtensionMetrics extensions.StageMetrics
7878
// CompletionBufferLimits overrides completion-gate buffering bounds (tests). Zero MaxEvents uses SDK defaults.
7979
CompletionBufferLimits completion.BufferLimits
80+
// secureSessionMu guards lazy initialization of SecureSession in the test hook path.
81+
secureSessionMu sync.Mutex
8082

8183
// SecureSession authorizes turns via BeginTurn before submit hooks; required for all executor prepares.
8284
SecureSession *app.Manager
@@ -146,6 +148,8 @@ func (e *Executor) Execute(ctx context.Context, call *lipapi.Call) (_ lipapi.Eve
146148
if e.RuntimeSnapshot != nil {
147149
ctx = extensions.WithRequestRuntimeSnapshot(ctx, e.RuntimeSnapshot)
148150
}
151+
e.secureSessionMu.Lock()
152+
defer e.secureSessionMu.Unlock()
149153
if e.SecureSession == nil {
150154
secureSessionTestPrepare(e)
151155
}

internal/core/stream/recv_context_contract_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type sdkLikeRecvBlocker struct {
1616
mu sync.Mutex
1717
ch chan struct{}
1818
recvStarted chan struct{}
19+
startedOnce sync.Once
1920
}
2021

2122
func newSDKLikeRecvBlocker() *sdkLikeRecvBlocker {
@@ -29,11 +30,14 @@ func (s *sdkLikeRecvBlocker) Recv(ctx context.Context) (lipapi.Event, error) {
2930
if ctx == nil {
3031
return lipapi.Event{}, lipapi.ErrNilContext
3132
}
32-
select {
33-
case s.recvStarted <- struct{}{}:
34-
default:
33+
s.startedOnce.Do(func() { close(s.recvStarted) })
34+
s.mu.Lock()
35+
ch := s.ch
36+
s.mu.Unlock()
37+
if ch == nil {
38+
return lipapi.Event{}, context.Canceled
3539
}
36-
<-s.ch
40+
<-ch
3741
return lipapi.Event{}, context.Canceled
3842
}
3943

internal/stdhttp/modelcatalog_diag_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestModelCatalogDiagnostics_mount_stackHTTPHandler_wiring(t *testing.T) {
158158
}
159159
built := &runtimebundle.Built{CatalogRuntime: rt}
160160
mux := http.NewServeMux()
161-
mountModelCatalogDiagnostics(mux, cfg, testkit.DiscardLogger(), built, context.Background())
161+
mountModelCatalogDiagnostics(context.Background(), mux, cfg, testkit.DiscardLogger(), built)
162162
outer := stackHTTPHandler(stackHTTPInput{
163163
Cfg: cfg, Log: testkit.DiscardLogger(), Built: built, TraceGen: diag.NewTraceIDGenerator(), Inner: mux, HTTPProm: nil,
164164
})

internal/stdhttp/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func RunWithRuntime(
235235
mux.Handle("GET "+base, dh)
236236
log.InfoContext(logCtx, "secure-session diagnostics mounted", "path", base)
237237
}
238-
mountModelCatalogDiagnostics(mux, cfg, log, built, logCtx)
238+
mountModelCatalogDiagnostics(logCtx, mux, cfg, log, built)
239239
maxBody := cfg.Server.EffectiveMaxRequestBodyBytes()
240240
var trafficPorts traffic.PortBundle
241241
if built.RuntimeSnapshot != nil {
@@ -327,7 +327,7 @@ func RunWithRuntime(
327327
}
328328
}
329329

330-
func mountModelCatalogDiagnostics(mux *http.ServeMux, cfg *config.Config, log *slog.Logger, built *runtimebundle.Built, logCtx context.Context) {
330+
func mountModelCatalogDiagnostics(logCtx context.Context, mux *http.ServeMux, cfg *config.Config, log *slog.Logger, built *runtimebundle.Built) {
331331
if mux == nil || cfg == nil {
332332
return
333333
}

0 commit comments

Comments
 (0)