Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
📝 Walkthrough

Adds a new cattach client and matching server stream to attach to running job steps over gRPC, including protobuf changes, client/server state machines for handshake and I/O forwarding (PTY, per-task filtering, history replay), and supervisor routing refactors to support multiple front-ends.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Cattach (client)
    participant Fore as CraneForeD (cfored)
    participant Ctld as Ctld
    participant Sup as Supervisor/Crun
    Client->>Fore: CattachStream(STEP_CONNECT_REQUEST{job,step,uid,cattach_pid})
    Fore->>Ctld: STEP_META_REQUEST(job,step,uid)
    Ctld-->>Fore: STEP_META_REPLY{ok,step_meta}
    Fore-->>Client: STEP_CONNECT_REPLY{ok, step_meta}
    Fore->>Sup: bind per-step IO channel (setRemoteIoToFrontChannel)
    Sup-->>Fore: StreamStepIORequest (history + live task output)
    Fore-->>Client: TASK_IO_FORWARD (forwarded task output / ready)
    Client->>Fore: TASK_IO_FORWARD (stdin bytes / per-task optional)
    Fore->>Sup: TASK_INPUT (translated from cattach request)
    Sup-->>Fore: JOB_COMPLETION_ACK / TASK_EXIT ...
    Fore-->>Client: STEP_COMPLETION_ACK_REPLY / TASK_EXIT_STATUS
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 9
🧹 Nitpick comments (7)
internal/cattach/cmd.go (2)
1-7: Add missing license header. This file should include the AGPL license header for consistency with other files in the project.

```diff
+/**
+ * Copyright (c) 2024 Peking University and Peking University
+ * Changsha Institute for Computing and Digital Economy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
 package cattach
```
10-17: Consider documenting the filter flag semantics. The error/output/input filter flags use `uint32`, but their purpose and expected values aren't clear from the flag descriptions. Consider adding more detailed help text or documentation.

```diff
-	RootCmd.PersistentFlags().Uint32Var(&FlagErrorFilter, "error-filter", 0, "only print stderr from the specified task")
-	RootCmd.PersistentFlags().Uint32Var(&FlagOutputFilter, "output-filter", 0, "only print stdout from the specified task")
-	RootCmd.PersistentFlags().Uint32Var(&FlagInputFilter, "input-filter", 0, "send stdin to only the specified task")
+	RootCmd.PersistentFlags().Uint32Var(&FlagErrorFilter, "error-filter", 0, "only print stderr from the specified task (0 = no filter)")
+	RootCmd.PersistentFlags().Uint32Var(&FlagOutputFilter, "output-filter", 0, "only print stdout from the specified task (0 = no filter)")
+	RootCmd.PersistentFlags().Uint32Var(&FlagInputFilter, "input-filter", 0, "send stdin to only the specified task (0 = broadcast to all)")
```

internal/cfored/cattach_server.go (2)
277-278: Fix inconsistent Warning log level usage. The log level should be `Warn`, not `Warning`, to maintain consistency with the rest of the codebase. Apply this fix:

```diff
-log.Warningf("[Ctld->Cfored->Cattach][Step #%d.%d] Expect type TASK_COMPLETION_ACK_REPLY but got %s, ignored",
+log.Warnf("[Ctld->Cfored->Cattach][Step #%d.%d] Expect type TASK_COMPLETION_ACK_REPLY but got %s, ignored",
```
355-376: Ensure graceful cleanup on state End. The cleanup logic should handle the case where `gVars.ctldReplyChannelMapForCattachByTaskId[taskId]` is nil to avoid potential nil map access. Apply this defensive check:

```diff
 gVars.ctldReplyChannelMapMtx.Lock()
 if gVars.ctldReplyChannelMapForCattachByTaskId[taskId] != nil {
 	delete(gVars.ctldReplyChannelMapForCattachByTaskId[taskId], cattachPid)
+	if len(gVars.ctldReplyChannelMapForCattachByTaskId[taskId]) == 0 {
+		delete(gVars.ctldReplyChannelMapForCattachByTaskId, taskId)
+	}
 }
-if len(gVars.ctldReplyChannelMapForCattachByTaskId[taskId]) == 0 {
-	delete(gVars.ctldReplyChannelMapForCattachByTaskId, taskId)
-}
```

internal/cattach/cattach.go (2)
257-257: Fix typo in log message.

```diff
-log.Tracef("Unhanled sig %s", sig.String())
+log.Tracef("Unhandled sig %s", sig.String())
```
372-372: Fix incorrect loop label. The loop label should be `CattachStateMachineLoop`, not `CrunStateMachineLoop`.

```diff
-CrunStateMachineLoop:
+CattachStateMachineLoop:
```

Also update the break statement on line 385:

```diff
-break CrunStateMachineLoop
+break CattachStateMachineLoop
```

internal/cfored/server.go (1)
163-163: Fix inconsistent Warning log level usage.

```diff
-log.Warningf("[Supervisor->Cfored][Job #%d.%d] Supervisor on Craned %s"+
+log.Warnf("[Supervisor->Cfored][Job #%d.%d] Supervisor on Craned %s"+
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- cmd/cattach/cattach.go (1 hunks)
- internal/cattach/cattach.go (1 hunks)
- internal/cattach/cmd.go (1 hunks)
- internal/cfored/cattach_server.go (1 hunks)
- internal/cfored/cfored.go (3 hunks)
- internal/cfored/crun_server.go (1 hunks)
- internal/cfored/ctld_client.go (4 hunks)
- internal/cfored/server.go (7 hunks)
- protos/Crane.proto (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
internal/cattach/cmd.go (4)
internal/util/os.go (1)
  - DetectNetworkProxy (69-79)

internal/cattach/cattach.go (1)
  - MainCattach (755-801)

internal/util/err.go (2)
  - RunEWrapperForLeafCommand (138-166)
  - RunAndHandleExit (168-178)

internal/util/util.go (1)
  - DefaultConfigPath (80-80)
internal/cfored/cattach_server.go (3)
internal/cfored/server.go (1)
  - GrpcCforedServer (281-283)

internal/util/grpc.go (1)
  - UnixPeerAuthInfo (74-78)

internal/util/string.go (1)
  - ParseHostList (364-425)
internal/cattach/cattach.go (5)
internal/calloc/calloc.go (1)
  - ReplyReceiveRoutine (68-85)

internal/util/err.go (7)
  - CraneCmdError (29-29)
  - ErrorSuccess (35-35)
  - ErrorBackend (39-39)
  - ErrorNetwork (38-38)
  - ErrorSystem (41-41)
  - CraneError (44-47)
  - ErrorCmdArg (37-37)

internal/cattach/cmd.go (3)
  - FlagPty (11-11)
  - FlagConfigFilePath (10-10)
  - FlagLayout (16-16)

internal/util/string.go (1)
  - ParseConfig (44-80)

internal/util/grpc.go (2)
  - ClientKeepAliveParams (60-64)
  - ClientConnectParams (66-72)
🔇 Additional comments (17)
cmd/cattach/cattach.go (1)
1-27: LGTM! The entrypoint is clean and follows the established pattern. The AGPL license header is properly included, and the delegation to the internal package is appropriate.
internal/cfored/cfored.go (2)
113-113: LGTM! The initialization of the nested map structure is correct and follows the pattern used for other channel maps.
64-64: Locking verified — ctldReplyChannelMapForCattachByTaskId is accessed with ctldReplyChannelMapMtx.Initialization happens in StartCfored; all subsequent reads/writes (cattach_server.go and ctld_client.go) are wrapped by gVars.ctldReplyChannelMapMtx.Lock()/Unlock(). No unsynchronized access found.
internal/cfored/ctld_client.go (4)
198-209: LGTM! Consistent with existing multiplexing patterns. The TASK_META_REPLY handling follows the established pattern for front-end channel routing and includes proper mutex protection.

249-257: LGTM! Proper broadcast to cattach channels. The implementation correctly broadcasts TASK_COMPLETION_ACK_REPLY to all cattach channels associated with a task, maintaining consistency with the multiplexing design.

338-352: LGTM! Proper cleanup of cattach channels. The cleanup sequence correctly cancels all remaining cattach sessions before resetting the map, preventing resource leaks.

234-247: Derive InteractiveType for orphaned tasks instead of hardcoding Crun. internal/cfored/ctld_client.go (lines 234–247) sends TASK_COMPLETION_REQUEST with `InteractiveType: protos.InteractiveTaskType_Crun` when a TASK_CANCEL_REQUEST has no front-end mapping; repo search only found InteractiveTaskType_Calloc usages in internal/cfored/calloc_server.go and no other InteractiveTaskType_* symbols. Confirm the proto enum and where a task's InteractiveType is stored, then set InteractiveType from the task metadata/front-end mapping when available; if unavailable, choose a documented safe default and emit a log/metric.
internal/cattach/cmd.go (2)
18-28: LGTM! Well-structured command definition. The command structure follows Cobra best practices with proper use of PersistentPreRun for setup and delegation to MainCattach for the core logic.

31-34: LGTM! Proper command initialization. The ParseCmdArgs function correctly uses the utility wrappers for error handling and exit code management.
internal/cfored/crun_server.go (1)
201-201: Verify updated setRemoteIoToCrunChannel signature is used everywhere. Found definition at internal/cfored/server.go:221–237: `func (keeper *SupervisorChannelKeeper) setRemoteIoToCrunChannel(frontId int32, taskId uint32, stepId uint32, ioToCrunChannel chan *protos.StreamTaskIORequest)`. ast-grep returned only this definition; no other callers were found. Ensure all call sites (e.g., the `gSupervisorChanKeeper.setRemoteIoToCrunChannel(...)` call in internal/cfored/crun_server.go) pass the new first arg (crunPid/frontId).
internal/cattach/cattach.go (3)
506-506: LGTM - Clean stdin reader implementation. The epoll-based stdin reader with non-blocking I/O is well-implemented with proper error handling for EAGAIN, EINTR, and EIO.
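For readers unfamiliar with that error-handling pattern, here is a hedged, standalone sketch of a non-blocking read loop that retries on EINTR and treats EAGAIN as "no data yet" (EIO from a PTY master usually means the slave side closed and is treated like EOF). This is not the project's code: it omits the epoll wait entirely, `readNonblock` and `demo` are illustrative names, and the demo runs on a pipe rather than a real terminal.

```go
package main

import (
	"fmt"
	"syscall"
)

// readNonblock drains whatever is currently available from fd,
// retrying on EINTR and returning cleanly on EAGAIN (no data yet).
// Any other error (e.g. EIO from a closed PTY) is passed to the caller.
func readNonblock(fd int) ([]byte, error) {
	buf := make([]byte, 4096)
	var out []byte
	for {
		n, err := syscall.Read(fd, buf)
		if n > 0 {
			out = append(out, buf[:n]...)
		}
		if err != nil {
			if err == syscall.EINTR {
				continue // interrupted by a signal: retry
			}
			if err == syscall.EAGAIN {
				return out, nil // nothing more to read right now
			}
			return out, err // EIO or a real failure
		}
		if n == 0 {
			return out, nil // EOF
		}
	}
}

// demo exercises readNonblock on a non-blocking pipe instead of stdin.
func demo() (string, error) {
	fds := make([]int, 2)
	if err := syscall.Pipe(fds); err != nil {
		return "", err
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])
	if err := syscall.SetNonblock(fds[0], true); err != nil {
		return "", err
	}
	if _, err := syscall.Write(fds[1], []byte("hello")); err != nil {
		return "", err
	}
	data, err := readNonblock(fds[0])
	return string(data), err
}

func main() {
	s, err := demo()
	fmt.Println(s, err)
}
```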
508-580: Well-designed file pattern parsing with comprehensive placeholder support. The implementation handles escaping, various placeholders, and padding correctly. Good attention to detail.
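As a rough illustration of that placeholder scheme (not the actual parser in internal/cattach): a minimal expander handling `%j` (job id), `%s` (step id), `%%` escaping, and a zero-pad width like `%3s` could look like this. The supported placeholders and the function name are assumptions for the sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandPattern substitutes %j/%s placeholders with optional zero-pad
// widths (e.g. %3s) and supports %% for a literal percent sign.
func expandPattern(pattern string, jobId, stepId uint32) string {
	var out strings.Builder
	for i := 0; i < len(pattern); i++ {
		if pattern[i] != '%' || i+1 == len(pattern) {
			out.WriteByte(pattern[i])
			continue
		}
		i++
		// optional zero-pad width, e.g. %5j
		width := 0
		for i < len(pattern) && pattern[i] >= '0' && pattern[i] <= '9' {
			width = width*10 + int(pattern[i]-'0')
			i++
		}
		if i == len(pattern) {
			break
		}
		var val string
		switch pattern[i] {
		case 'j':
			val = strconv.FormatUint(uint64(jobId), 10)
		case 's':
			val = strconv.FormatUint(uint64(stepId), 10)
		case '%':
			val = "%"
		default:
			val = "%" + pattern[i:i+1] // unknown placeholder: keep verbatim
		}
		for len(val) < width {
			val = "0" + val // apply zero padding
		}
		out.WriteString(val)
	}
	return out.String()
}

func main() {
	fmt.Println(expandPattern("out.%j.%3s.log", 42, 7)) // out.42.007.log
}
```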
644-708: Solid X11 forwarding implementation. The X11 forwarding supports both Unix domain sockets and TCP connections with proper cleanup and error handling.
internal/cfored/server.go (4)
48-70: Good implementation of circular buffer for task IO history. The TaskIOBuffer implementation with circular buffer pattern is efficient and well-structured for maintaining IO history.
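For readers unfamiliar with the pattern, a minimal sketch of such a circular history buffer follows. It stores strings instead of `*protos.StreamTaskIORequest`, and the field names mirror the review's description, but the code is illustrative, not the project's TaskIOBuffer.

```go
package main

import "fmt"

// ringBuffer keeps the last `capacity` messages so a late-attaching
// client can replay recent output.
type ringBuffer struct {
	data     []string
	head     int // index of the oldest retained element
	size     int
	capacity int
}

func newRingBuffer(capacity int) *ringBuffer {
	return &ringBuffer{data: make([]string, capacity), capacity: capacity}
}

// Push appends a message, overwriting the oldest entry once full.
func (b *ringBuffer) Push(msg string) {
	tail := (b.head + b.size) % b.capacity
	b.data[tail] = msg
	if b.size < b.capacity {
		b.size++
	} else {
		b.head = (b.head + 1) % b.capacity // dropped the oldest message
	}
}

// GetHistory returns the retained messages in arrival order.
func (b *ringBuffer) GetHistory() []string {
	out := make([]string, 0, b.size)
	for i := 0; i < b.size; i++ {
		out = append(out, b.data[(b.head+i)%b.capacity])
	}
	return out
}

func main() {
	b := newRingBuffer(3)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		b.Push(m)
	}
	fmt.Println(b.GetHistory()) // only the last three messages survive: [c d e]
}
```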
37-41: Clean abstraction for dual-path routing. The RequestSupervisorChannel struct cleanly separates crun and cattach channels, enabling proper multi-client support.

487-533: Excellent symmetry in cattach request handling. The cattach request forwarding mirrors the crun handling appropriately, maintaining consistency in the dual-path architecture.
275-277: Incorrect nil check logic. The parentheses around the nil check create incorrect precedence.

Apply this fix:

```diff
-if (keeper.taskIORequestChannelMap[StepIdentifier{taskId: taskId, StepId: stepId}] != nil) {
+if keeper.taskIORequestChannelMap[StepIdentifier{taskId: taskId, StepId: stepId}] != nil {
 	delete(keeper.taskIORequestChannelMap[StepIdentifier{taskId: taskId, StepId: stepId}], cattachPid)
 }
```

Likely an incorrect or invalid review comment.
Does this only support crun tasks? We should first support cfored reconnecting crun/calloc after a Ctld restart.

calloc hasn't been tried yet; we'll see later whether to add support for it. Also, the requirement for this PR currently only covers cfored going down unexpectedly: after restarting, it can reconnect to tasks and accept cattach connections. A Ctld restart should still follow the previous logic.

I looked into it: Slurm's sattach also appears to only support srun tasks. sattach attaches to a step, so it cannot attach to a salloc shell.
Branch force-pushed from 34f4f6f to 5da82a2.
Actionable comments posted: 3
Fix all issues with AI Agents 🤖
In @internal/cattach/cattach.go:
- Around line 498-501: The loop sends the reused buffer slice `buf[:nr]` into
`m.chanInputFromTerm`, causing a data race when `buf` is reused; fix by
allocating a new slice of length `nr`, copying `buf[:nr]` into it (e.g., dst :=
make([]byte, nr); copy(dst, buf[:nr])) and send that `dst` to
`m.chanInputFromTerm` instead of `buf[:nr]` so previously-sent data is not
overwritten.
- Around line 122-134: StateMachineOfCattach.Close may panic if m.conn is nil
(e.g., StateConnectCfored failed before setting m.conn) because Close is
deferred; guard against nil before calling m.conn.Close() by checking if m.conn
!= nil and only then calling Close and logging errors. Also ensure m.conn is
assigned only after a successful connection in StateConnectCfored so the
nil-check is meaningful.
- Around line 610-620: The loop that calls reader.Read(buffer) and sends
buffer[:n] on m.chanInputFromTerm reuses the same underlying buffer across
iterations, causing a data race; update the FileReaderRoutine (the loop labeled
"reading" that uses reader.Read(buffer) and sends into m.chanInputFromTerm) to
allocate a new slice for each read (e.g. make([]byte, n) and copy(buffer[:n]) or
use append([]byte(nil), buffer[:n]...) ) and send that copy instead, and do the
same for the EOF branch before sending the nil sentinel so the receiver doesn't
observe mutated backing memory.
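The copy-before-send fix both prompts describe can be demonstrated in isolation. The sketch below is standalone, not the project's code: `forward` stands in for the reader goroutine, and a slice of chunks stands in for successive `Read` calls into the same reusable buffer.

```go
package main

import "fmt"

// forward simulates a reader loop: each "read" lands in the same
// reusable buffer, so the fix is to copy buf[:n] into a fresh slice
// before handing it to another goroutine via the channel.
func forward(chunks [][]byte, out chan<- []byte) {
	buf := make([]byte, 8) // reused across iterations, like the real reader
	for _, c := range chunks {
		n := copy(buf, c)                       // stands in for reader.Read(buf)
		sent := append([]byte(nil), buf[:n]...) // fresh copy per send
		out <- sent
	}
	close(out)
}

func main() {
	out := make(chan []byte, 4)
	forward([][]byte{[]byte("one"), []byte("two")}, out)
	for c := range out {
		fmt.Printf("%s ", c) // prints "one two " — earlier sends are not clobbered
	}
	fmt.Println()
}
```

Without the copy, the second `copy(buf, c)` would overwrite the bytes backing the first message that is still sitting in the channel.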
♻️ Duplicate comments (10)
protos/Crane.proto (1)
942-947: Fix typo: `stepd_id` should be `step_id`. The field name `stepd_id` appears to be a typo and should be `step_id` for consistency with other messages.

internal/cattach/cattach.go (2)
177-177: Fix field name typo to match proto definition. This should match the proto field name (after fixing the proto typo to `step_id`).
439-450: Remove redundant PTY attribute manipulation in reading loop. The PTY attributes are repeatedly set to raw mode inside the reading loop, which is unnecessary since they are already set once in `StateForwarding`. This causes performance overhead on every loop iteration.

internal/cfored/cattach_server.go (7)
60-61: Replace `log.Fatal` with proper error handling. Using `log.Fatal` terminates the entire process. For a gRPC stream handler, this is too severe: other streams would be killed.

65-67: Replace `log.Fatalf` with error logging and a state transition.

121-121: Fix the typo in `GetStepdId()`. Should be `GetStepId()` after the proto fix.

231-232: Replace `log.Fatalf` with non-fatal error handling.

317-319: Replace `log.Fatalf` with error logging.

415-417: Replace `log.Fatalf` with error logging.

169-174: Variable shadowing issue with `ok` in ParseHostList.
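The `ok` shadowing hazard flagged above is easy to reproduce. In this sketch (hypothetical helpers, not the real `util.ParseHostList`), the inner `:=` declares a second `ok`, so the outer variable never observes the parse failure:

```go
package main

import "fmt"

// parseHostList is a hypothetical stand-in for util.ParseHostList.
func parseHostList(s string) ([]string, bool) {
	if s == "" {
		return nil, false
	}
	return []string{s}, true
}

// lookup reproduces the hazard: inside the if-block, `hosts, ok := ...`
// declares a NEW `ok` that shadows the outer one, so a failure from
// parseHostList is invisible to later code that reads the outer `ok`.
func lookup(m map[int]string, key int) (outerOk, parseOk bool) {
	_, ok := m[key]
	if ok {
		hosts, ok := parseHostList("") // `:=` shadows the outer ok
		_ = hosts
		parseOk = ok // the inner ok must be consumed inside this scope
	}
	return ok, parseOk // outer ok is still true; the parse failure is lost
}

func main() {
	ok, parseOk := lookup(map[int]string{1: "node"}, 1)
	fmt.Println(ok, parseOk) // true false: two different `ok` variables
}
```

The usual fix is either to use `=` with a pre-declared variable or to handle the inner `ok` immediately within its own scope, as the review suggests.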
🧹 Nitpick comments (5)
protos/Crane.proto (1)
752-757: Non-sequential field numbering in `StepMetaReq`. The fields are not declared in field-number order (`step_id = 4` comes after `cattach_pid = 3`). While valid in protobuf, this is unusual and may indicate a copy-paste oversight. Consider reordering for clarity:

Suggested fix

```diff
 message StepMetaReq {
 	uint32 uid = 1;
 	uint32 job_id = 2;
-	uint32 step_id = 4;
-	int32 cattach_pid = 3;
+	int32 cattach_pid = 3;
+	uint32 step_id = 4;
 }
```

internal/cattach/cattach.go (1)
410-416: Silent failure on SetNonblock error. When `SetNonblock` fails, the function returns silently without any logging, making debugging difficult.

Proposed fix

```diff
 err := syscall.SetNonblock(int(os.Stdin.Fd()), true)
 if err != nil {
+	log.Errorf("Failed to set stdin to non-blocking mode: %v", err)
 	return
 }
```

internal/cfored/server.go (3)
37-41: Misleading field name: `cattachRequestChannelMap` is not a map. The field `cattachRequestChannelMap` is declared as `chan *protos.StreamCattachRequest`, but the name suggests it's a map. Consider renaming for clarity.

Suggested fix

```diff
 type RequestSupervisorChannel struct {
 	valid *atomic.Bool
 	crunRequestChannel chan *protos.StreamCrunRequest
-	cattachRequestChannelMap chan *protos.StreamCattachRequest
+	cattachRequestChannel chan *protos.StreamCattachRequest
 }
```
227-234: Consider making buffer capacity configurable. The buffer capacity is hardcoded to 10. For long-running tasks with verbose output, this may be insufficient for cattach clients to receive meaningful history.

Suggested improvement

```go
const DefaultTaskIOBufferCapacity = 100 // or load from config

keeper.taskIOBufferMap[StepIdentifier{JobId: taskId, StepId: stepId}] = &TaskIOBuffer{
	data:     make([]*protos.StreamTaskIORequest, DefaultTaskIOBufferCapacity),
	head:     0,
	size:     0,
	capacity: DefaultTaskIOBufferCapacity,
}
```
251-264: Potential performance concern: broadcasting while holding lock. The `forwardRemoteIoToFront` function holds `taskIORequestChannelMtx` while iterating and sending to all channels. If a channel is full or slow, this could block other operations.

Consider using a non-blocking send with logging, similar to `forwardCrunRequestToSupervisor`:

Suggested improvement

```diff
 for _, channel := range channelMap {
-	channel <- ioToFront
+	select {
+	case channel <- ioToFront:
+	default:
+		log.Warningf("[Supervisor->Cfored->FrontEnd][Job #%d.%d] Front channel full, dropping message", taskId, stepId)
+	}
 }
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- cmd/cattach/cattach.go
- internal/cattach/cattach.go
- internal/cattach/cmd.go
- internal/cfored/cattach_server.go
- internal/cfored/cfored.go
- internal/cfored/crun_server.go
- internal/cfored/ctld_client.go
- internal/cfored/server.go
- protos/Crane.proto
🚧 Files skipped from review as they are similar to previous changes (3)
- cmd/cattach/cattach.go
- internal/cfored/cfored.go
- internal/cattach/cmd.go
🧰 Additional context used
🧬 Code graph analysis (3)
internal/cfored/ctld_client.go (2)
internal/cfored/server.go (1)
  - StepIdentifier (43-46)

internal/ccontrol/ccontrol.go (1)
  - StepIdentifier (50-53)
internal/cattach/cattach.go (1)
internal/util/err.go (6)
  - ExitCode (30-30)
  - ErrorSuccess (34-34)
  - ErrorBackend (38-38)
  - ErrorNetwork (37-37)
  - ErrorSystem (40-40)
  - CraneError (43-47)
internal/cfored/server.go (1)
internal/ccontrol/ccontrol.go (1)
  - StepIdentifier (50-53)
🔇 Additional comments (10)
protos/Crane.proto (1)
934-1021: New cattach proto messages look well-structured. The `StreamCattachRequest` and `StreamCattachReply` messages follow the same pattern as the existing `StreamCrunRequest`/`StreamCrunReply`, which is good for consistency. The enum values and payload oneofs are properly defined.

internal/cfored/crun_server.go (1)
230-230: LGTM - Updated to use per-front channel routing. The call site correctly passes `crunPid` as the front identifier, enabling the new multi-client IO forwarding model where both crun and cattach can receive task IO.

internal/cfored/ctld_client.go (3)
200-211: LGTM - STEP_META_REPLY handling added. The new handler correctly routes STEP_META_REPLY to the appropriate front-end channel using `cattachPid`, following the same pattern as TASK_ID_REPLY.

240-254: Verify InteractiveType when sending completion for missing channels. When no channel exists for a step receiving TASK_CANCEL_REQUEST, the code sends a TASK_COMPLETION_REQUEST with `InteractiveType_Crun` hardcoded. This handles the case where crun has already disconnected but ctld still tries to cancel.

However, verify this is the intended behavior for all scenarios, including when cattach sessions might be involved without an active crun.
360-375: LGTM - Proper cattach cleanup on connection loss. The WaitAllFrontEnd state now properly terminates all cattach sessions by sending TASK_COMPLETION_ACK_REPLY to each channel before clearing the map.
internal/cfored/cattach_server.go (2)
257-273: Good implementation of history replay for cattach. The history replay mechanism allows newly attached cattach clients to receive buffered output that was sent before they connected. This is essential for the cattach use case.

356-378: LGTM - Proper cleanup in End state. The End state correctly cleans up all channel mappings and removes the cattach from both the ctld reply channel map and the IO channel map.
internal/cfored/server.go (3)
48-70: LGTM - TaskIOBuffer implementation. The circular buffer implementation is correct. `Push` properly handles wrap-around and size tracking, and `GetHistory` correctly reconstructs the order from the circular buffer.
481-525: LGTM - Cattach request handling in IOForwarding state. The new case for `pendingCattachReqToSupervisorChannel` correctly handles both TASK_IO_FORWARD and TASK_X11_FORWARD, mirroring the crun request handling. The reply construction and error handling are consistent.
273-278: Missing buffer cleanup in `cattachStopAndRemoveChannel`. When a cattach client disconnects, its channel is removed, but the shared `TaskIOBuffer` is not affected. This is correct since the buffer is shared across all fronts for a step. However, there's no check whether the step still exists before attempting deletion.

Suggested defensive check (note: the local is named `step` here to avoid shadowing the `stepId` parameter):

```diff
 func (keeper *SupervisorChannelKeeper) cattachStopAndRemoveChannel(taskId uint32, stepId uint32, cattachPid int32) {
 	keeper.taskIORequestChannelMtx.Lock()
-	if (keeper.taskIORequestChannelMap[StepIdentifier{JobId: taskId, StepId: stepId}] != nil) {
-		delete(keeper.taskIORequestChannelMap[StepIdentifier{JobId: taskId, StepId: stepId}], cattachPid)
+	step := StepIdentifier{JobId: taskId, StepId: stepId}
+	if keeper.taskIORequestChannelMap[step] != nil {
+		delete(keeper.taskIORequestChannelMap[step], cattachPid)
 	}
 	keeper.taskIORequestChannelMtx.Unlock()
 }
```
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/cfored/server.go (1)
695-706: ⚠️ Potential issue | 🔴 Critical: Always install `UnixPeerCredentials` on the Unix gRPC server.

`CrunStream`, `CattachStream`, and `CallocStream` all authorize callers via `peer.AuthInfo.(*util.UnixPeerAuthInfo)`, but the non-TLS branch (lines 703–706 in server.go) omits `grpc.Creds(creds)`. Without credentials installed, `peer.FromContext()` returns nil or a peer without AuthInfo. The nested authorization checks in all three streams silently skip when AuthInfo is absent, allowing callers to proceed with untrusted UIDs: an authentication bypass on the local socket.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 695 - 706, The Unix domain peer credentials are only added when config.TlsConfig.Enabled is true, but CrunStream, CattachStream, and CallocStream expect peer.AuthInfo.(*util.UnixPeerAuthInfo) for authorization; to fix, always include the UnixPeerCredentials in serverOptions regardless of TLS: construct creds := &util.UnixPeerCredentials{} and append grpc.Creds(creds) into the serverOptions used to create the gRPC server in server.go so that peer.FromContext() returns a peer with AuthInfo for local-socket authorization checks.
♻️ Duplicate comments (3)
internal/cattach/cattach.go (2)
553-554: ⚠️ Potential issue | 🟠 Major: Copy read buffers before handing them to another goroutine.

Both readers reuse the same backing array on the next `Read`, so slices already sent on `chanInputFromTerm` can be overwritten before the stream-sender goroutine serializes them.

Suggested fix

```diff
-	m.chanInputFromTerm <- buf[:nr]
+	data := append([]byte(nil), buf[:nr]...)
+	m.chanInputFromTerm <- data
```

Apply the same copy-before-send pattern in `FileReaderRoutine`, including the EOF branch.

Also applies to: 666-673
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 553 - 554, The read buffers sent on m.chanInputFromTerm are reused by subsequent Read calls, so capture and send a copy of buf[:nr] (and an empty copy for the EOF branch) instead of the original slice to avoid data races; update the send in the goroutine (the block that currently does m.chanInputFromTerm <- buf[:nr] and its counterpart in FileReaderRoutine) to allocate a new byte slice, copy the bytes, and send that copy, and apply the same copy-before-send pattern for the EOF branch to ensure the receiver gets stable, immutable data.
130-134: ⚠️ Potential issue | 🟠 Major: Guard `Close()` when the connection was never created.

`MainCattach` always defers `Close()`, but `StateConnectCfored` can leave `m.conn` nil on early failures. That turns a normal error path into a client panic.

Suggested fix

```diff
 func (m *StateMachineOfCattach) Close() {
+	if m.conn == nil {
+		return
+	}
 	err := m.conn.Close()
 	if err != nil {
 		log.Errorf("Failed to close grpc conn: %s", err)
 	}
```
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 130 - 134, The Close method on StateMachineOfCattach should guard against a nil connection to avoid panics when MainCattach defers Close() but StateConnectCfored never created m.conn; update StateMachineOfCattach.Close to check if m.conn is nil before calling m.conn.Close (e.g., if m.conn == nil { return } else call m.conn.Close and log any error) so Close becomes a no-op when no connection exists and still logs Close errors when a connection was present.

internal/cfored/cattach_server.go (1)
53-65: ⚠️ Potential issue | 🔴 Critical: Don't let one bad attach stream kill cfored.

These paths are driven by client disconnects or unexpected message ordering, but `log.Fatal`/`log.Fatalf` exits the whole daemon. A malformed or early-closed cattach stream should fail this RPC, not take down every other front-end.

Also applies to: 148-150, 228-230, 336-337
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 53 - 65, The code currently calls log.Fatal/log.Fatalf in the cattach stream handling (the error switch around err and the Type check against protos.StreamCattachRequest_STEP_CONNECT_REQUEST), which kills the whole daemon on client disconnects or bad messages; change these to return an error for this RPC (or send an appropriate gRPC error on the stream) and close only the offending stream instead of exiting the process. Concretely, replace log.Fatal/log.Fatalf in the error handling block and the Type check path inside the cattach stream handler with non-fatal handling: detect io.EOF and treat it as stream end, for other errors wrap/return the error (or respond with codes.InvalidArgument / codes.Unimplemented as appropriate) so only this RPC fails, and ensure any resources for that stream are cleaned up without calling os.Exit or log.Fatal.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cattach/cattach.go`:
- Around line 583-585: Replace the hardcoded "%s": "0" expansion with the actual
attached step id from the receiver (use the step id field on m — e.g.
fmt.Sprintf("%d", m.stepId) or fmt.Sprintf("%d", m.step.ID) depending on the
struct field name) so that expansions use the real step id instead of always
resolving to 0; update the map entry that currently uses "%s" to call
fmt.Sprintf with the attached step id (keeping the existing "%j" entry using
fmt.Sprintf("%d", m.jobId").
- Around line 308-325: The loop handling messages from m.chanInputFromTerm
currently treats a nil recv as EOF but keeps receiving nils and repeatedly
sending EOFs; modify the handler around the case for msg :=
<-m.chanInputFromTerm so that when msg == nil you build and send the
TASK_IO_FORWARD request with Eof: true exactly once (using ioFwdReq and
StreamCattachRequest_TASK_IO_FORWARD) and then stop further sends by
breaking/returning from the surrounding loop (or setting a sentEOF flag) instead
of continuing to process the channel; ensure TaskId is only set when
FlagInputFilter >= 0 as before and still call m.stream.Send(request) once for
the EOF.
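The single-EOF behavior that prompt asks for can be sketched as a small pump loop. The types below are illustrative stand-ins (the real code sends `StreamCattachRequest` frames with an `Eof` flag); the point is sending the EOF frame exactly once and then stopping, instead of re-sending it for every subsequent nil sentinel.

```go
package main

import "fmt"

// ioMsg is a stand-in for the TASK_IO_FORWARD frame.
type ioMsg struct {
	payload []byte
	eof     bool
}

// pump forwards stdin chunks to send and emits exactly one EOF frame
// when it sees the nil sentinel, then stops consuming the channel.
func pump(in <-chan []byte, send func(ioMsg)) {
	for msg := range in {
		if msg == nil {
			send(ioMsg{eof: true})
			return // stop after the single EOF frame
		}
		send(ioMsg{payload: msg})
	}
}

func main() {
	in := make(chan []byte, 4)
	in <- []byte("hi")
	in <- nil
	in <- nil // would previously have triggered a second EOF
	close(in)

	var frames []ioMsg
	pump(in, func(m ioMsg) { frames = append(frames, m) })
	fmt.Println(len(frames), frames[1].eof) // 2 frames: one data, one EOF
}
```

A `sentEOF` flag works equally well when the loop must keep running for other message kinds after stdin closes.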
In `@internal/cattach/x11.go`:
- Around line 192-199: The code currently uses a non-blocking select that drops
X11 payloads on backpressure; change the send to block until the supervisor
receives the request so bytes are never dropped: replace the select/default that
writes to session.X11ToSupervisor with a blocking send (e.g., simply send `req`
on session.X11ToSupervisor or loop until sent) and remove the `default` branch
and drop log; retain the trace/error logging around the successful send using
session.X11Id.CranedId and session.X11Id.LocalId as before so the stream remains
intact.
- Around line 303-331: The code is performing blocking sends to
session.X11ToLocal while holding sm.sessionMutex which can deadlock if a
receiver stalls; fix by looking up and storing the session (from sm.x11Sessions
using the X11GlobalId constructed from cranedId/localId) while holding
sm.sessionMutex, then immediately unlock and perform the send
(session.X11ToLocal <- data or <- nil for EOF) outside the mutex; apply the same
change to both the STEP_X11_FORWARD and STEP_X11_EOF handling (and the similar
block at 334-341) so no channel operations occur while sm.sessionMutex is held.
- Around line 102-112: The SendEofToSupervisor code path is building a
STEP_X11_FORWARD frame with an empty payload but never marks EOF, so the remote
X11 socket stays open; when closing the local X11 stream, set the EOF flag on
the outgoing frame by populating the appropriate Eof field on the
protos.StreamCattachRequest_StepX11ForwardReq inside the
PayloadStepX11ForwardReq before sending (the request built as
protos.StreamCattachRequest with Type=StreamCattachRequest_STEP_X11_FORWARD and
sent on session.X11ToSupervisor) so the supervisor/remote peer receives
end-of-stream.
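The lock-scoping fix described for the X11 session map (resolve the session under the mutex, send after unlocking) follows a common Go pattern. A minimal sketch with assumed names; the real code routes `[]byte` X11 frames keyed by a cranedId/localId pair:

```go
package main

import (
	"fmt"
	"sync"
)

type sessionMap struct {
	mu       sync.Mutex
	sessions map[string]chan []byte
}

// deliver resolves the target channel under the mutex, then releases
// the lock BEFORE the (possibly blocking) send, so a stalled receiver
// can never deadlock other operations on the map.
func (m *sessionMap) deliver(id string, data []byte) bool {
	m.mu.Lock()
	ch, ok := m.sessions[id]
	m.mu.Unlock() // critical: do not hold the lock across the send
	if !ok {
		return false
	}
	ch <- data // may block on backpressure, but no lock is held here
	return true
}

func main() {
	m := &sessionMap{sessions: map[string]chan []byte{"x11-1": make(chan []byte, 1)}}
	fmt.Println(m.deliver("x11-1", []byte("frame"))) // true
	fmt.Printf("%s\n", <-m.sessions["x11-1"])        // frame
}
```

The same shape applies to the EOF path: look up the session, unlock, then send the nil sentinel.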
In `@internal/cfored/cattach_server.go`:
- Around line 378-391: The cleanup currently deletes only the per-step map
(gVars.ctldReplyChannelMapForCattachByStep) and pid->step mapping but misses
removing the per-pid routing entry added in CattachWaitConnectReq
(gVars.ctldReplyChannelMapByPid[cattachPid]); update End (and all
exit/early-return paths) to also delete
gVars.ctldReplyChannelMapByPid[cattachPid] while holding the same
ctldReplyChannelMapMtx (check for nil before delete), and ensure this deletion
happens before unlocking so no stale pid entry remains to route future replies
into an orphaned or reused pid.
- Around line 162-171: The mutex gVars.ctldReplyChannelMapMtx is being locked
and gVars.ctldReplyChannelMapByPid is deleted, but if util.ParseHostList returns
ok==false the code does a bare break and never unlocks the mutex; modify the
failure path around
util.ParseHostList(ctldReply.GetPayloadStepMetaReply().Step.GetNodelist()) so
that you call gVars.ctldReplyChannelMapMtx.Unlock() (and any needed cleanup of
gVars.ctldReplyChannelMapByPid for cattachPid) before setting state =
DeadCattach and breaking out; ensure you still hold the same lock semantics for
the normal path and unlock in the common exit point if appropriate.
In `@internal/cfored/server.go`:
- Around line 238-263: forwardCattachRequestToSupervisor currently fans out
every StreamCattachRequest to all supervisors; instead, inspect the request's
CranedId (or equivalent identifier field on protos.StreamCattachRequest) and
route only to the matching supervisorChannel in
keeper.toSupervisorChannels[StepIdentifier{JobId: taskId, StepId: stepId}]; if
the target cranedId doesn't exist, log/echo and return. Update
forwardCattachRequestToSupervisor to read request.CranedId, look up
stepChannels[request.CranedId], check supervisorChannel.valid and push into
supervisorChannel.cattachRequestChannelMap (handling full-channel errors as
before) rather than iterating and broadcasting to all entries.
In `@internal/crun/crun.go`:
- Around line 583-598: The send in the StreamCrunReply_TASK_IO_FORWARD branch
can succeed and leave the step marked Completed even if stopStepCtx was
cancelled during or immediately before the send; ensure cancellation always
forces the job into JobKilling regardless of channel backpressure by checking
stopStepCtx.Done() before or immediately after the send and, if cancelled, set
m.state = JobKilling and return. Concretely, in the TASK_IO_FORWARD handling
(where you use m.chanOutputFromRemote and m.stopStepCtx), either check
m.stopStepCtx.Done() before attempting to send and bail out to set JobKilling,
or perform the current select send then follow with a non-blocking check of
m.stopStepCtx.Done() and transition to JobKilling if closed so the final status
is consistent.
---
Outside diff comments:
In `@internal/cfored/server.go`:
- Around line 695-706: The Unix domain peer credentials are only added when
config.TlsConfig.Enabled is true, but CrunStream, CattachStream, and
CallocStream expect peer.AuthInfo.(*util.UnixPeerAuthInfo) for authorization; to
fix, always include the UnixPeerCredentials in serverOptions regardless of TLS:
construct creds := &util.UnixPeerCredentials{} and append grpc.Creds(creds) into
the serverOptions used to create the gRPC server in server.go so that
peer.FromContext() returns a peer with AuthInfo for local-socket authorization
checks.
---
Duplicate comments:
In `@internal/cattach/cattach.go`:
- Around line 553-554: The read buffers sent on m.chanInputFromTerm are reused
by subsequent Read calls, so capture and send a copy of buf[:nr] (and an empty
copy for the EOF branch) instead of the original slice to avoid data races;
update the send in the goroutine (the block that currently does
m.chanInputFromTerm <- buf[:nr] and its counterpart in FileReaderRoutine) to
allocate a new byte slice, copy the bytes, and send that copy, and apply the
same copy-before-send pattern for the EOF branch to ensure the receiver gets
stable, immutable data.
- Around line 130-134: The Close method on StateMachineOfCattach should guard
against a nil connection to avoid panics when MainCattach defers Close() but
StateConnectCfored never created m.conn; update StateMachineOfCattach.Close to
check if m.conn is nil before calling m.conn.Close (e.g., if m.conn == nil {
return } else call m.conn.Close and log any error) so Close becomes a no-op when
no connection exists and still logs Close errors when a connection was present.
In `@internal/cfored/cattach_server.go`:
- Around line 53-65: The code currently calls log.Fatal/log.Fatalf in the
cattach stream handling (the error switch around err and the Type check against
protos.StreamCattachRequest_STEP_CONNECT_REQUEST), which kills the whole daemon
on client disconnects or bad messages; change these to return an error for this
RPC (or send an appropriate gRPC error on the stream) and close only the
offending stream instead of exiting the process. Concretely, replace
log.Fatal/log.Fatalf in the error handling block and the Type check path inside
the cattach stream handler with non-fatal handling: detect io.EOF and treat it
as stream end, for other errors wrap/return the error (or respond with
codes.InvalidArgument / codes.Unimplemented as appropriate) so only this RPC
fails, and ensure any resources for that stream are cleaned up without calling
os.Exit or log.Fatal.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3bb88b57-e77c-411d-97b5-f3cfbb6df65b
📒 Files selected for processing (12)
- cmd/cattach/cattach.go
- internal/cattach/cattach.go
- internal/cattach/cmd.go
- internal/cattach/x11.go
- internal/cfored/cattach_server.go
- internal/cfored/cfored.go
- internal/cfored/crun_server.go
- internal/cfored/ctld_client.go
- internal/cfored/server.go
- internal/crun/crun.go
- internal/crun/x11.go
- protos/Crane.proto
✅ Files skipped from review due to trivial changes (1)
- internal/crun/x11.go
🚧 Files skipped from review as they are similar to previous changes (4)
- cmd/cattach/cattach.go
- internal/cfored/cfored.go
- internal/cattach/cmd.go
- internal/cfored/ctld_client.go
Actionable comments posted: 6
♻️ Duplicate comments (10)
internal/cfored/cattach_server.go (5)
162-172: ⚠️ Potential issue | 🔴 Critical
Unlock mutex before breaking on ParseHostList failure.
If `util.ParseHostList` returns false at line 169, the code breaks without calling `gVars.ctldReplyChannelMapMtx.Unlock()` at line 189. This will leave the mutex locked and cause a deadlock on subsequent operations.
🔒 Proposed fix

```diff
 if Ok { // node[03-04]
     var ok bool
     execCranedIds, ok = util.ParseHostList(ctldReply.GetPayloadStepMetaReply().Step.GetNodelist())
     if !ok {
+        gVars.ctldReplyChannelMapMtx.Unlock()
         state = DeadCattach
         break
     }
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 162 - 172, The code can exit early after util.ParseHostList returns false without releasing gVars.ctldReplyChannelMapMtx, causing a deadlock; before setting state = DeadCattach and breaking out of the block in the ctldReply handling code (where ctldReply.GetPayloadStepMetaReply().Step.GetNodelist() is parsed), ensure you call gVars.ctldReplyChannelMapMtx.Unlock() (and keep the existing delete(gVars.ctldReplyChannelMapByPid, cattachPid) behavior) so the mutex is always unlocked before break/return paths.
53-66: ⚠️ Potential issue | 🟠 Major
Replace `log.Fatal` with graceful error handling.
Using `log.Fatal` at line 58 and `log.Fatalf` at line 64 will terminate the entire cfored process when a single cattach client has issues. These should be non-fatal errors that allow the session to end gracefully.
🛠️ Proposed fix

```diff
 if err != nil { // Failure Edge
     switch err {
     case io.EOF:
         fallthrough
     default:
-        log.Fatal(err)
-        return nil
+        log.Errorf("[Cfored<-Cattach] Stream error: %s", err)
+        break CforedCattachStateMachineLoop
     }
 }
 if cattachRequest.Type != protos.StreamCattachRequest_STEP_CONNECT_REQUEST {
-    log.Fatalf("[Cfored<-Cattach] Expect STEP_CONNECT_REQUEST but got %s", cattachRequest.Type)
-    break
+    log.Errorf("[Cfored<-Cattach] Expect STEP_CONNECT_REQUEST but got %s", cattachRequest.Type)
+    break CforedCattachStateMachineLoop
 }
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 53 - 66, Replace the fatal process-terminating logs in the cattach handling with graceful session-level handling: instead of log.Fatal(err) in the error switch (where err is checked) return the error to the caller or log.Printf the error and close only the current cattach session/stream; likewise replace log.Fatalf for unexpected cattachRequest.Type with a non-fatal log.Printf (or return a typed error) and cleanly terminate the current handler rather than exiting the whole process. Update the handling around the variables err and cattachRequest.Type and keep the check against protos.StreamCattachRequest_STEP_CONNECT_REQUEST, ensuring the function returns an error or ends the session so other clients are unaffected.
373-386: ⚠️ Potential issue | 🟠 Major
Clean up `ctldReplyChannelMapByPid` entry in End state.
The pid routing entry added at line 122 (`ctldReplyChannelMapByPid[cattachPid]`) is only deleted in the success path at line 163. If the client disconnects before receiving STEP_META_REPLY, this entry becomes stale. The End state should also delete this entry as a safety measure.
🛠️ Proposed fix

```diff
 case End:
     log.Infof("[Cfored<->Cattach][Job #%d] Enter State End", jobId)
     // remove cattach by taskId for ctldReplyChannel
     gVars.ctldReplyChannelMapMtx.Lock()
+    delete(gVars.ctldReplyChannelMapByPid, cattachPid)
     if gVars.ctldReplyChannelMapForCattachByStep[StepIdentifier{JobId: jobId, StepId: stepId}] != nil {
         delete(gVars.ctldReplyChannelMapForCattachByStep[StepIdentifier{JobId: jobId, StepId: stepId}], cattachPid)
     }
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 373 - 386, In the End state cleanup add removal of the per-pid routing entry ctldReplyChannelMapByPid[cattachPid] to avoid stale entries if the client disconnects before STEP_META_REPLY; while holding gVars.ctldReplyChannelMapMtx (the same mutex used for ctldReplyChannelMapForCattachByStep) delete the ctldReplyChannelMapByPid key for cattachPid, then proceed with the existing deletion of ctldReplyChannelMapForCattachByStep and unlocking; ensure you reference ctldReplyChannelMapByPid and cattachPid when making the change so the map entry added earlier is always removed on End.
330-334: ⚠️ Potential issue | 🟠 Major
Replace `log.Fatalf` with non-fatal error handling.
Using `log.Fatalf` at lines 331-332 will crash cfored when receiving an unexpected request type from cattach. This should be logged as an error and the session should end gracefully.
🛠️ Proposed fix

```diff
 default:
-    log.Fatalf("[Cattach->Cfored][Step #%d.%d] Expect STEP_COMPLETION_REQUEST or TASK_IO_FORWARD",
-        jobId, stepId)
+    log.Errorf("[Cattach->Cfored][Step #%d.%d] Unexpected request type: %s",
+        jobId, stepId, cattachRequest.Type)
+    state = End
     break forwarding
 }
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 330 - 334, In the default branch of the switch in cattach_server.go (the case handling unexpected request types where jobId and stepId are logged and the forwarding label is used), replace the fatal log call (log.Fatalf) with non-fatal error handling: log the error with log.Errorf or log.Printf including jobId and stepId, perform graceful shutdown of the session/connection (e.g., close the connection or call the session cleanup/close function used elsewhere in this file), and then break out to the forwarding label or return so the session ends cleanly instead of crashing the process.
227-235: ⚠️ Potential issue | 🟠 Major
Replace `log.Fatalf` with non-fatal error handling.
Using `log.Fatalf` at lines 229-230 will crash the entire cfored process when receiving an unexpected reply type. This should be logged as an error and handled gracefully.
🛠️ Proposed fix

```diff
 case ctldReply := <-ctldReplyChannel:
     if ctldReply.Type != protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY {
-        log.Fatalf("[Ctld->Cfored->Cattach][Step #%d.%d] Expect type JOB_COMPLETION_ACK_REPLY but got %s, ignored",
+        log.Errorf("[Ctld->Cfored->Cattach][Step #%d.%d] Expect type JOB_COMPLETION_ACK_REPLY but got %s, ignored",
             jobId, stepId, ctldReply.Type)
     } else {
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 227 - 235, The code currently calls log.Fatalf when ctldReply.Type != protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY which crashes the process; replace the fatal exit with non-fatal error handling by logging the unexpected reply via log.Errorf (include jobId, stepId and ctldReply.Type) and continue without terminating the process; keep the existing stopWaiting.Store(true) behavior and only set state = DeadCattach in the branch where ctldReply.Type == protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY so the handler remains graceful (refer to ctldReplyChannel, ctldReply.Type, protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY, log.Fatalf, log.Errorf, stopWaiting.Store, and state = DeadCattach).
internal/cattach/cattach.go (5)
623-633: ⚠️ Potential issue | 🟠 Major
Copy file chunks before sending them over `m.chanInputFromTerm`.
This has the same aliasing bug as stdin: `buffer[:n]` shares backing storage with the next `Read`, so the receiver can observe mutated bytes, including the final chunk sent on EOF.
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 623 - 633, The loop sends buffer slices (buffer[:n]) into m.chanInputFromTerm which share backing memory and can be mutated by subsequent Read calls; fix by allocating a new slice and copying the n bytes into it before every send (both in the normal send and the EOF branch) so each message sent on m.chanInputFromTerm has its own independent byte buffer; update the read loop around reader.Read, replace direct uses of buffer[:n] with a copied slice, and keep using m.chanInputFromTerm and the same EOF nil sentinel.
513-514: ⚠️ Potential issue | 🟠 Major
Copy stdin bytes before queuing them to another goroutine.
`buf` is reused on the next read, so slices put on `m.chanInputFromTerm` can be overwritten before the sender goroutine serializes them onto the stream. Fast input/paste will corrupt forwarded bytes.
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 513 - 514, The code sends buf[:nr] to m.chanInputFromTerm but buf is reused on subsequent reads, causing races and corrupted data; change the send to enqueue a copy of the read bytes (e.g., allocate a new slice of length nr, copy(buf[:nr], newSlice) and send that new slice) at the site where m.chanInputFromTerm <- buf[:nr] is called (look for the goroutine or function handling terminal reads that references buf and m.chanInputFromTerm).
543-545: ⚠️ Potential issue | 🟠 Major
Use the attached step id for `%s` expansion.
Hardcoding `%s` to "0" makes file patterns resolve to step 0 even when this client attached to another step.
Suggested fix

```diff
-    "%s": "0",
+    "%s": fmt.Sprintf("%d", m.stepId),
```
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 543 - 545, The mapping currently hardcodes the "%s" expansion to "0" which causes patterns to always resolve to step 0; update the expansion to use the actual attached step id instead of "0" (e.g., replace the "%s": "0" entry in the expansions map with the real step id value from the struct/variable used in this file such as m.stepId or the field that represents the attached step), ensuring you format it as a string (similar to how "%j" uses fmt.Sprintf("%d", m.jobId)) so patterns resolve to the correct step for the attached client.
127-138: ⚠️ Potential issue | 🔴 Critical
Guard `Close()` against partially initialized resources.
`m.conn` can still be nil on early connect/setup failures, and `FlagPty` becomes true before `StateForwarding()` has necessarily captured the original terminal attributes. On early-exit paths like `--layout`, this cleanup can panic or try to restore a zero-value `Termios`.
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 127 - 138, Guard StateMachineOfCattach.Close to avoid using partially initialized resources: check m.conn != nil before calling m.conn.Close(), and only call termios.Tcsetattr to restore terminal attributes when FlagPty is true AND a valid saved attribute was actually captured (e.g., add a boolean like m.ptySaved or check a sentinel on m.savedPtyAttr) so termios.Tcsetattr(os.Stdin.Fd(), termios.TCSANOW, &m.savedPtyAttr) is invoked only when saved state exists; update StateForwarding (or the code that captures the terminal) to set the sentinel when it successfully saves attributes.
301-323: ⚠️ Potential issue | 🟠 Major
Send EOF once, then stop the input-forward loop.
When `m.chanInputFromTerm` is closed, this receive keeps returning nil immediately. The loop then spins and keeps sending `TASK_IO_FORWARD{eof:true}` frames until another branch wins the select.
Suggested fix

```diff
-case msg := <-m.chanInputFromTerm:
+case msg, ok := <-m.chanInputFromTerm:
+    if !ok {
+        msg = nil
+    }
     ioFwdReq := &protos.StreamCattachRequest_TaskIOForwardReq{
         Msg: msg,
         Eof: msg == nil,
     }
 @@
     if err := m.stream.Send(request); err != nil {
         log.Errorf("Failed to send Task IO Forward to CattachStream: %s. "+
             "Connection to Cattach is broken", err)
         gVars.connectionBroken = true
         return
     }
+    if !ok || msg == nil {
+        return
+    }
```
Verify each finding against the current code and only fix it if needed. In `@internal/cattach/cattach.go` around lines 301 - 323, The code currently keeps receiving nil from m.chanInputFromTerm and repeatedly sends TASK_IO_FORWARD with Eof=true; change the case handling so that when msg == nil you send the EOF frame exactly once and then disable further reads from m.chanInputFromTerm (e.g., set m.chanInputFromTerm = nil) or break/return from the input-forward loop; update the logic around creating ioFwdReq (in the case handling the m.chanInputFromTerm receive and before calling m.stream.Send for StreamCattachRequest_TASK_IO_FORWARD) to perform the one-time EOF send and then stop further sends to avoid the spin, still honoring FlagInputFilter and setting gVars.connectionBroken on send error.
🧹 Nitpick comments (7)
internal/crun/crun.go (1)
729-742: Minor: Duplicate log and typo in error message.
Two small issues in adjacent code:
- Line 735 has a typo: "bug get" → "but got".
- Lines 730 and 742 both log "Job completed.", causing duplicate output.
♻️ Suggested fix

```diff
 case protos.StreamCrunReply_STEP_COMPLETION_ACK_REPLY:
-    log.Debug("Job completed.")
     m.state = End
 }
 if cforedReply.Type != protos.StreamCrunReply_STEP_COMPLETION_ACK_REPLY {
-    log.Errorf("Expect JOB_COMPLETION_ACK_REPLY. bug get %s\n", cforedReply.Type.String())
+    log.Errorf("Expect JOB_COMPLETION_ACK_REPLY, but got %s", cforedReply.Type.String())
     m.err = util.ErrorBackend
     m.state = End
     return
```
Verify each finding against the current code and only fix it if needed. In `@internal/crun/crun.go` around lines 729 - 742, Fix the typo and remove the duplicate log: change the error log in the branch that checks cforedReply.Type to use "but got" instead of "bug get" in the log.Errorf call that mentions protos.StreamCrunReply_STEP_COMPLETION_ACK_REPLY, and remove or consolidate one of the identical log.Debug("Job completed.") calls (either the one inside the case for protos.StreamCrunReply_STEP_COMPLETION_ACK_REPLY or the later one after checking GetPayloadStepCompletionAckReply().Ok) so the completion message is only logged once while keeping m.state = End and m.err handling unchanged.
internal/cfored/server.go (6)
732-750: Minor: Update comment to match actual timeout value.
The comment at lines 733-735 mentions "30 s" but the code uses `5 * time.Second` at line 736. Update the comment for consistency.
📝 Proposed fix

```diff
 gVars.globalCtxCancel()
 // GracefulStop waits for all active RPCs to finish, which can block
 // indefinitely if any stream handler (e.g. CattachStream or CrunStream)
-// is stuck. Give it 30 s and then force-stop both servers so the
+// is stuck. Give it 5 s and then force-stop both servers so the
 // process always terminates in a bounded time.
 const gracefulStopTimeout = 5 * time.Second
```

LGTM!
The bounded graceful shutdown pattern is a good improvement to ensure cfored always terminates in a reasonable time.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 732 - 750, The comment describing the bounded graceful shutdown is out of sync with the actual timeout constant; update the comment that currently says "30 s" to reflect the actual value used by gracefulStopTimeout (5 * time.Second) near the block that spawns the goroutine calling unixServer.GracefulStop(), tcpServer.GracefulStop() and later calls unixServer.Stop()/tcpServer.Stop() so the comment accurately documents the 5s timeout.
572-576: Consider replacing `log.Fatalf` with graceful error handling.
Using `log.Fatalf` for an unexpected crun request type will crash cfored. Consider logging an error and breaking out of the forwarding loop gracefully.
🛠️ Proposed fix

```diff
 default:
-    log.Fatalf("[Cfored<->Supervisor][Step #%d.%d] Receive Unexpected %s from crun ",
+    log.Errorf("[Cfored<->Supervisor][Step #%d.%d] Receive Unexpected %s from crun ",
         jobId, stepId, crunReq.Type.String())
+    state = SupervisorUnReg
     break supervisorIOForwarding
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 572 - 576, Replace the fatal crash in the default case of the supervisor IO loop: stop using log.Fatalf and instead log an error with the unexpected crun request details (use jobId, stepId and crunReq.Type.String()), perform any necessary cleanup, and break out of the supervisorIOForwarding loop so cfored continues running gracefully; update the default branch in the switch where log.Fatalf is called to use non-fatal logging and a controlled break/cleanup path.
531-534: Consider replacing `log.Fatalf` with graceful error handling.
Using `log.Fatalf` for an unexpected supervisor message type will crash the entire cfored process. A malformed or unexpected message from one supervisor should not bring down the daemon. Consider logging an error and transitioning to `SupervisorUnReg` instead.
🛠️ Proposed fix

```diff
 default:
-    log.Fatalf("[Supervisor->Cfored][Step #%d.%d] Receive Unexpected %s",
+    log.Errorf("[Supervisor->Cfored][Step #%d.%d] Receive Unexpected %s",
         jobId, stepId, supervisorReq.Type.String())
     state = SupervisorUnReg
     break supervisorIOForwarding
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 531 - 534, Replace the fatal crash in the supervisor message switch default branch: instead of calling log.Fatalf when encountering an unexpected supervisorReq.Type.String(), log an error (e.g., log.Errorf or log.Printf) including jobId, stepId and the unexpected type, then set state = SupervisorUnReg and return or continue the supervisor handling loop as appropriate; update the default case in the supervisor message processing (the switch handling supervisorReq) to perform non-fatal error handling so one bad message does not terminate cfored.
606-610: Consider replacing `log.Fatalf` with graceful error handling.
Using `log.Fatalf` for an unexpected cattach request type will crash cfored. Consider logging an error and ending the supervisor session gracefully.
🛠️ Proposed fix

```diff
 default:
-    log.Fatalf("[Cfored<->Supervisor][Step #%d.%d] Receive Unexpected %s from cattach ",
+    log.Errorf("[Cfored<->Supervisor][Step #%d.%d] Receive Unexpected %s from cattach ",
         jobId, stepId, cattachReq.Type.String())
+    state = SupervisorUnReg
     break supervisorIOForwarding
```
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 606 - 610, Replace the unconditional process exit in the default branch (currently using log.Fatalf with jobId, stepId and cattachReq.Type.String()) with graceful error handling: log an error (e.g., log.Printf or logger.Errorf) including the same context (jobId, stepId, cattachReq.Type.String()) and then break out of the supervisorIOForwarding loop to end the supervisor session cleanly instead of terminating the whole process.
271-278: Consider making the history buffer capacity configurable.
The `TaskIOBuffer` capacity is hardcoded to 10. For longer-running jobs or users who want more output history when attaching, consider making this configurable via the cfored config file.
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 271 - 278, The TaskIOBuffer capacity is hardcoded to 10 when initializing entries in keeper.taskIOBufferMap; make this capacity configurable by adding a cfored configuration field (e.g., TaskIOHistoryCapacity) and use that value instead of the literal 10 when constructing TaskIOBuffer in the code path that creates a new buffer for StepIdentifier{JobId: jobId, StepId: stepId}; ensure the config has a sensible default (e.g., 10) so existing behavior is preserved and update any relevant config parsing/initialization code to expose the new option.
238-263: Consider the TODO for non-existent step handling.
The TODO at line 243 notes that when a step doesn't exist, cattach should echo a message and close. Currently it just logs an error. Since X11 was removed from cattach, the broadcast behavior is acceptable for `TASK_IO_FORWARD`, but consider addressing this TODO for better error feedback to users.
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/server.go` around lines 238 - 263, The TODO in forwardCattachRequestToSupervisor: when the StepIdentifier (taskId, stepId) doesn't exist the code only logs and returns; update it to send an immediate cattach response back to the caller indicating the step is missing and then close the cattach stream. Concretely, inside the !exist branch of forwardCattachRequestToSupervisor construct and send a protos.StreamCattachResponse (or the appropriate response message used by StreamCattachRequest) that signals an error/closed state (e.g., TASK_IO_FORWARD/error status and an explanatory message), ensure you write it to the request's response/stream channel or use the request's context/stream send method, and then close or finalize that response stream so the front-end gets an echo and termination instead of only a log entry.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cattach/cattach.go`:
- Around line 733-740: PrintStepLayout currently emits placeholder output ("1
tasks") and has the per-node/task loop commented out; update
StateMachineOfCattach.PrintStepLayout to either compute and print the real
layout using the step metadata (use m.step.NodeNum, m.step.Nodelist and the
step's task/list structures—e.g., iterate whatever slice/map holds per-node
layouts and task IDs) so each node line shows node index/name and its task IDs,
or if that metadata isn't available yet, make PrintStepLayout return or log a
clear "not implemented" error instead of printing fake values; ensure you modify
the PrintStepLayout method accordingly and remove the hardcoded "1 tasks"
placeholder and commented loop so output reflects actual state or fails loudly.
- Around line 49-52: TaskOutputMsg currently lacks stream metadata so the
--error-filter flag is a no-op; update TaskOutputMsg to carry a stream indicator
(e.g., StreamType or IsError boolean) and propagate that field through
StateForwarding() and any forwarding paths referenced around lines 366-371 and
699-707, then change StdoutWriterRoutine() to check FlagOutputFilter and
skip/write only the appropriate stream based on the new field; alternatively, if
you prefer to disable the flag, make the cmd registration in
internal/cattach/cmd.go reject or error for --error-filter instead of silently
accepting it.
- Around line 769-779: The parsing logic for job/step ids uses strconv.Atoi then
casts to uint32, which allows negative or oversized values to wrap; replace
those conversions to reject out-of-range values by using
strconv.ParseUint(parts[0], 10, 32) and strconv.ParseUint(parts[1], 10, 32) (or
equivalently parse as int then validate i >= 0 && i <= math.MaxUint32) and
return an error if parsing fails or the value is out of range before assigning
to m.jobId and m.stepId; update the error messages to reference the offending
parts (parts[0]/parts[1]) and ensure you import the necessary package(s) if
needed.
In `@internal/cfored/cattach_server.go`:
- Around line 1-13: This file fails gofmt; run the formatter on package cfored
(the file with the import block and symbols like the package declaration and
imported packages such as "CraneFrontEnd/generated/protos",
"CraneFrontEnd/internal/util", and others) to fix spacing/indentation and import
grouping — e.g. run `gofmt -w internal/cfored/cattach_server.go` (or apply your
editor's Go formatting) so the import block and file conform to gofmt style.
- Around line 146-151: The condition `cattachRequest != nil || err == nil` is
wrong and causes the fatal to fire whenever err==nil; update the branch to
explicitly detect an unexpected non-nil message and avoid crashing: replace the
`else if cattachRequest != nil || err == nil { log.Fatal(...) }` with `else if
cattachRequest != nil { log.Warn("[Cattach->Cfored] Unexpected cattachRequest in
waiting state") }` (or another non-fatal log), keeping the `if err != nil { ...
}` branch and preserving the `state = DeadCattach` assignment; reference
symbols: cattachRequest, state, DeadCattach.
In `@internal/cfored/ctld_client.go`:
- Around line 200-211: The code currently calls log.Fatalf when frontPid is
missing from gVars.ctldReplyChannelMapByPid which kills the process; change this
to log.Errorf and simply ignore the ctldReply so the process continues. In the
block handling protos.StreamCtldReply_STEP_META_REPLY (variables frontPid,
gVars.ctldReplyChannelMapMtx, toFrontCtlReplyChannel, ctldReply,
gVars.ctldReplyChannelMapByPid), ensure the mutex is always unlocked (use defer
after Lock or unlock in both branches), replace the fatal log with log.Errorf
including frontPid and context, and do nothing further when ok is false (no
send) to avoid crashing on races.
---
Duplicate comments:
In `@internal/cattach/cattach.go`:
- Around line 623-633: The loop sends buffer slices (buffer[:n]) into
m.chanInputFromTerm which share backing memory and can be mutated by subsequent
Read calls; fix by allocating a new slice and copying the n bytes into it before
every send (both in the normal send and the EOF branch) so each message sent on
m.chanInputFromTerm has its own independent byte buffer; update the read loop
around reader.Read, replace direct uses of buffer[:n] with a copied slice, and
keep using m.chanInputFromTerm and the same EOF nil sentinel.
- Around line 513-514: The code sends buf[:nr] to m.chanInputFromTerm but buf is
reused on subsequent reads, causing races and corrupted data; change the send to
enqueue a copy of the read bytes (e.g., allocate a new slice of length nr,
copy(buf[:nr], newSlice) and send that new slice) at the site where
m.chanInputFromTerm <- buf[:nr] is called (look for the goroutine or function
handling terminal reads that references buf and m.chanInputFromTerm).
- Around line 543-545: The mapping currently hardcodes the "%s" expansion to "0"
which causes patterns to always resolve to step 0; update the expansion to use
the actual attached step id instead of "0" (e.g., replace the "%s": "0" entry in
the expansions map with the real step id value from the struct/variable used in
this file such as m.stepId or the field that represents the attached step),
ensuring you format it as a string (similar to how "%j" uses fmt.Sprintf("%d",
m.jobId)) so patterns resolve to the correct step for the attached client.
- Around line 127-138: Guard StateMachineOfCattach.Close to avoid using
partially initialized resources: check m.conn != nil before calling
m.conn.Close(), and only call termios.Tcsetattr to restore terminal attributes
when FlagPty is true AND a valid saved attribute was actually captured (e.g.,
add a boolean like m.ptySaved or check a sentinel on m.savedPtyAttr) so
termios.Tcsetattr(os.Stdin.Fd(), termios.TCSANOW, &m.savedPtyAttr) is invoked
only when saved state exists; update StateForwarding (or the code that captures
the terminal) to set the sentinel when it successfully saves attributes.
- Around line 301-323: The code currently keeps receiving nil from
m.chanInputFromTerm and repeatedly sends TASK_IO_FORWARD with Eof=true; change
the case handling so that when msg == nil you send the EOF frame exactly once
and then disable further reads from m.chanInputFromTerm (e.g., set
m.chanInputFromTerm = nil) or break/return from the input-forward loop; update
the logic around creating ioFwdReq (in the case handling the m.chanInputFromTerm
receive and before calling m.stream.Send for
StreamCattachRequest_TASK_IO_FORWARD) to perform the one-time EOF send and then
stop further sends to avoid the spin, still honoring FlagInputFilter and setting
gVars.connectionBroken on send error.
In `@internal/cfored/cattach_server.go`:
- Around line 162-172: The code can exit early after util.ParseHostList returns
false without releasing gVars.ctldReplyChannelMapMtx, causing a deadlock; before
setting state = DeadCattach and breaking out of the block in the ctldReply
handling code (where ctldReply.GetPayloadStepMetaReply().Step.GetNodelist() is
parsed), ensure you call gVars.ctldReplyChannelMapMtx.Unlock() (and keep the
existing delete(gVars.ctldReplyChannelMapByPid, cattachPid) behavior) so the
mutex is always unlocked before break/return paths.
- Around line 53-66: Replace the fatal process-terminating logs in the cattach
handling with graceful session-level handling: instead of log.Fatal(err) in the
error switch (where err is checked) return the error to the caller or log.Printf
the error and close only the current cattach session/stream; likewise replace
log.Fatalf for unexpected cattachRequest.Type with a non-fatal log.Printf (or
return a typed error) and cleanly terminate the current handler rather than
exiting the whole process. Update the handling around the variables err and
cattachRequest.Type and keep the check against
protos.StreamCattachRequest_STEP_CONNECT_REQUEST, ensuring the function returns
an error or ends the session so other clients are unaffected.
- Around line 373-386: In the End state cleanup add removal of the per-pid
routing entry ctldReplyChannelMapByPid[cattachPid] to avoid stale entries if the
client disconnects before STEP_META_REPLY; while holding
gVars.ctldReplyChannelMapMtx (the same mutex used for
ctldReplyChannelMapForCattachByStep) delete the ctldReplyChannelMapByPid key for
cattachPid, then proceed with the existing deletion of
ctldReplyChannelMapForCattachByStep and unlocking; ensure you reference
ctldReplyChannelMapByPid and cattachPid when making the change so the map entry
added earlier is always removed on End.
- Around line 330-334: In the default branch of the switch in cattach_server.go
(the case handling unexpected request types where jobId and stepId are logged
and the forwarding label is used), replace the fatal log call (log.Fatalf) with
non-fatal error handling: log the error with log.Errorf or log.Printf including
jobId and stepId, perform graceful shutdown of the session/connection (e.g.,
close the connection or call the session cleanup/close function used elsewhere
in this file), and then break out to the forwarding label or return so the
session ends cleanly instead of crashing the process.
- Around line 227-235: The code currently calls log.Fatalf when ctldReply.Type
!= protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY which crashes the process;
replace the fatal exit with non-fatal error handling by logging the unexpected
reply via log.Errorf (include jobId, stepId and ctldReply.Type) and continue
without terminating the process; keep the existing stopWaiting.Store(true)
behavior and only set state = DeadCattach in the branch where ctldReply.Type ==
protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY so the handler remains graceful
(refer to ctldReplyChannel, ctldReply.Type,
protos.StreamCtldReply_JOB_COMPLETION_ACK_REPLY, log.Fatalf, log.Errorf,
stopWaiting.Store, and state = DeadCattach).
---
Nitpick comments:
In `@internal/cfored/server.go`:
- Around line 732-750: The comment describing the bounded graceful shutdown is
out of sync with the actual timeout constant; update the comment that currently
says "30 s" to reflect the actual value used by gracefulStopTimeout (5 *
time.Second) near the block that spawns the goroutine calling
unixServer.GracefulStop(), tcpServer.GracefulStop() and later calls
unixServer.Stop()/tcpServer.Stop() so the comment accurately documents the 5s
timeout.
- Around line 572-576: Replace the fatal crash in the default case of the
supervisor IO loop: stop using log.Fatalf and instead log an error with the
unexpected crun request details (use jobId, stepId and crunReq.Type.String()),
perform any necessary cleanup, and break out of the supervisorIOForwarding loop
so cfored continues running gracefully; update the default branch in the switch
where log.Fatalf is called to use non-fatal logging and a controlled
break/cleanup path.
- Around line 531-534: Replace the fatal crash in the supervisor message switch
default branch: instead of calling log.Fatalf when encountering an unexpected
supervisorReq.Type.String(), log an error (e.g., log.Errorf or log.Printf)
including jobId, stepId and the unexpected type, then set state =
SupervisorUnReg and return or continue the supervisor handling loop as
appropriate; update the default case in the supervisor message processing (the
switch handling supervisorReq) to perform non-fatal error handling so one bad
message does not terminate cfored.
- Around line 606-610: Replace the unconditional process exit in the default
branch (currently using log.Fatalf with jobId, stepId and
cattachReq.Type.String()) with graceful error handling: log an error (e.g.,
log.Printf or logger.Errorf) including the same context (jobId, stepId,
cattachReq.Type.String()) and then break out of the supervisorIOForwarding loop
to end the supervisor session cleanly instead of terminating the whole process.
- Around line 271-278: The TaskIOBuffer capacity is hardcoded to 10 when
initializing entries in keeper.taskIOBufferMap; make this capacity configurable
by adding a cfored configuration field (e.g., TaskIOHistoryCapacity) and use
that value instead of the literal 10 when constructing TaskIOBuffer in the code
path that creates a new buffer for StepIdentifier{JobId: jobId, StepId: stepId};
ensure the config has a sensible default (e.g., 10) so existing behavior is
preserved and update any relevant config parsing/initialization code to expose
the new option.
- Around line 238-263: The TODO in forwardCattachRequestToSupervisor: when the
StepIdentifier (taskId, stepId) doesn't exist the code only logs and returns;
update it to send an immediate cattach response back to the caller indicating
the step is missing and then close the cattach stream. Concretely, inside the
!exist branch of forwardCattachRequestToSupervisor construct and send a
protos.StreamCattachResponse (or the appropriate response message used by
StreamCattachRequest) that signals an error/closed state (e.g.,
TASK_IO_FORWARD/error status and an explanatory message), ensure you write it to
the request's response/stream channel or use the request's context/stream send
method, and then close or finalize that response stream so the front-end gets an
echo and termination instead of only a log entry.
In `@internal/crun/crun.go`:
- Around line 729-742: Fix the typo and remove the duplicate log: change the
error log in the branch that checks cforedReply.Type to use "but got" instead of
"bug get" in the log.Errorf call that mentions
protos.StreamCrunReply_STEP_COMPLETION_ACK_REPLY, and remove or consolidate one
of the identical log.Debug("Job completed.") calls (either the one inside the
case for protos.StreamCrunReply_STEP_COMPLETION_ACK_REPLY or the later one after
checking GetPayloadStepCompletionAckReply().Ok) so the completion message is
only logged once while keeping m.state = End and m.err handling unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 78bcc093-53bf-49e9-97cf-1ac74ab06a8c
📒 Files selected for processing (6)
- internal/cattach/cattach.go
- internal/cfored/cattach_server.go
- internal/cfored/crun_server.go
- internal/cfored/ctld_client.go
- internal/cfored/server.go
- internal/crun/crun.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/cfored/crun_server.go
Actionable comments posted: 7
♻️ Duplicate comments (4)
internal/cfored/cattach_server.go (3)
170-180: ⚠️ Potential issue | 🔴 Critical

Unlock `ctldReplyChannelMapMtx` on the `ParseHostList` failure path.

The bare `break` skips the unlock below. The next ctld demux or cattach cleanup touching this mutex will deadlock.

Suggested fix

```diff
 var ok bool
 execCranedIds, ok = util.ParseHostList(ctldReply.GetPayloadStepMetaReply().Step.GetNodelist())
 if !ok {
+    gVars.ctldReplyChannelMapMtx.Unlock()
     state = DeadCattach
     break
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 170 - 180, The mutex gVars.ctldReplyChannelMapMtx is locked before deleting gVars.ctldReplyChannelMapByPid but on the ParseHostList failure path the code does "state = DeadCattach; break" without unlocking, causing a deadlock; fix by ensuring gVars.ctldReplyChannelMapMtx.Unlock() is called before breaking (either call Unlock() right after the delete and before calling ParseHostList or add a defer gVars.ctldReplyChannelMapMtx.Unlock() immediately after acquiring the lock) so that the mutex is always released when ParseHostList returns !ok in the branch inside the cattach handling code.
246-276: ⚠️ Potential issue | 🟠 Major

Handle client disconnects and shutdown while waiting for supervisors.

This state only listens to `ctldReplyChannel` and `readyChannel`. If the client disconnects, or `gVars.globalCtx` is cancelled before any supervisor registers, the stream stays parked here and `waitAnySupervisorReady` keeps sleeping.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 246 - 276, The select only watches ctldReplyChannel and readyChannel so it can block indefinitely if the client disconnects or the global context is cancelled; add cases for <-gVars.globalCtx.Done() and <-toCattachStream.Context().Done() (or the stream's done channel) inside the same select to handle shutdowns: on either case set state to End (or DeadCattach as appropriate), call stopWaiting.Store(true) and gSupervisorChanKeeper.broadcastStopWaiting() so waitAnySupervisorReady can wake, and ensure any send to toCattachStream is skipped after shutdown to avoid blocking.
144-150: ⚠️ Potential issue | 🔴 Critical

Fix the wait-state guard; it currently turns any extra client message into a daemon exit.

`err == nil` makes this branch true for every normal receive, and `log.Fatal` kills all of `cfored` instead of rejecting just this stream.

Suggested fix

```diff
 case item := <-RequestChannel:
     cattachRequest, err := item.message, item.err
     if err != nil {
         log.Debug("[Cattach->Cfored] Connection to cattach was broken.")
-    } else if cattachRequest != nil || err == nil {
-        log.Fatal("[Cattach->Cfored] Expect only nil (cattach connection broken) here!")
+    } else if cattachRequest != nil {
+        log.Warnf("[Cattach->Cfored] Unexpected %s while waiting for STEP_META_REPLY; closing this attach stream.",
+            cattachRequest.Type)
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/cattach_server.go` around lines 144 - 150: The current guard uses "else if cattachRequest != nil || err == nil" so normal receives (err == nil) always hit the branch and call log.Fatal; change the condition to only check the unexpected message case (e.g., "else if cattachRequest != nil") and replace the global log.Fatal call with non-fatal per-stream rejection handling (e.g., log.Warn or return/close only this stream) so RequestChannel, cattachRequest, item.message, item.err, and the stream are handled without exiting the whole cfored process.

internal/cfored/ctld_client.go (1)
200-211: ⚠️ Potential issue | 🟠 Major

Don't terminate `cfored` on a late `STEP_META_REPLY`.

A cattach that disconnects before metadata arrives removes its PID entry first. A late reply then hits this `log.Fatalf` path and kills the whole daemon instead of dropping a stale control-plane message.

Suggested fix

```diff
 gVars.ctldReplyChannelMapMtx.Lock()
 toFrontCtlReplyChannel, ok := gVars.ctldReplyChannelMapByPid[frontPid]
 if ok {
     toFrontCtlReplyChannel <- ctldReply
 } else {
-    log.Fatalf("[Cfored<->Ctld] Front pid %d shall exist "+
-        "in ctldReplyChannelMapByPid!", frontPid)
+    log.Errorf("[Cfored<->Ctld] Front pid %d not found in "+
+        "ctldReplyChannelMapByPid; dropping stale STEP_META_REPLY.", frontPid)
 }
 gVars.ctldReplyChannelMapMtx.Unlock()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cfored/ctld_client.go` around lines 200 - 211, The current STEP_META_REPLY handler calls log.Fatalf when the frontPid key is missing, which crashes cfored on late/stale replies; change this to safely drop the reply and log a non-fatal message (e.g., log.Warnf or log.Debugf) instead. Also ensure the mutex on gVars.ctldReplyChannelMapMtx is always released (use defer gVars.ctldReplyChannelMapMtx.Unlock() immediately after Lock()), locate the code around the case protos.StreamCtldReply_STEP_META_REPLY and replace the fatal path that references frontPid, toFrontCtlReplyChannel, and ctldReply with a non-fatal drop and return/continue so stale messages are ignored without terminating the daemon.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cfored/cattach_server.go`:
- Around line 279-293: Replay is duplicating pre-ready task IO because
CattachWaitStepMeta registers TaskIoRequestChannel before history is replayed;
when iterating history from gSupervisorChanKeeper.getRemoteHistory in the loop,
skip or stop forwarding Task IO messages that are already covered by the live
channel by detecting the TASK_IO_FORWARD_READY sentinel (or checking presence in
taskIOBufferMap/TaskIoRequestChannel) before calling forwardTaskMsgToCattach; in
practice, modify the loop that reads history to ignore TASK_IO messages after
(or until) TASK_IO_FORWARD_READY for that job/step (or skip messages found in
taskIOBufferMap) so you don't forward the same pre-ready output twice.
In `@internal/cfored/ctld_client.go`:
- Around line 257-263: The code is sending ctldReply to channels while holding
ctldReplyChannelMapMtx which can block the router; change the pattern in the
ctld reply routing (the block that looks up
gVars.ctldReplyChannelMapForCattachByStep with StepIdentifier{JobId: jobId,
StepId: stepId}) to copy the channel slice/map entries into a local variable
while holding ctldReplyChannelMapMtx, then release the mutex and perform the
actual sends to each toCattachCtlReplyChannel using ctldReply; apply the same
change to the analogous code region around the other instance (the other send
loop noted at lines ~360-375) so no channel send happens under
ctldReplyChannelMapMtx.
In `@internal/cfored/server.go`:
- Around line 283-308: forwardCattachRequestToSupervisor currently broadcasts a
StreamCattachRequest to every supervisor in stepChannels; change it to honor a
task-scoped stdin by checking the request's TaskId: if request.TaskId != 0, only
send to the supervisorChannel associated with that task (compare request.TaskId
to the supervisorChannel's tracked task id field or metadata on the channel); if
request.TaskId == 0 keep the existing broadcast behavior. Locate
forwardCattachRequestToSupervisor, stepChannels, supervisorChannel and cranedId
to add the conditional filter, and update the error/log messages accordingly
when skipping non-matching supervisors.
- Around line 210-228: The nil crash notifications currently sent to channels
(collected from keeper.stepIORequestChannelMap into variable channels) can block
if a front-end buffer is full; change the send loop that iterates over channels
and does channel <- nil to use a non-blocking send (e.g., select with case
channel <- nil and default) so a full consumer won’t block teardown; keep the
existing lock/unlock around building channels but perform non-blocking sends
outside the lock in the same send loop that handles the nil StreamStepIORequest
crash signal.
In `@protos/Crane.proto`:
- Around line 1122-1124: TaskIOErrOutputForwardReply currently only carries
bytes msg, so stderr can't be associated with a task; add a task_id field (same
type as stdout's task_id) to TaskIOErrOutputForwardReply and propagate that same
task_id into the corresponding supervisor request message and the cattach reply
message (the stderr forward/supervisor/cattach messages referenced in the proto)
so stderr can be filtered and labeled per-task.
- Around line 1023-1140: Remove the unused X11 attach protocol elements from the
cattach proto so clients won't assume X11 is supported: delete the
STEP_X11_FORWARD value from StreamCattachRequest.CattachRequestType and the
StepX11ForwardReq message and its oneof field payload_step_x11_forward_req;
likewise delete STEP_X11_FORWARD, STEP_X11_CONN, STEP_X11_EOF from
StreamCattachReply.CforedCattachReplyType and remove the StepX11ForwardReply,
StepX11ConnReply, StepX11EofReply messages and their corresponding oneof payload
fields (payload_step_x11_forward_reply, payload_step_x11_conn_reply,
payload_step_x11_eof_reply); after editing StreamCattachRequest and
StreamCattachReply, run your protobuf codegen to update generated clients.
---
Duplicate comments:
In `@internal/cfored/cattach_server.go`:
- Around line 170-180: The mutex gVars.ctldReplyChannelMapMtx is locked before
deleting gVars.ctldReplyChannelMapByPid but on the ParseHostList failure path
the code does "state = DeadCattach; break" without unlocking, causing a
deadlock; fix by ensuring gVars.ctldReplyChannelMapMtx.Unlock() is called before
breaking (either call Unlock() right after the delete and before calling
ParseHostList or add a defer gVars.ctldReplyChannelMapMtx.Unlock() immediately
after acquiring the lock) so that the mutex is always released when
ParseHostList returns !ok in the branch inside the cattach handling code.
- Around line 246-276: The select only watches ctldReplyChannel and readyChannel
so it can block indefinitely if the client disconnects or the global context is
cancelled; add cases for <-gVars.globalCtx.Done() and
<-toCattachStream.Context().Done() (or the stream's done channel) inside the
same select to handle shutdowns: on either case set state to End (or DeadCattach
as appropriate), call stopWaiting.Store(true) and
gSupervisorChanKeeper.broadcastStopWaiting() so waitAnySupervisorReady can wake,
and ensure any send to toCattachStream is skipped after shutdown to avoid
blocking.
- Around line 144-150: The current guard uses "else if cattachRequest != nil ||
err == nil" so normal receives (err == nil) always hit the branch and call
log.Fatal; change the condition to only check the unexpected message case (e.g.,
"else if cattachRequest != nil") and replace the global log.Fatal call with
non-fatal per-stream rejection handling (e.g., log.Warn or return/close only
this stream) so RequestChannel, cattachRequest, item.message, item.err, and the
stream are handled without exiting the whole cfored process.
In `@internal/cfored/ctld_client.go`:
- Around line 200-211: The current STEP_META_REPLY handler calls log.Fatalf when
the frontPid key is missing, which crashes cfored on late/stale replies; change
this to safely drop the reply and log a non-fatal message (e.g., log.Warnf or
log.Debugf) instead. Also ensure the mutex on gVars.ctldReplyChannelMapMtx is
always released (use defer gVars.ctldReplyChannelMapMtx.Unlock() immediately
after Lock()), locate the code around the case
protos.StreamCtldReply_STEP_META_REPLY and replace the fatal path that
references frontPid, toFrontCtlReplyChannel, and ctldReply with a non-fatal drop
and return/continue so stale messages are ignored without terminating the
daemon.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c5fb3b0c-3fd2-48b4-bcf3-eaf024735982
📒 Files selected for processing (11)
- cmd/cattach/cattach.go
- internal/cattach/cattach.go
- internal/cattach/cmd.go
- internal/cfored/cattach_server.go
- internal/cfored/cfored.go
- internal/cfored/crun_server.go
- internal/cfored/ctld_client.go
- internal/cfored/server.go
- internal/crun/crun.go
- internal/crun/x11.go
- protos/Crane.proto
✅ Files skipped from review due to trivial changes (2)
- internal/crun/x11.go
- internal/cattach/cattach.go
🚧 Files skipped from review as they are similar to previous changes (3)
- cmd/cattach/cattach.go
- internal/cfored/cfored.go
- internal/crun/crun.go
```diff
-// I/O message from Supervisor to Crun
-stepIORequestChannelMap map[StepIdentifier]chan *protos.StreamStepIORequest
+// I/O message from Supervisor to Crun/Cattach
+stepIORequestChannelMap map[StepIdentifier]map[int32]chan *protos.StreamStepIORequest
```
Is the int32 here the task id? If so, is the extra nesting layer unnecessary? Messages that need it already carry the task id.

It's the front-end pid; there may be multiple cattach connections.

Why does this PR include changes to Crun?
- Guard Close() against nil connection to prevent panic
- Fix buffer data race: copy bytes before sending to chanInputFromTerm
- Fix %s file pattern expansion to use actual attached step ID
- Fix logic condition bug in CattachWaitStepMeta that caused Fatal on any valid message
- Add early detection in CattachWaitIOForward to check if the crun channel is still active before attempting to wait for supervisor channels, preventing indefinite blocking when a step completes early
- Add a non-blocking pre-forwarding check for JOB_COMPLETION_ACK_REPLY from ctld to handle the case where the step finishes between state transitions
- Add a second crun channel liveness check after the readyChannel wait to catch completions that occur during supervisor setup
- Remove unused `forwardEstablished` atomic bool that was never read
- Simplify switch statement indentation in cattach StateForwarding and add a default warning log for unhandled message types
- Remove TASK_EXIT_STATUS and STEP_CANCEL_REQUEST handling from cattach StateForwarding as exit status reporting is now handled elsewhere
…iet flags

Introduce `TaskOutputMsg` to carry task ID alongside output data through internal channels, enabling per-task I/O control:
- `--label`: prepend `[task_id]: ` to each output line via `applyLabel`
- `--output-filter`: suppress output from tasks other than the specified one
- `--input-filter`: direct stdin to a specific task instead of broadcasting
- `--quiet`: suppress the "Task io forward ready" status message
Replace `CattachWaitTaskMeta` with `CattachWaitStepMeta` and `CattachWaitTaskComplete` with `CattachWaitJobComplete` to align state names with step/job terminology. Also replace incorrect `End` state transitions with `DeadCattach` for broken connection handling, and update log messages to reflect the new naming conventions.
Call `taskFinishCb()` before transitioning to the End state in `StateForwarding` to ensure all IO goroutines (StdinReaderRoutine, StdoutWriterRoutine, input-forward) are signaled to exit cleanly and don't leak when the task finishes, encounters an error, or the Cfored connection is broken. Also add `broadcastStopWaiting()` in cfored's cattach and crun server state machines to wake up goroutines blocked in `waitSupervisorChannelsReady` so they can observe `stopWaiting == true` and exit promptly after receiving a completion/cancel reply from Ctld.
…terminal

Replace bare channel sends in `StateForwarding` and `StateWaitAck` with `select` statements that also listen on a cancellation context (`taskFinishCtx`/`stopStepCtx`). Without this guard, a slow terminal fills `chanOutputFromRemote`, permanently blocking the forwarding loop and making SIGINT/Ctrl+C ineffective.

Additional fixes:
- Pre-initialize `stopStepCtx`/`stopStepCb` in `crun.Init()` to avoid nil-pointer dereferences when `StateWaitAck` is reached without ever entering `StateForwarding`.
- Add a `globalCtx.Done()` case in cfored's `WAIT_TASK_IO_FORWARD` loop so cfored transitions to `CancelJobOfDeadCrun` immediately on shutdown instead of racing against a synthetic `JOB_CANCEL_REQUEST` that may not arrive within the 30-second window.
Replace the Warning-level log with a Debug-level message when no front-end is connected for a step during IO forwarding. This condition is expected during the brief window between crun exiting and the supervisor stopping its output stream, so a Warning was misleading and noisy. The new message also better explains the situation.
Replace the blocking channel send in `forwardRemoteIoToFront` with a non-blocking `select` to prevent deadlocks caused by slow front-end consumers (e.g. terminals). Previously, a backed-up front-end channel could block the supervisor IO stream, causing cfored, the supervisor stream, and crun/cattach to wait on each other indefinitely.

The new behavior:
- Delivers messages immediately if the channel has capacity
- Drops messages when the front-end channel is full, logging a trace
- Returns early if cfored is shutting down (globalCtx cancelled)

Dropping output lines for slow interactive consumers (crun/cattach) is an acceptable trade-off to keep the supervisor stream flowing.
Replace the single-channel X11 forwarding approach (`chanX11InputFromLocal`, `chanX11OutputFromRemote`) with an `X11SessionMgr` that handles per-session demultiplexing by `(CranedId, LocalId)` pairs. This mirrors the session manager used in crun and enables multi-task X11 jobs where each task can open concurrent X11 sessions. The old `StartX11ReaderWriterRoutine` is removed in favor of `X11SessionMgr.SessionMgrRoutine`, and reply routing now handles `STEP_X11_CONN`, `STEP_X11_FORWARD`, and `STEP_X11_EOF` messages through the manager.
Correct misaligned switch/case blocks, const declarations, and select statements in cattach.go and cattach_server.go to properly reflect their logical nesting level. No functional changes were made.
Remove all X11 forwarding logic from the cattach state machine, including the X11SessionMgr field, X11 request/reply channel handling in StateForwarding, and X11 session initialization in StartIOForward. Also removes corresponding X11 message routing in cfored's cattach server (STEP_X11_FORWARD case) and cleans up related forwarding logic in forwardTaskMsgToCattach.
Corrected improper indentation of `case` blocks within the `switch` statement in the `CforedCattachStateMachineLoop` function to align with Go formatting conventions.
- Fix wrong map reference from `taskIORequestChannelMap` to `stepIORequestChannelMap` in `setRemoteIoToFrontChannel`
- Replace incorrect `StreamTaskIORequest` type with `StreamStepIORequest` in channel and buffer initialization
- Remove duplicate `getStepDoneChannel` function definition
- Fix `TASK_ERR_OUTPUT` to route via `forwardRemoteIoToFront` instead of `forwardRemoteIoToCrun`
- Fix indentation in supervisor request switch statement
- Add `chanErrOutputFromRemote` channel to `StateMachineOfCattach` for handling stderr output separately from stdout
- Implement `StderrWriterRoutine` to write remote stderr output to local `os.Stderr`, with proper draining on task completion
- Handle new `TASK_ERR_OUTPUT_FORWARD` reply type in `StateForwarding` to route stderr messages to the dedicated channel
- Update `cfored` to forward `TASK_ERR_OUTPUT` messages from supervisor to cattach clients using the new proto message type
- Add `TaskIOErrOutputForwardReply` proto message and `TASK_ERR_OUTPUT_FORWARD` enum value to `StreamCattachReply`

This enables cattach to properly separate and display stderr output from remote tasks instead of mixing it with stdout.
Add missing context cancellation cases in `StateForwarding` and `StateWaitAck` to prevent goroutine hangs when a job is terminated or the cfored connection is lost:
- In `StateForwarding`: transition to `JobKilling` when `stopStepCtx` is done during stdout/stderr forwarding
- In `StateWaitAck`: silently drop output messages when `stopStepCtx` is done (allows draining replyChannel to receive `STEP_COMPLETION_ACK_REPLY`), and transition to `End` state when `globalCtx` is cancelled due to cfored shutdown or connection loss
When a job step is started with `crun --input=<task_id>`, the step's `IoMeta.InputTaskId` is set, meaning stdin is exclusively routed to one specific task. In this case, `cattach` now automatically enters read-only mode: it displays task output but does not forward any stdin from the local terminal.

Key changes:
- Add `FlagReadOnly` flag in `cmd.go`, set automatically during `StateWaitForward` by inspecting `IoMeta.InputTaskId` (non-nil and non-PTY implies read-only)
- Skip `StdinReaderRoutine` and the stdin-forwarding goroutine in `StartIOForward`/`StateForwarding` when `FlagReadOnly` is true
- PTY mode is explicitly excluded from read-only since it always requires full interactive control
- Remove ctldReplyChannelMapByPid entry when cattach disconnects before ctld replies, preventing a permanent channel leak that bypassed the normal cleanup path in the STEP_META_REPLY branch
- Remove the non-blocking select default case in forwardRemoteIoToFront to stop silently dropping IO messages when the front-end channel is full
- Delete the outer stepIORequestChannelMap entry when the last cattach disconnects so forwardRemoteIoToFront correctly detects no connected front-end instead of iterating over an empty map
…consumers

Add a `default` branch to the channel send select in `forwardRemoteIoToFront` so that a full or already-exited front-end IO channel never blocks the supervisor IO forwarding goroutine. Messages are dropped with a warning log when the channel is full, ensuring the supervisor stream remains unblocked.
Add a check for interactive metadata in the STEP_META_REPLY handler. If a job lacks interactive metadata (e.g., a batch job), cattach is now explicitly rejected with a clear failure reason instead of causing a potential nil pointer dereference. Also refactors the reply block and extracts `failureReason` earlier to be used consistently in all failure paths.
Increase `ctldReplyChannel` buffer from 2→8 in both calloc and cattach servers to prevent overflow when `WaitAllFrontEnd` pre-sends cancel and completion ACK messages alongside pre-existing ctld messages.

Increase `TaskIoRequestChannel` buffer from 2→64 in cattach to reduce message drops in `forwardRemoteIoToFront` when the supervisor produces output faster than cattach consumes it.

Replace usage of `ctldReplyChannelMapByPid` with the dedicated `ctldReplyChannelMapForCattachByPid` in cattach so that `WaitAllFrontEnd` sends the correct termination message type (`JOB_COMPLETION_ACK_REPLY`) instead of the calloc/crun-specific `JOB_ID_REPLY`, and ensures proper cleanup on early disconnects to avoid channel leaks.
…al leaks in cattach server
…l logs with error logs
…or messages for job and step IDs Co-authored-by: Copilot <copilot@github.com>
…hannel to prevent message duplication
…ward messages from StreamCattachRequest
Summary by CodeRabbit
New Features
Bug Fixes