Skip to content

Commit 715be7f

Browse files
AkihiroSuda and claude committed
logging: fix data race in logging binary that loses container output
The logging binary had a goroutine that closed internal pipe writers immediately upon container exit (via gRPC notification). This raced with copyStream, which was still draining data from the external pipe. When the gRPC notification arrived before copyStream finished, the pipe writer was closed mid-copy, causing remaining data to be lost. Fix by making copyStream responsible for closing the pipe writers after all data has been copied. The container-exit goroutine (and its getContainerWait infrastructure) is removed — pipe closure now happens naturally when the parent process exits or the shim closes the pipe. Fixes #4782 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7cd18d2 commit 715be7f

File tree

2 files changed

+7
-72
lines changed

2 files changed

+7
-72
lines changed

pkg/logging/logging.go

Lines changed: 6 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@ import (
2626
"os"
2727
"path/filepath"
2828
"sort"
29-
"strings"
3029
"sync"
3130
"time"
3231

3332
"github.com/fsnotify/fsnotify"
3433
"github.com/muesli/cancelreader"
3534

36-
containerd "github.com/containerd/containerd/v2/client"
3735
"github.com/containerd/containerd/v2/core/runtime/v2/logging"
3836
"github.com/containerd/errdefs"
3937
"github.com/containerd/log"
@@ -165,49 +163,7 @@ func WaitForLogger(dataStore, ns, id string) error {
165163
})
166164
}
167165

168-
func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
169-
client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace))
170-
if err != nil {
171-
return nil, err
172-
}
173-
con, err := client.LoadContainer(ctx, config.ID)
174-
if err != nil {
175-
return nil, err
176-
}
177-
178-
task, err := con.Task(ctx, nil)
179-
if err == nil {
180-
return task.Wait(ctx)
181-
}
182-
if !errdefs.IsNotFound(err) {
183-
return nil, err
184-
}
185-
186-
// If task was not found, it's possible that the container runtime is still being created.
187-
// Retry every 100ms.
188-
ticker := time.NewTicker(100 * time.Millisecond)
189-
defer ticker.Stop()
190-
191-
for {
192-
select {
193-
case <-ctx.Done():
194-
return nil, errors.New("timed out waiting for container task to start")
195-
case <-ticker.C:
196-
task, err = con.Task(ctx, nil)
197-
if err != nil {
198-
if errdefs.IsNotFound(err) {
199-
continue
200-
}
201-
return nil, err
202-
}
203-
return task.Wait(ctx)
204-
}
205-
}
206-
}
207-
208-
type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error)
209-
210-
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error {
166+
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error {
211167
if err := driver.PreProcess(ctx, dataStore, config); err != nil {
212168
return err
213169
}
@@ -236,6 +192,10 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
236192
if err != nil {
237193
log.G(ctx).Errorf("failed to copy stream: %s", err)
238194
}
195+
// Close the pipe writer after all data has been copied.
196+
// This signals EOF to the downstream processLogFunc reader,
197+
// ensuring all data is drained before the pipe is closed.
198+
writer.Close()
239199
}
240200
go copyStream(stdoutR, pipeStdoutW)
241201
go copyStream(stderrR, pipeStderrW)
@@ -269,18 +229,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres
269229
defer wg.Done()
270230
driver.Process(stdout, stderr)
271231
}()
272-
go func() {
273-
// close pipeStdoutW and pipeStderrW upon container exit
274-
defer pipeStdoutW.Close()
275-
defer pipeStderrW.Close()
276-
277-
exitCh, err := getContainerWait(ctx, address, config)
278-
if err != nil {
279-
log.G(ctx).Errorf("failed to get container task wait channel: %v", err)
280-
return
281-
}
282-
<-exitCh
283-
}()
284232
wg.Wait()
285233
return driver.PostProcess()
286234
}
@@ -313,8 +261,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
313261
if err := ready(); err != nil {
314262
return err
315263
}
316-
// getContainerWait is extracted as parameter to allow mocking in tests.
317-
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config)
264+
return loggingProcessAdapter(ctx, driver, dataStore, config)
318265
})
319266
} else if !errors.Is(err, os.ErrNotExist) {
320267
// the file does not exist if the container was created with nerdctl < 0.20

pkg/logging/logging_test.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ import (
2323
"math/rand"
2424
"strings"
2525
"testing"
26-
"time"
2726

28-
containerd "github.com/containerd/containerd/v2/client"
2927
"github.com/containerd/containerd/v2/core/runtime/v2/logging"
3028
)
3129

@@ -79,21 +77,11 @@ func TestLoggingProcessAdapter(t *testing.T) {
7977
ctx, cancel := context.WithCancel(context.Background())
8078
defer cancel()
8179

82-
var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
83-
exitChan := make(chan containerd.ExitStatus, 1)
84-
time.Sleep(50 * time.Millisecond)
85-
exitChan <- containerd.ExitStatus{}
86-
return exitChan, nil
87-
}
88-
89-
err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config)
80+
err := loggingProcessAdapter(ctx, driver, "testDataStore", config)
9081
if err != nil {
9182
t.Fatal(err)
9283
}
9384

94-
// let bufio read the buffer
95-
time.Sleep(50 * time.Millisecond)
96-
9785
// Verify that the driver methods were called
9886
if !driver.processed {
9987
t.Fatal("process should be processed")

0 commit comments

Comments (0)