Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
90ce504
Initial commit
JeffMboya Jan 28, 2026
8c22961
feat(controllers): add missing RBAC markers to all controllers
JeffMboya Feb 3, 2026
b7240b4
Clean up operator: remove WasmTask, restore MQTT fail-fast, add type …
JeffMboya Feb 4, 2026
544c134
Address comments
JeffMboya Feb 4, 2026
b744be0
Update v1alpha1 API group to propeller.propeller.abstractmachines.fr
JeffMboya Feb 4, 2026
371ea6d
Address comments
JeffMboya Feb 4, 2026
6857e62
Move validation to main
JeffMboya Feb 6, 2026
6f25c7f
Align CRDs with propeller types
JeffMboya Mar 2, 2026
a7d951c
Remove binary and update .gitignore
JeffMboya Mar 2, 2026
fc9a152
Add state transition validation, extend scheduler interface, add MQTT…
JeffMboya Mar 3, 2026
f1ec0eb
Fix MQTT topic path for SuperMQ compatibility
JeffMboya Mar 4, 2026
a1e2b07
Fix alignment of error variable declarations in scheduler
JeffMboya Mar 4, 2026
eedd550
fix: pre-allocate candidates slice to satisfy prealloc linter
JeffMboya Mar 4, 2026
b395465
fix: relax task state transitions and add retry logic for proplet upd…
JeffMboya Mar 4, 2026
e140bd8
docs: improve sample manifests with documentation links and cleaner p…
JeffMboya Mar 4, 2026
171d986
docs: fix propeller website URL in sample manifests
JeffMboya Mar 4, 2026
476bca6
docs: use correct kubernetes-operator example URL
JeffMboya Mar 4, 2026
e071bab
feat(task): integrate scheduler and DAG dependency gate
JeffMboya Mar 18, 2026
bd6ea3b
feat(propellerjob): DAG-aware task execution for all three modes
JeffMboya Mar 18, 2026
4d642c8
fix(proplet): populate AliveHistory on each MQTT heartbeat
JeffMboya Mar 18, 2026
e3acc14
fix(controllers): strict Kubebuilder compliance — event-driven reconcile
JeffMboya Mar 18, 2026
87a405e
fix: resolve CI failures — lint, test panic, and E2E startup
JeffMboya Mar 18, 2026
f0fb6d0
fix(config): remove MQTT placeholder args from base manager manifest
JeffMboya Mar 18, 2026
94fcf5a
fix(operator): sync with main propeller codebase
JeffMboya Apr 8, 2026
91c724e
fix(operator): fix controller and wire-format bugs
JeffMboya Apr 8, 2026
54f5235
chore: remove redundant inline comments
JeffMboya Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ go.work.sum
# Editor/IDE
# .idea/
# .vscode/
main
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ help: ## Display this help.
manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
$(CONTROLLER_GEN) rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases


.PHONY: generate
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
Expand Down
337 changes: 295 additions & 42 deletions README.md

Large diffs are not rendered by default.

104 changes: 104 additions & 0 deletions api/v1/job_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type (
// +kubebuilder:validation:Enum=parallel;sequential;configurable
ExecutionMode string
// +kubebuilder:validation:Enum=Pending;Running;Completed;Failed
JobPhase string
)

const (
ExecutionModeParallel ExecutionMode = "parallel"
ExecutionModeSequential ExecutionMode = "sequential"
ExecutionModeConfigurable ExecutionMode = "configurable"

JobPhasePending JobPhase = "Pending"
JobPhaseRunning JobPhase = "Running"
JobPhaseCompleted JobPhase = "Completed"
JobPhaseFailed JobPhase = "Failed"
)

// PropellerJobSpec defines the desired state of PropellerJob
type PropellerJobSpec struct {
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
Name string `json:"name"`

// ExecutionMode determines how tasks are executed
// +kubebuilder:validation:Enum=parallel;sequential;configurable
// +kubebuilder:default="configurable"
ExecutionMode ExecutionMode `json:"executionMode,omitempty"`

// Tasks is a list of task specifications to execute as part of this job
Tasks []TaskSpec `json:"tasks,omitempty"`

// TaskRefs references existing Task resources by name
TaskRefs []string `json:"taskRefs,omitempty"`
}

