Conversation
@@ -0,0 +1,49 @@
package v1alpha1
Why do we have wasmtask_type when all our tasks are wasm and are already defined?
if err != nil {
    setupLog.Error(err, "failed to initialize mqtt pubsub")
var mqttPubSub mqtt.PubSub
hasPartialMQTTConfig := (mqttAddress != "" || clientID != "" || clientKey != "" || domainID != "" || channelID != "")
Why do we need all this?
The previous approach looks cleaner: if any of the variables are missing, it would simply fail to initialize the MQTT pubsub.
@@ -0,0 +1,78 @@
package v1alpha1
Shouldn't this be on the v1 package?
The placement is correct unless we are treating Federated learning orchestration as a core functionality
}

type PropletResources struct {
    // CPU capacity (e.g., "1000m" for 1 CPU core)
Is there a reason we removed these comments? They are used to generate descriptions for these types.
availableResources:
  properties:
    cpu:
      description: CPU capacity (e.g., "1000m" for 1 CPU core)
This is the description generated from comments
@@ -0,0 +1,20 @@
# Scheduler Package
rodneyosodo left a comment
Can you share an example of an FML application running in a k8s cluster?
@@ -0,0 +1,78 @@
package v1alpha1

r.pubsub = pubsub
r.baseTopic = fmt.Sprintf(superMQBaseTopic, domainID, channelID)

if r.pubsub == nil || domainID == "" || channelID == "" {
I think this is done on main?
6f423ff to f43d176
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
- Added missing RBAC markers to FederatedJob, TrainingRound, and Task controllers
- Regenerated manifests with complete RBAC permissions
- Removed temporary validation scripts and test fixtures
…descriptions
- Remove redundant WasmTask API type and CRD (all tasks use Task API)
- Revert MQTT initialization to fail-fast approach (matches main branch)
- Add type-level comments for K8sPropletSpec and ConnectionConfig to generate CRD descriptions
- Update README to use Task instead of WasmTask throughout
- Remove unused scheduler README
- Add TaskKind, TaskMode, Skipped/Interrupted phases to Task CRD
- Add confidential computing fields (Encrypted, KBSResourcePath)
- Add workflow/DAG fields (DependsOn, RunIf, WorkflowID, JobID)
- Add scheduling fields (Schedule, IsRecurring, Timezone, Priority)
- Expand MonitoringProfile with granular collection flags
- Create PropellerJob CRD for workflow orchestration
- Add AliveHistory to Proplet status
- Update MQTT payload in task controller
- Register PropellerJob controller in main
… status handler
- Add ValidStateTransition() and stateTransitionMap to enforce valid task phase changes
- Extend Scheduler interface with SelectCandidateProplets(), Score(), Pick() 3-phase pattern
- Update RoundRobin scheduler with proplet filtering (type, labels, device, capabilities)
- Add mqttStatusHandler for external proplet task status updates
- Validate state transitions in mqttResultHandler and mqttStatusHandler
SuperMQ routes MQTT messages at m/{domain}/c/{channel}/{sub-topic}
without a messages/ infix. The previous base topic string caused the
operator to subscribe to messages/# instead of #, so no proplet
heartbeats or task results were ever received.
…ates
- Allow pending→completed and scheduled→completed transitions for fast-executing tasks
- Preserve LastSeen from fresh fetch in updatePropletStatus retry loop
- Add retry-with-refetch on conflict errors in mqttResultHandler
Add round-robin proplet selection to TaskReconciler:
- SelectProplet is called when no PropletSelector.PropletID is set, listing all running proplets in the namespace and delegating to internal/scheduler.RoundRobin (feasibility → score → pick)
- Explicit PropletSelector.PropletID continues to bypass the scheduler

Add dependency gate in handlePending:
- Inspect every task listed in DependsOn; wait until all are terminal
- Transition to Skipped when run_if conditions are not satisfied (success mode + any dep failed; failure mode + no dep failed)
- Requeue every 15s while waiting, so dependent tasks start promptly once their prerequisites complete

Add handleTerminal for recurring tasks:
- When IsRecurring is true and Status.NextRun is set, requeue at NextRun time then reset phase to pending for the next execution

Add internal/dag package with AllDepsTerminal and ShouldSkip helpers shared between the task and job controllers.

Handle Skipped and Interrupted phases in Reconcile so they are treated as terminal with no further action.
parallel: create all tasks immediately, no dependency ordering.
configurable: create all tasks immediately and resolve DependsOn
values from TaskSpec-relative names (the Name field) to their
Kubernetes resource names ({job-name}-{spec-name}). The
TaskReconciler's dependency gate then controls when each task
actually starts — no polling or annotation tricks needed.
sequential: create tasks one at a time in spec order. handleRunning
checks whether all previously created tasks are terminal; if so, it
creates the next task using the spec-index annotation.
All modes:
- Tasks are named {job-name}-{spec-name} so DependsOn refs stay stable
- Tasks carry propeller.propeller.abstractmachines.fr/job label for
efficient listing without a field indexer
- spec-index and spec-name annotations support sequential ordering and
DependsOn resolution respectively
- controllerutil.SetControllerReference replaces manual OwnerReference
construction
Append the heartbeat timestamp to Status.AliveHistory in mqttLivenessHandler and cap the slice at the last 10 entries, matching the propeller manager's aliveHistoryLimit behaviour. Wire scheduler into TaskReconciler setup in cmd/main.go.
Replace two anti-patterns with idiomatic controller-runtime:
1. MQTT goroutines → Reconcile (PropletReconciler)
- mqttLivenessHandler no longer calls Status().Update() directly.
- Heartbeat timestamp is stored in pendingHeartbeats (sync.Map).
- A GenericEvent is sent to propletEvents (buffered chan).
- applyPendingHeartbeat() in Reconcile() does the LoadAndDelete and
updates LastSeen / AliveHistory before the status write.
- SetupWithManager registers WatchesRawSource(source.Channel(...)).
- PropletReconciler now subscribes only to /control/proplet/alive;
task result/status topics belong to TaskReconciler.
2. Polling → event-driven Watches (TaskReconciler)
- Dependency gate in handlePending no longer issues a 15 s requeue.
Instead, Watches(&Task{}, EnqueueRequestsFromMapFunc(enqueueDependents))
fans out reconcile requests to all pending dependents the moment a
dependency reaches a terminal phase.
- A field indexer on spec.dependsOn makes the dependent lookup O(log n).
- External task handleRunning no longer polls (30 s requeue removed).
Completion arrives via mqttResultHandler → taskEvents channel.
- mqttResultHandler and mqttStatusHandler moved here from PropletReconciler;
they store updates in pendingResults (sync.Map) and send GenericEvents.
- applyMQTTUpdate() in Reconcile() drains pending updates and persists
the new phase before the state-machine switch.
- K8s Job completion is driven by Owns(&batchv1.Job{}); safety requeue
extended to 5 min.
Lint (golangci-lint):
- propellerjob_controller.go: fix gofmt formatting in comment block
- propellerjob_controller.go: replace deprecated result.Requeue with result.RequeueAfter in advanceSequential early-return condition
- proplet_controller.go: remove unused propletMatchesTaskSelector method and its now-unused slices import

Unit test panic (task_controller_test.go):
- selectProplet: guard r.sched nil dereference — return an error when no scheduler is configured instead of panicking; the test constructs the reconciler without SetupWithManager, so sched was nil

E2E startup crash (controller pod CrashLoopBackOff):
- cmd/main.go: make MQTT optional — only call mqtt.NewPubSub when mqttAddress is provided; log a warning and continue when not configured
- proplet_controller.go SetupWithManager: guard Subscribe call with if r.pubsub != nil so the controller registers cleanly without MQTT
- task_controller.go SetupWithManager: same guard for both Subscribe calls

No functional change when MQTT is configured.
The base manager.yaml had literal placeholder values like <mqtt-broker-address>, <client-id>, etc. These caused the E2E controller pod to crash-loop: the manager attempted to connect to the literal string as an MQTT broker URL, timed out, and exited.

MQTT configuration is deployment-specific and must not be baked into the base kustomize layer. Operators configure it via a kustomize overlay or directly in their deployment. Without any --mqtt-address flag, main.go skips MQTT initialisation and the controller starts cleanly.
Wire-format fixes (proplet protocol alignment):
- Use task.Spec.FunctionName (fallback task.Name) as MQTT "name" field — proplet
uses this as the WASM function name (service.rs:575)
- Fix monitoring profile key from "monitoring_profile" to "monitoringProfile" to
match proplet's serde rename annotation
- Fix monitoring profile interval from Go duration string to uint64 seconds as
required by proplet's serde_duration module
- Fix results double-marshaling: proplet ResultMessage.results is a plain String,
not a JSON object; stop re-marshaling the already-string value
- Remove /control/proplet/status subscription — this topic does not exist in the
proplet implementation
- Fix LWT topic: remove spurious /messages/ segment
(correct: m/{domainID}/c/{channelID}/control/proplet/alive)
- Fix LWT payload: use MQTT client ID (id param) as proplet_id, not username
Kubernetes API correctness:
- applyMQTTUpdate: re-store pending result on Status().Update conflict so updates
are not permanently lost on transient API errors
- applyMQTTUpdate: return (update, bool) so caller can restore on conflict
- Only set task.Status.FinishedAt for terminal phases (Completed/Failed/Interrupted);
intermediate phase transitions must not overwrite the finish timestamp
- Fix silent _ = r.Status().Update in sequential PropellerJob handleRunning
- selectProplet: verify pinned proplet is Running before dispatching task
- taskKubeName: lowercase output to satisfy Kubernetes RFC 1123 naming rules
PropellerJob logic:
- Include TaskRefs in TaskCount so TaskRefs-only jobs don't immediately complete
- Monitor TaskRef terminal states in handleRunning alongside owned tasks
mqtt.go:
- Fix SetOnConnectHandler log: "MQTT connection lost" → "MQTT connected"
task_controller.go:
- startExternalTask: propagate Status().Update error in MQTT-nil path instead of silently discarding it (prevented infinite reconcile loop on misconfiguration)

traininground_controller.go:
- checkTimeout: change return to (ctrl.Result, bool, error) so Status().Update failures are no longer silently dropped (rounds could hang in Running forever)
- processParticipants: handle TaskSkippedPhase and TaskInterruptedPhase — these phases never deliver an update and must be marked terminal so the round does not wait for them indefinitely
- handleRunning: add early-fail when all participants are terminal but KOfN is unachievable, avoiding an infinite requeue loop when no timeout is set
f43d176 to 54f5235
What type of PR is this?
This is a feature
What does this do?
Updates the operator to use the latest Proplet.
Which issue(s) does this PR fix/relate to?
NOISSUE – updates existing operator to align with current Proplet execution contract
Have you included tests for your changes?
No, validated manually using operator deployment and sample WasmTask workloads
Did you document any new/modified features?
Yes, updated operator documentation and examples to reflect new execution flow.
Notes
Deprecates legacy task execution path; preserves MQTT for external proplet execution