Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,7 @@ func (svc *service) publishStart(ctx context.Context, t task.Task, propletID str
"name": t.Name,
"state": t.State,
"image_url": t.ImageURL,
"wasm_http_url": t.WasmHTTPURL,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the image_url, can we identify if the wasm module is in a registry or not?

For example:

  • docker.io/rodneydav/addition.wasm -> registry
  • https://propeller.absmach.eu/examples/addition.wasm -> Blob

"file": t.File,
"inputs": t.Inputs,
"cli_args": t.CLIArgs,
Expand Down
29 changes: 15 additions & 14 deletions pkg/sdk/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (
const tasksEndpoint = "/tasks"

type Task struct {
ID string `json:"id,omitempty"`
Name string `json:"name"`
Kind string `json:"kind,omitempty"`
State uint8 `json:"state,omitempty"`
Mode string `json:"mode,omitempty"`
ImageURL string `json:"image_url,omitempty"`
JobID string `json:"job_id,omitempty"`
CLIArgs []string `json:"cli_args,omitempty"`
Env map[string]string `json:"env,omitempty"`
StartTime time.Time `json:"start_time"`
FinishTime time.Time `json:"finish_time"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Results any `json:"results,omitempty"`
ID string `json:"id,omitempty"`
Name string `json:"name"`
Kind string `json:"kind,omitempty"`
State uint8 `json:"state,omitempty"`
Mode string `json:"mode,omitempty"`
ImageURL string `json:"image_url,omitempty"`
WasmHTTPURL string `json:"wasm_http_url,omitempty"`
JobID string `json:"job_id,omitempty"`
CLIArgs []string `json:"cli_args,omitempty"`
Env map[string]string `json:"env,omitempty"`
StartTime time.Time `json:"start_time"`
FinishTime time.Time `json:"finish_time"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Results any `json:"results,omitempty"`
}

type TaskPage struct {
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ func (db *Database) Migrate() error {
`ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`,
},
},
{
Id: "4_add_wasm_http_url",
Up: []string{
`ALTER TABLE tasks ADD COLUMN IF NOT EXISTS wasm_http_url TEXT`,
},
Down: []string{
`ALTER TABLE tasks DROP COLUMN IF EXISTS wasm_http_url`,
},
},
},
}

Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/postgres/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type dbTask struct {
Name string `db:"name"`
State uint8 `db:"state"`
ImageURL *string `db:"image_url"`
WasmHTTPURL *string `db:"wasm_http_url"`
File []byte `db:"file"`
CLIArgs []byte `db:"cli_args"`
Inputs []byte `db:"inputs"`
Expand All @@ -45,13 +46,13 @@ type dbTask struct {
Mode *string `db:"mode"`
}

const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted,
const taskColumns = `id, name, state, image_url, wasm_http_url, file, cli_args, inputs, env, daemon, encrypted,
kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time,
created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode`

func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
query := `INSERT INTO tasks (` + taskColumns + `)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)`
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)`

cliArgs, err := jsonBytes(t.CLIArgs)
if err != nil {
Expand Down Expand Up @@ -86,6 +87,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
_, err = r.db.ExecContext(ctx, query,
t.ID, t.Name, uint8(t.State),
nullString(t.ImageURL),
nullString(t.WasmHTTPURL),
t.File,
cliArgs,
inputs,
Expand Down Expand Up @@ -131,11 +133,11 @@ func (r *taskRepo) Get(ctx context.Context, id string) (task.Task, error) {

func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
query := `UPDATE tasks SET
name = $2, state = $3, image_url = $4, file = $5, cli_args = $6, inputs = $7,
env = $8, daemon = $9, encrypted = $10, kbs_resource_path = $11, proplet_id = $12,
results = $13, error = $14, monitoring_profile = $15, start_time = $16,
finish_time = $17, updated_at = $18, workflow_id = $19, job_id = $20,
depends_on = $21, run_if = $22, kind = $23, mode = $24
name = $2, state = $3, image_url = $4, wasm_http_url = $5, file = $6, cli_args = $7, inputs = $8,
env = $9, daemon = $10, encrypted = $11, kbs_resource_path = $12, proplet_id = $13,
results = $14, error = $15, monitoring_profile = $16, start_time = $17,
finish_time = $18, updated_at = $19, workflow_id = $20, job_id = $21,
depends_on = $22, run_if = $23, kind = $24, mode = $25
WHERE id = $1`

cliArgs, err := jsonBytes(t.CLIArgs)
Expand Down Expand Up @@ -171,6 +173,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
_, err = r.db.ExecContext(ctx, query,
t.ID, t.Name, uint8(t.State),
nullString(t.ImageURL),
nullString(t.WasmHTTPURL),
t.File,
cliArgs, inputs, env,
t.Daemon, t.Encrypted,
Expand Down Expand Up @@ -246,7 +249,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([]
for rows.Next() {
var dbt dbTask
if err := rows.Scan(
&dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL,
&dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.WasmHTTPURL,
&dbt.File, &dbt.CLIArgs, &dbt.Inputs, &dbt.Env,
&dbt.Daemon, &dbt.Encrypted, &dbt.KBSResourcePath, &dbt.PropletID,
&dbt.Results, &dbt.Error, &dbt.MonitoringProfile,
Expand Down Expand Up @@ -287,6 +290,9 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) {
if dbt.ImageURL != nil {
t.ImageURL = *dbt.ImageURL
}
if dbt.WasmHTTPURL != nil {
t.WasmHTTPURL = *dbt.WasmHTTPURL
}
if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil {
return task.Task{}, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/sqlite/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func (db *Database) Migrate() error {
`ALTER TABLE proplets DROP COLUMN metadata`,
},
},
{
Id: "4_add_wasm_http_url",
Up: []string{
`ALTER TABLE tasks ADD COLUMN wasm_http_url TEXT`,
},
Down: []string{
`ALTER TABLE tasks DROP COLUMN wasm_http_url`,
},
},
},
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/storage/sqlite/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type dbTask struct {
Name string `db:"name"`
State uint8 `db:"state"`
ImageURL *string `db:"image_url"`
WasmHTTPURL *string `db:"wasm_http_url"`
File []byte `db:"file"`
CLIArgs []byte `db:"cli_args"`
Inputs []byte `db:"inputs"`
Expand All @@ -47,13 +48,13 @@ type dbTask struct {
Mode *string `db:"mode"`
}

const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted,
const taskColumns = `id, name, state, image_url, wasm_http_url, file, cli_args, inputs, env, daemon, encrypted,
kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time,
created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode`

func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
query := `INSERT INTO tasks (` + taskColumns + `)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

