Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 90 additions & 42 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (
"github.com/GoogleCloudPlatform/scion/pkg/agent"
"github.com/GoogleCloudPlatform/scion/pkg/agent/state"
"github.com/GoogleCloudPlatform/scion/pkg/api"
"github.com/GoogleCloudPlatform/scion/pkg/broker"
"github.com/GoogleCloudPlatform/scion/pkg/apiclient"
"github.com/GoogleCloudPlatform/scion/pkg/broker"
"github.com/GoogleCloudPlatform/scion/pkg/brokercredentials"
"github.com/GoogleCloudPlatform/scion/pkg/config"
"github.com/GoogleCloudPlatform/scion/pkg/daemon"
Expand Down Expand Up @@ -1843,46 +1843,15 @@ func newAgentDispatcherAdapter(mgr agent.Manager, s store.Store, brokerID string
// DispatchAgentCreate implements hub.AgentDispatcher.
// It starts the agent on the runtime broker and updates the hub store with runtime info.
func (d *agentDispatcherAdapter) DispatchAgentCreate(ctx context.Context, hubAgent *store.Agent) error {
// Look up the local path for this grove on this runtime broker
var grovePath string
if hubAgent.GroveID != "" && d.brokerID != "" {
provider, err := d.store.GetGroveProvider(ctx, hubAgent.GroveID, d.brokerID)
if err != nil {
log.Printf("Warning: failed to get grove provider for path lookup: %v", err)
} else if provider.LocalPath != "" {
grovePath = provider.LocalPath
}
}

// Build StartOptions from the hub agent record
env := make(map[string]string)
if hubAgent.AppliedConfig != nil && hubAgent.AppliedConfig.Env != nil {
env = hubAgent.AppliedConfig.Env
}
grovePath := d.resolveGrovePath(ctx, hubAgent.GroveID)
opts := d.buildStartOptions(hubAgent, grovePath, false)

// Add grove ID label for tracking
// Ensure grove ID label is present for tracking
if hubAgent.Labels == nil {
hubAgent.Labels = make(map[string]string)
}
hubAgent.Labels["scion.grove"] = hubAgent.GroveID

opts := api.StartOptions{
Name: hubAgent.Name,
Template: hubAgent.Template,
Image: hubAgent.Image,
Env: env,
Detached: &hubAgent.Detached,
GrovePath: grovePath, // Pass the local filesystem path for this grove
}

if hubAgent.AppliedConfig != nil {
opts.HarnessConfig = hubAgent.AppliedConfig.HarnessConfig
// Pass the task through to the runtime broker
if hubAgent.AppliedConfig.Task != "" {
opts.Task = hubAgent.AppliedConfig.Task
}
}

// Start the agent on the runtime broker
agentInfo, err := d.manager.Start(ctx, opts)
if err != nil {
Expand All @@ -1907,10 +1876,36 @@ func (d *agentDispatcherAdapter) DispatchAgentCreate(ctx context.Context, hubAge
// DispatchAgentStart implements hub.AgentDispatcher.
// For co-located runtime brokers, this resumes a stopped agent.
func (d *agentDispatcherAdapter) DispatchAgentStart(ctx context.Context, hubAgent *store.Agent, task string) error {
// For now, starting an existing agent is not fully supported in the manager
// The manager's Start method creates new agents, not resumes existing ones
// TODO: Implement proper agent resume functionality in the manager
log.Printf("DispatchAgentStart called for agent %s (not fully implemented)", hubAgent.Name)
grovePath := d.resolveGrovePath(ctx, hubAgent.GroveID)
opts := d.buildStartOptions(hubAgent, grovePath, true)

// Ensure grove ID label is present for tracking
if hubAgent.Labels == nil {
hubAgent.Labels = make(map[string]string)
}
hubAgent.Labels["scion.grove"] = hubAgent.GroveID
if task != "" {
opts.Task = task
}

// Start the agent on the runtime broker
agentInfo, err := d.manager.Start(ctx, opts)
if err != nil {
return fmt.Errorf("failed to start agent: %w", err)
}

// Update the hub agent record with runtime information
hubAgent.Phase = string(state.PhaseRunning)
hubAgent.ContainerStatus = agentInfo.ContainerStatus
if agentInfo.ID != "" {
hubAgent.RuntimeState = "container:" + agentInfo.ID
}
hubAgent.LastSeen = time.Now()

if err := d.store.UpdateAgent(ctx, hubAgent); err != nil {
log.Printf("Warning: failed to update agent with runtime info: %v", err)
}

return nil
}

Expand Down Expand Up @@ -1940,18 +1935,71 @@ func (d *agentDispatcherAdapter) DispatchAgentRestart(ctx context.Context, hubAg
log.Printf("Warning: failed to stop agent during restart: %v", err)
}

// TODO: Implement proper restart with start after stop
// For now, just update phase
grovePath := d.resolveGrovePath(ctx, hubAgent.GroveID)
opts := d.buildStartOptions(hubAgent, grovePath, true)

// Ensure grove ID label is present for tracking
if hubAgent.Labels == nil {
hubAgent.Labels = make(map[string]string)
}
hubAgent.Labels["scion.grove"] = hubAgent.GroveID

agentInfo, err := d.manager.Start(ctx, opts)
if err != nil {
return fmt.Errorf("failed to restart agent: %w", err)
}

hubAgent.Phase = string(state.PhaseRunning)
hubAgent.ContainerStatus = agentInfo.ContainerStatus
if agentInfo.ID != "" {
hubAgent.RuntimeState = "container:" + agentInfo.ID
}
hubAgent.LastSeen = time.Now()

if err := d.store.UpdateAgent(ctx, hubAgent); err != nil {
log.Printf("Warning: failed to update agent status: %v", err)
log.Printf("Warning: failed to update agent with runtime info: %v", err)
}

return nil
}

func (d *agentDispatcherAdapter) buildStartOptions(hubAgent *store.Agent, grovePath string, resume bool) api.StartOptions {
// Build StartOptions from the hub agent record
env := make(map[string]string)
if hubAgent.AppliedConfig != nil && hubAgent.AppliedConfig.Env != nil {
env = hubAgent.AppliedConfig.Env
}

opts := api.StartOptions{
Name: hubAgent.Name,
Template: hubAgent.Template,
Image: hubAgent.Image,
Env: env,
Detached: &hubAgent.Detached,
GrovePath: grovePath,
Resume: resume,
}

if hubAgent.AppliedConfig != nil {
opts.HarnessConfig = hubAgent.AppliedConfig.HarnessConfig
if hubAgent.AppliedConfig.Task != "" {
opts.Task = hubAgent.AppliedConfig.Task
}
}
return opts
}
func (d *agentDispatcherAdapter) resolveGrovePath(ctx context.Context, groveID string) string {
if groveID == "" || d.brokerID == "" {
return ""
}
provider, err := d.store.GetGroveProvider(ctx, groveID, d.brokerID)
if err != nil {
log.Printf("Warning: failed to get grove provider for path lookup: %v", err)
return ""
}
return provider.LocalPath
}

// DispatchAgentDelete implements hub.AgentDispatcher.
// It removes an agent from the runtime broker.
func (d *agentDispatcherAdapter) DispatchAgentDelete(ctx context.Context, hubAgent *store.Agent, deleteFiles, removeBranch, _ bool, _ time.Time) error {
Expand Down
184 changes: 184 additions & 0 deletions cmd/server_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !no_sqlite

package cmd

import (
"context"
"testing"

"github.com/GoogleCloudPlatform/scion/pkg/api"
"github.com/GoogleCloudPlatform/scion/pkg/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type mockAgentManager struct {
startOpts api.StartOptions
startCalls int
stopCalls int
stopAgent string
}

func (m *mockAgentManager) Provision(ctx context.Context, opts api.StartOptions) (*api.ScionConfig, error) {
return nil, nil
}

func (m *mockAgentManager) Start(ctx context.Context, opts api.StartOptions) (*api.AgentInfo, error) {
m.startCalls++
m.startOpts = opts
return &api.AgentInfo{
ID: "container-123",
Name: opts.Name,
ContainerStatus: "running",
}, nil
}

func (m *mockAgentManager) Stop(ctx context.Context, agentID string) error {
m.stopCalls++
m.stopAgent = agentID
return nil
}

func (m *mockAgentManager) Delete(ctx context.Context, agentID string, deleteFiles bool, grovePath string, removeBranch bool) (bool, error) {
return true, nil
}

func (m *mockAgentManager) List(ctx context.Context, filter map[string]string) ([]api.AgentInfo, error) {
return nil, nil
}

func (m *mockAgentManager) Message(ctx context.Context, agentID string, message string, interrupt bool) error {
return nil
}

func (m *mockAgentManager) Watch(ctx context.Context, agentID string) (<-chan api.StatusEvent, error) {
return nil, nil
}

func TestDispatchAgentStart(t *testing.T) {
ctx := context.Background()
s := newTestStore(t)
mgr := &mockAgentManager{}
brokerID := "test-broker"

adapter := newAgentDispatcherAdapter(mgr, s, brokerID)

// Create test grove and broker
grove := &store.Grove{
ID: "grove-1",
Slug: "test-grove",
Name: "Test Grove",
}
err := s.CreateGrove(ctx, grove)
require.NoError(t, err)

broker := &store.RuntimeBroker{
ID: brokerID,
Name: "test-broker",
}
err = s.CreateRuntimeBroker(ctx, broker)
require.NoError(t, err)

provider := &store.GroveProvider{
GroveID: grove.ID,
BrokerID: brokerID,
LocalPath: "/tmp/fake/grove",
}
err = s.AddGroveProvider(ctx, provider)
require.NoError(t, err)

// Create agent
agent := &store.Agent{
ID: "agent-1",
Slug: "test-agent",
Name: "test-agent",
GroveID: grove.ID,
Template: "gemini",
Image: "test-image",
Detached: true,
AppliedConfig: &store.AgentAppliedConfig{
Env: map[string]string{"FOO": "BAR"},
Task: "original task",
},
}
err = s.CreateAgent(ctx, agent)
require.NoError(t, err)

// Test DispatchAgentStart
err = adapter.DispatchAgentStart(ctx, agent, "new task")
require.NoError(t, err)

// Verify manager calls
assert.Equal(t, 1, mgr.startCalls)
assert.Equal(t, "test-agent", mgr.startOpts.Name)
assert.Equal(t, true, mgr.startOpts.Resume)
assert.Equal(t, "new task", mgr.startOpts.Task)
assert.Equal(t, "/tmp/fake/grove", mgr.startOpts.GrovePath)
assert.Equal(t, "gemini", mgr.startOpts.Template)
assert.Equal(t, "BAR", mgr.startOpts.Env["FOO"])

// Verify agent update
updatedAgent, err := s.GetAgent(ctx, agent.ID)
require.NoError(t, err)
assert.Equal(t, "running", updatedAgent.Phase)
assert.Equal(t, "running", updatedAgent.ContainerStatus)
assert.Equal(t, "container:container-123", updatedAgent.RuntimeState)
}

func TestDispatchAgentRestart(t *testing.T) {
ctx := context.Background()
s := newTestStore(t)
mgr := &mockAgentManager{}
brokerID := "test-broker"

adapter := newAgentDispatcherAdapter(mgr, s, brokerID)

// Create test grove and agent
grove := &store.Grove{
ID: "grove-1",
Slug: "test-grove",
Name: "Test Grove",
}
err := s.CreateGrove(ctx, grove)
require.NoError(t, err)

agent := &store.Agent{
ID: "agent-1",
Slug: "test-agent",
Name: "test-agent",
GroveID: grove.ID,
}
err = s.CreateAgent(ctx, agent)
require.NoError(t, err)

// Test DispatchAgentRestart
err = adapter.DispatchAgentRestart(ctx, agent)
require.NoError(t, err)

// Verify manager calls
assert.Equal(t, 1, mgr.stopCalls)
assert.Equal(t, "test-agent", mgr.stopAgent)

assert.Equal(t, 1, mgr.startCalls)
assert.Equal(t, "test-agent", mgr.startOpts.Name)
assert.Equal(t, true, mgr.startOpts.Resume) // Restart acts like a resume

// Verify agent update
updatedAgent, err := s.GetAgent(ctx, agent.ID)
require.NoError(t, err)
assert.Equal(t, "running", updatedAgent.Phase)
}
Loading