// PropellerJobStatus defines the observed state of PropellerJob
type PropellerJobStatus struct {
// +kubebuilder:default="Pending"
Phase JobPhase `json:"phase"`
StartTime *metav1.Time `json:"startTime,omitempty"`
FinishTime *metav1.Time `json:"finishTime,omitempty"`
TaskCount int `json:"taskCount,omitempty"`
CompletedCount int `json:"completedCount,omitempty"`
FailedCount int `json:"failedCount,omitempty"`
SkippedCount int `json:"skippedCount,omitempty"`
// Conditions represent the latest available observations of the job's state
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Namespaced,shortName=pjob
// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Mode",type=string,JSONPath=`.spec.executionMode`
// +kubebuilder:printcolumn:name="Tasks",type=integer,JSONPath=`.status.taskCount`
// +kubebuilder:printcolumn:name="Completed",type=integer,JSONPath=`.status.completedCount`
// +kubebuilder:printcolumn:name="Failed",type=integer,JSONPath=`.status.failedCount`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`

// PropellerJob is the Schema for the propellerjobs API
// PropellerJob groups multiple Tasks for coordinated execution with DAG support
type PropellerJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec PropellerJobSpec `json:"spec,omitempty"`
Status PropellerJobStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// PropellerJobList contains a list of PropellerJob
type PropellerJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []PropellerJob `json:"items"`
}

func init() {
SchemeBuilder.Register(&PropellerJob{}, &PropellerJobList{})
}
4 changes: 4 additions & 0 deletions api/v1/proplet_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type ExternalPropletSpec struct {
Capabilities []string `json:"capabilities,omitempty"`
}

// K8sPropletSpec defines the configuration for a Kubernetes-backed Proplet.
type K8sPropletSpec struct {
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
Expand All @@ -60,6 +61,7 @@ type K8sPropletSpec struct {
Replicas *int32 `json:"replicas,omitempty"`
}

// ConnectionConfig defines the MQTT connection configuration for a Proplet.
type ConnectionConfig struct {
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
Expand Down Expand Up @@ -125,6 +127,8 @@ type PropletStatus struct {
Phase PropletPhase `json:"phase"`
Conditions []PropletCondition `json:"conditions,omitempty"`
LastSeen *metav1.Time `json:"lastSeen,omitempty"`
// AliveHistory stores recent heartbeat timestamps for liveness tracking
AliveHistory []metav1.Time `json:"aliveHistory,omitempty"`
// +kubebuilder:default=0
TaskCount uint64 `json:"taskCount"`
AvailableResources *PropletResources `json:"availableResources,omitempty"`
Expand Down
147 changes: 123 additions & 24 deletions api/v1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,51 @@

package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

type (
// +kubebuilder:validation:Enum=k8s;external;any
PropletKind string
// +kubebuilder:validation:Enum=pending;scheduled;running;completed;failed
// +kubebuilder:validation:Enum=pending;scheduled;running;completed;failed;skipped;interrupted
TaskPhase string
// +kubebuilder:validation:Enum=Scheduled;Started;Completed
TaskConditionType string
// +kubebuilder:validation:Enum=standard;federated
TaskKind string
// +kubebuilder:validation:Enum=infer;train
TaskMode string
)

const (
K8sProplet PropletKind = "k8s"
ExternalProplet PropletKind = "external"
AnyProplet PropletKind = "any"

TaskPendingPhase TaskPhase = "pending"
TaskScheduledPhase TaskPhase = "scheduled"
TaskRunningPhase TaskPhase = "running"
TaskCompletedPhase TaskPhase = "completed"
TaskFailedPhase TaskPhase = "failed"
TaskPendingPhase TaskPhase = "pending"
TaskScheduledPhase TaskPhase = "scheduled"
TaskRunningPhase TaskPhase = "running"
TaskCompletedPhase TaskPhase = "completed"
TaskFailedPhase TaskPhase = "failed"
TaskSkippedPhase TaskPhase = "skipped"
TaskInterruptedPhase TaskPhase = "interrupted"

ScheduledType TaskConditionType = "Scheduled"
StartedType TaskConditionType = "Started"
CompletedType TaskConditionType = "Completed"

TaskKindStandard TaskKind = "standard"
TaskKindFederated TaskKind = "federated"

TaskModeInfer TaskMode = "infer"
TaskModeTrain TaskMode = "train"

Check failure on line 63 in api/v1/task_types.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

File is not properly formatted (gofmt)

Check failure on line 63 in api/v1/task_types.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

File is not properly formatted (gofmt)
)

type PropletSelector struct {
Expand Down Expand Up @@ -75,18 +92,72 @@
// +kubebuilder:validation:MinLength=1
// +kubebuilder:validation:MaxLength=253
Name string `json:"name"`
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
FunctionName string `json:"functionName"`
// FunctionName is the name of the WASM function to invoke. Optional; if
// omitted the proplet uses the task Name as the function identifier, which
// is the behaviour of the main propeller manager.
FunctionName string `json:"functionName,omitempty"`
// +kubebuilder:validation:Enum=standard;federated
// +kubebuilder:default="standard"
Kind TaskKind `json:"kind,omitempty"`
ImageURL string `json:"imageUrl,omitempty"`
File []byte `json:"file,omitempty"`
CLIArgs []string `json:"cliArgs,omitempty"`
Inputs []uint64 `json:"inputs,omitempty"`
CLIArgs []string `json:"cliArgs,omitempty"`
// Inputs are task input arguments. Each element is a string but numeric
// values are accepted and coerced — matching the FlexStrings behaviour of
// the main propeller manager.
Inputs []string `json:"inputs,omitempty"`
PropletSelector *PropletSelector `json:"propletSelector,omitempty,omitzero"`
// +kubebuilder:validation:Enum=k8s;external;any
// +kubebuilder:default="any"
PreferredPropletType PropletKind `json:"preferredPropletType,omitempty"`
ResourceRequirements *PropletResources `json:"resourceRequirements,omitempty,omitzero"`
PreferredPropletType PropletKind `json:"preferredPropletType,omitempty"`
ResourceRequirements *PropletResources `json:"resourceRequirements,omitempty,omitzero"`
Env map[string]string `json:"env,omitempty"`
Daemon bool `json:"daemon,omitempty"`
Mode TaskMode `json:"mode,omitempty"`
MonitoringProfile *MonitoringProfile `json:"monitoringProfile,omitempty"`
RestartPolicy corev1.RestartPolicy `json:"restartPolicy,omitempty"`

// Confidential computing fields
Encrypted bool `json:"encrypted,omitempty"`
KBSResourcePath string `json:"kbsResourcePath,omitempty"`

// Workflow/DAG fields
// DependsOn specifies task IDs that must complete before this task runs
DependsOn []string `json:"dependsOn,omitempty"`
// RunIf specifies when to run: "success" (default) or "failure"
// +kubebuilder:validation:Enum=success;failure
RunIf string `json:"runIf,omitempty"`
// WorkflowID groups tasks into a workflow for DAG execution
WorkflowID string `json:"workflowId,omitempty"`
// JobID groups tasks into a job for batch execution
JobID string `json:"jobId,omitempty"`

// Scheduling fields
// Schedule is a cron expression for recurring tasks
Schedule string `json:"schedule,omitempty"`
// IsRecurring indicates if the task should repeat after completion
IsRecurring bool `json:"isRecurring,omitempty"`
// Timezone for schedule interpretation (default: UTC)
Timezone string `json:"timezone,omitempty"`
// Priority (higher values = higher priority, default: 50)
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=100
// +kubebuilder:default=50
Priority int `json:"priority,omitempty"`
}

// MonitoringProfile defines monitoring configuration for task execution
type MonitoringProfile struct {
Enabled bool `json:"enabled,omitempty"`
Interval *metav1.Duration `json:"interval,omitempty"`
CollectCPU bool `json:"collectCpu,omitempty"`
CollectMemory bool `json:"collectMemory,omitempty"`
CollectDiskIO bool `json:"collectDiskIo,omitempty"`
CollectThreads bool `json:"collectThreads,omitempty"`
CollectFileDescriptors bool `json:"collectFileDescriptors,omitempty"`
ExportToMQTT bool `json:"exportToMqtt,omitempty"`
RetainHistory bool `json:"retainHistory,omitempty"`
HistorySize int `json:"historySize,omitempty"`
}

type TaskCondition struct {
Expand All @@ -103,22 +174,25 @@
// Important: Run "make" to regenerate code after modifying this file

// +kubebuilder:default="pending"
Phase TaskPhase `json:"phase"`
AssignedProplet string `json:"assignedProplet,omitempty"`
CreatedAt *metav1.Time `json:"createdAt,omitempty,omitzero"`
UpdatedAt *metav1.Time `json:"updatedAt,omitempty,omitzero"`
StartedAt *metav1.Time `json:"startedAt,omitempty,omitzero"`
FinishedAt *metav1.Time `json:"finishedAt,omitempty,omitzero"`
Results string `json:"results,omitempty"`
Error string `json:"error,omitempty"`
Conditions []TaskCondition `json:"conditions,omitzero"`
Phase TaskPhase `json:"phase"`
AssignedProplet string `json:"assignedProplet,omitempty"`
CreatedAt *metav1.Time `json:"createdAt,omitempty,omitzero"`
UpdatedAt *metav1.Time `json:"updatedAt,omitempty,omitzero"`
StartedAt *metav1.Time `json:"startedAt,omitempty,omitzero"`
FinishedAt *metav1.Time `json:"finishedAt,omitempty,omitzero"`
// NextRun is the next scheduled execution time for recurring tasks
NextRun *metav1.Time `json:"nextRun,omitempty,omitzero"`
// +kubebuilder:pruning:PreserveUnknownFields
Results *apiextensionsv1.JSON `json:"results,omitempty"`
Error string `json:"error,omitempty"`
Conditions []TaskCondition `json:"conditions,omitzero"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Namespaced
// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Function",type=string,JSONPath=`.spec.functionName`
// +kubebuilder:printcolumn:name="Mode",type=string,JSONPath=`.spec.mode`
// +kubebuilder:printcolumn:name="Proplet",type=string,JSONPath=`.status.assignedProplet`
// +kubebuilder:printcolumn:name="Start Time",type=date,JSONPath=`.status.startedAt`
// +kubebuilder:printcolumn:name="Finish Time",type=date,JSONPath=`.status.finishedAt`
Expand Down Expand Up @@ -154,3 +228,28 @@
func init() {
SchemeBuilder.Register(&Task{}, &TaskList{})
}

// stateTransitionMap defines valid task phase transitions
var stateTransitionMap = map[TaskPhase][]TaskPhase{
TaskPendingPhase: {TaskScheduledPhase, TaskRunningPhase, TaskCompletedPhase, TaskFailedPhase, TaskSkippedPhase},
TaskScheduledPhase: {TaskRunningPhase, TaskCompletedPhase, TaskFailedPhase, TaskSkippedPhase},
TaskRunningPhase: {TaskCompletedPhase, TaskFailedPhase, TaskInterruptedPhase},
TaskCompletedPhase: {TaskPendingPhase}, // Allow restart for recurring tasks
TaskFailedPhase: {TaskPendingPhase}, // Allow retry
TaskSkippedPhase: {}, // Terminal state
TaskInterruptedPhase: {TaskPendingPhase}, // Allow resume
}

// ValidStateTransition checks if a transition from src to dst phase is allowed
func ValidStateTransition(src, dst TaskPhase) bool {
validTransitions, ok := stateTransitionMap[src]
if !ok {
return false
}
for _, valid := range validTransitions {
if valid == dst {
return true
}
}
return false
}
Loading
Loading