cliArgs, err := jsonBytes(t.CLIArgs)
if err != nil {
Expand Down Expand Up @@ -86,7 +87,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
}

_, err = r.db.ExecContext(ctx, query,
t.ID, t.Name, uint8(t.State), nullString(t.ImageURL),
t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), nullString(t.WasmHTTPURL),
t.File, cliArgs, inputs, env,
t.Daemon, t.Encrypted, nullString(t.KBSResourcePath),
nullString(t.PropletID),
Expand Down Expand Up @@ -122,7 +123,7 @@ func (r *taskRepo) Get(ctx context.Context, id string) (task.Task, error) {

func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
query := `UPDATE tasks SET
name = ?, state = ?, image_url = ?, file = ?, cli_args = ?, inputs = ?,
name = ?, state = ?, image_url = ?, wasm_http_url = ?, file = ?, cli_args = ?, inputs = ?,
env = ?, daemon = ?, encrypted = ?, kbs_resource_path = ?, proplet_id = ?,
results = ?, error = ?, monitoring_profile = ?, start_time = ?,
finish_time = ?, updated_at = ?, workflow_id = ?, job_id = ?,
Expand Down Expand Up @@ -160,7 +161,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
}

_, err = r.db.ExecContext(ctx, query,
t.Name, uint8(t.State), nullString(t.ImageURL),
t.Name, uint8(t.State), nullString(t.ImageURL), nullString(t.WasmHTTPURL),
t.File, cliArgs, inputs, env,
t.Daemon, t.Encrypted, nullString(t.KBSResourcePath),
nullString(t.PropletID),
Expand Down Expand Up @@ -228,7 +229,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([]
for rows.Next() {
var dbt dbTask
if err := rows.Scan(
&dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL,
&dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.WasmHTTPURL,
&dbt.File, &dbt.CLIArgs, &dbt.Inputs, &dbt.Env,
&dbt.Daemon, &dbt.Encrypted, &dbt.KBSResourcePath, &dbt.PropletID,
&dbt.Results, &dbt.Error, &dbt.MonitoringProfile,
Expand Down Expand Up @@ -269,6 +270,9 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) {
if dbt.ImageURL != nil {
t.ImageURL = *dbt.ImageURL
}
if dbt.WasmHTTPURL != nil {
t.WasmHTTPURL = *dbt.WasmHTTPURL
}
if dbt.CLIArgs != nil {
if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil {
return task.Task{}, err
Expand Down
59 changes: 59 additions & 0 deletions pkg/storage/sqlite/tasks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package sqlite_test

import (
"context"
"testing"
"time"

"github.com/absmach/propeller/pkg/storage/sqlite"
"github.com/absmach/propeller/pkg/task"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTaskCreate_WithWasmHTTPURL(t *testing.T) {
t.Parallel()
repo := sqlite.NewTaskRepository(newTestDB(t))

url := "http://example.com/module.wasm"
tk := task.Task{
ID: uuid.NewString(),
Name: "http-wasm-task",
State: task.Pending,
WasmHTTPURL: url,
CreatedAt: time.Now().UTC().Truncate(time.Second),
UpdatedAt: time.Now().UTC().Truncate(time.Second),
}

created, err := repo.Create(context.Background(), tk)
require.NoError(t, err)
assert.Equal(t, tk.ID, created.ID)
assert.Equal(t, url, created.WasmHTTPURL)

fetched, err := repo.Get(context.Background(), tk.ID)
require.NoError(t, err)
assert.Equal(t, url, fetched.WasmHTTPURL)
}

func TestTaskCreate_WasmHTTPURLNull(t *testing.T) {
t.Parallel()
repo := sqlite.NewTaskRepository(newTestDB(t))

tk := task.Task{
ID: uuid.NewString(),
Name: "no-http-url-task",
State: task.Pending,
ImageURL: "oci://example.com/image:latest",
CreatedAt: time.Now().UTC().Truncate(time.Second),
UpdatedAt: time.Now().UTC().Truncate(time.Second),
}

created, err := repo.Create(context.Background(), tk)
require.NoError(t, err)
assert.Empty(t, created.WasmHTTPURL)

fetched, err := repo.Get(context.Background(), tk.ID)
require.NoError(t, err)
assert.Empty(t, fetched.WasmHTTPURL)
}
1 change: 1 addition & 0 deletions pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Task struct {
Kind TaskKind `json:"kind,omitempty"`
State State `json:"state"`
ImageURL string `json:"image_url,omitempty"`
WasmHTTPURL string `json:"wasm_http_url,omitempty"`
File []byte `json:"file,omitempty"`
CLIArgs []string `json:"cli_args"`
Inputs []uint64 `json:"inputs,omitempty"`
Expand Down
68 changes: 68 additions & 0 deletions proplet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading