Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions pkg/core/inspection/taskbase/inventory_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Inventory related tasks defined in this file provides a framework for discovering and merging inventory data from various sources.
//
// In many inspection scenarios, it's necessary to associate information across different log sources.
// For example, a log might contain an IP address, while another log maps that IP to a specific VM or container name.
// However, the availability of these log sources is not always guaranteed, and consumers of this inventory
// data should not need to be aware of the specific tasks that provide it.
//
// This framework introduces two main components to address this:
//
// 1. DiscoveryTask: A task responsible for extracting a inventory map from a single data source.
// Providers of a discovery task must ensure it is added to the task graph when a task that may require its
// data is included. This is achieved by using the coretask.NewSubsequentTaskRefsTaskLabel, which links the
// discovery task to the merger task.
//
// 2. InventoryTask: A task that aggregates the results from all relevant DiscoveryTasks.
// Consumers can simply depend on this single merger task to access the complete, consolidated inventory map
// without needing to know about the individual discovery tasks.
//
// This approach decouples data consumers from data providers, allowing for a flexible and extensible inspection system.
package inspectiontaskbase

import (
"context"
"log/slog"
"sync"

inspectionmetadata "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/metadata"
coretask "github.com/GoogleCloudPlatform/khi/pkg/core/task"
"github.com/GoogleCloudPlatform/khi/pkg/core/task/taskid"
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

// InventoryTaskBuilder builds a inventory task and discovery tasks.
// Inventory task merges information found in logs from multiple discovery tasks.
type InventoryTaskBuilder[T any] struct {
mu sync.Mutex
inventoryTaskID taskid.TaskImplementationID[T]
discoveryTaskRefs []taskid.TaskReference[T]
}

func NewInventoryTaskBuilder[T any](inventoryTaskID taskid.TaskImplementationID[T]) *InventoryTaskBuilder[T] {
return &InventoryTaskBuilder[T]{
inventoryTaskID: inventoryTaskID,
}
}

func (s *InventoryTaskBuilder[T]) InventoryTask(strategy InventoryMergerStrategy[T]) coretask.Task[T] {
s.mu.Lock()
defer s.mu.Unlock()
return NewInspectionTask(s.inventoryTaskID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (T, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
return *new(T), nil
}
discoveryResults := make([]T, 0, len(s.discoveryTaskRefs))
for _, ref := range s.discoveryTaskRefs {
r, found := coretask.GetTaskResultOptional(ctx, ref)
if found {
discoveryResults = append(discoveryResults, r)
} else {
slog.DebugContext(ctx, "discovery result not provided", "taskRef", ref.ReferenceIDString())
}
}
return strategy.Merge(discoveryResults)
})
}

func (s *InventoryTaskBuilder[T]) DiscoveryTask(taskID taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T] {
s.mu.Lock()
defer s.mu.Unlock()
inventoryTaskID := s.inventoryTaskID.Ref()
labelOpts = append(labelOpts, coretask.NewSubsequentTaskRefsTaskLabel(inventoryTaskID))

found := false
for _, ref := range s.discoveryTaskRefs {
if ref.ReferenceIDString() == taskID.ReferenceIDString() {
found = true
break
}
}
if !found {
s.discoveryTaskRefs = append(s.discoveryTaskRefs, taskID.Ref())
}
Comment on lines +87 to +96

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The deduplication logic for discovery tasks in the DiscoveryTask method can lead to incorrect behavior by silently ignoring some tasks. The check ref.ReferenceIDString() == taskID.ReferenceIDString() prevents registration of multiple discovery tasks that share the same base reference ID, even if they have different implementation hashes (e.g., my-discovery#impl1 and my-discovery#impl2). As a result, the InventoryTask will only be aware of the first registered task and won't aggregate results from all intended discovery tasks.

This contradicts the goal of aggregating results from all relevant discovery tasks. A possible fix would involve tracking each unique task implementation, but this may be constrained by the current implementation of coretask.GetTaskResultOptional.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine because a task graph won't include multiple implementations of a task after the resolution.


return NewProgressReportableInspectionTask(taskID, dependencies, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (T, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
return *new(T), nil
}
return taskFunc(ctx, taskMode, progress)
}, labelOpts...)
}

// InventoryMergerStrategy defines the strategy how the generated InventoryMergerTasks merges results received from multiple discovery tasks.
type InventoryMergerStrategy[T any] interface {

// Merge defines the logic to combine multiple results from various discovery tasks
// into a single, consolidated result.
Merge(results []T) (T, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,11 @@ import (
"github.com/google/go-cmp/cmp"
)

type testSimpleStringRelationshipMergerTaskSetting struct {
idSource *RelationshipTaskIDSource[map[string]struct{}]
type testSimpleStringMergerStrategy struct {
}

// IDSource implements RelationshipMergerTaskSetting.
func (t *testSimpleStringRelationshipMergerTaskSetting) IDSource() *RelationshipTaskIDSource[map[string]struct{}] {
return t.idSource
}

// Merge implements RelationshipMergerTaskSetting.
func (t *testSimpleStringRelationshipMergerTaskSetting) Merge(results []map[string]struct{}) (map[string]struct{}, error) {
// Merge implements InventoryMergerTaskSetting.
func (t *testSimpleStringMergerStrategy) Merge(results []map[string]struct{}) (map[string]struct{}, error) {
result := make(map[string]struct{})
for _, r := range results {
for k := range r {
Expand All @@ -46,52 +40,31 @@ func (t *testSimpleStringRelationshipMergerTaskSetting) Merge(results []map[stri
return result, nil
}

var _ RelationshipMergerTaskSetting[map[string]struct{}] = (*testSimpleStringRelationshipMergerTaskSetting)(nil)

// TestRelationshipTaskIDSource tests the basic functionality of the RelationshipTaskIDSource.
// It verifies that:
// - GenerateDefaultRelationshipDiscoveryTaskID correctly creates a task ID.
// - Attempting to generate a task ID with the same reference string multiple times does not result in duplicate entries.
func TestRelationshipTaskIDSource(t *testing.T) {
mergerTaskID := taskid.NewDefaultImplementationID[struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)

wantID := taskid.NewDefaultImplementationID[struct{}]("discovery")
gotID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery")

if diff := cmp.Diff(wantID.String(), gotID.String()); diff != "" {
t.Errorf("GenerateDefaultRelationshipDiscoveryTaskID() mismatch (-want +got):\n%s", diff)
}

ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery")
if len(ids.discoveryTaskRefs) != 1 {
t.Errorf("Expected discoveryTaskRefs to contain 1 element after generating the same ID twice, got %d", len(ids.discoveryTaskRefs))
}
}
var _ InventoryMergerStrategy[map[string]struct{}] = (*testSimpleStringMergerStrategy)(nil)

// TestRelationshipTask_ProvidedFromSingleDiscoveryTask tests a scenario where the merger task
// TestInventoryTask_ProvidedFromSingleDiscoveryTask tests a scenario where the merger task
// receives data from only one of two available discovery tasks.
// This is because the main user task only depends on the parent of the first discovery task.
// The test verifies that only the result from the first discovery task ("foo") is present in the final merged map and the task dependency topology doesn't add the discovery-2 task not intentionally.
func TestRelationshipTask_ProvidedFromSingleDiscoveryTask(t *testing.T) {
func TestInventoryTask_ProvidedFromSingleDiscoveryTask(t *testing.T) {
nop := func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
return struct{}{}, nil
}
mergerTaskID := taskid.NewDefaultImplementationID[map[string]struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)
mergerTask := NewRelationshipMergerTask(&testSimpleStringRelationshipMergerTaskSetting{ids})
discovery1ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-1")
discovery2ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-2")
builder := NewInventoryTaskBuilder(mergerTaskID)
mergerTask := builder.InventoryTask(&testSimpleStringMergerStrategy{})
discovery1ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery1")
discovery2ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery2")
discovery1ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-1-parent")
discovery1ParentTask := NewInspectionTask(discovery1ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery1ID.Ref()))
discovery1 := NewRelationshipDiscoveryTask(discovery1ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery1 := builder.DiscoveryTask(discovery1ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"foo": {},
}, nil
})
discovery2ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-2-parent")
discovery2ParentTask := NewInspectionTask(discovery2ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery2ID.Ref()))
discovery2 := NewRelationshipDiscoveryTask(discovery2ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery2 := builder.DiscoveryTask(discovery2ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"bar": {},
}, nil
Expand All @@ -115,29 +88,29 @@ func TestRelationshipTask_ProvidedFromSingleDiscoveryTask(t *testing.T) {
}
}

// TestRelationshipTask_ProvidedFromMultipleDiscoveryTask tests a scenario where the merger task
// TestInventoryTask_ProvidedFromMultipleDiscoveryTask tests a scenario where the merger task
// receives and merges data from multiple discovery tasks.
// This is because the main user task depends on the parents of both discovery tasks.
// The test verifies that the results from both discovery tasks ("foo" and "bar") are present in the final merged map.
func TestRelationshipTask_ProvidedFromMultipleDiscoveryTask(t *testing.T) {
func TestInventoryTask_ProvidedFromMultipleDiscoveryTask(t *testing.T) {
nop := func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
return struct{}{}, nil
}
mergerTaskID := taskid.NewDefaultImplementationID[map[string]struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)
mergerTask := NewRelationshipMergerTask(&testSimpleStringRelationshipMergerTaskSetting{ids})
discovery1ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-1")
discovery2ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-2")
builder := NewInventoryTaskBuilder(mergerTaskID)
mergerTask := builder.InventoryTask(&testSimpleStringMergerStrategy{})
discovery1ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery1")
discovery2ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery2")
discovery1ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-1-parent")
discovery1ParentTask := NewInspectionTask(discovery1ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery1ID.Ref()))
discovery1 := NewRelationshipDiscoveryTask(discovery1ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery1 := builder.DiscoveryTask(discovery1ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"foo": {},
}, nil
})
discovery2ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-2-parent")
discovery2ParentTask := NewInspectionTask(discovery2ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery2ID.Ref()))
discovery2 := NewRelationshipDiscoveryTask(discovery2ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery2 := builder.DiscoveryTask(discovery2ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"bar": {},
}, nil
Expand All @@ -162,28 +135,28 @@ func TestRelationshipTask_ProvidedFromMultipleDiscoveryTask(t *testing.T) {
}
}

// TestRelationshipTask_ProvidedFromNoDiscoveryTask tests a scenario where the merger task receives no data.
// TestInventoryTask_ProvidedFromNoDiscoveryTask tests a scenario where the merger task receives no data.
// This is because the main user task does not depend on any of the discovery tasks' parents.
// The test verifies that the final merged map is empty.
func TestRelationshipTask_ProvidedFromNoDiscoveryTask(t *testing.T) {
func TestInventoryTask_ProvidedFromNoDiscoveryTask(t *testing.T) {
nop := func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
return struct{}{}, nil
}
mergerTaskID := taskid.NewDefaultImplementationID[map[string]struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)
mergerTask := NewRelationshipMergerTask(&testSimpleStringRelationshipMergerTaskSetting{ids})
discovery1ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-1")
discovery2ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-2")
builder := NewInventoryTaskBuilder(mergerTaskID)
mergerTask := builder.InventoryTask(&testSimpleStringMergerStrategy{})
discovery1ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery1")
discovery2ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery2")
discovery1ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-1-parent")
discovery1ParentTask := NewInspectionTask(discovery1ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery1ID.Ref()))
discovery1 := NewRelationshipDiscoveryTask(discovery1ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery1 := builder.DiscoveryTask(discovery1ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"foo": {},
}, nil
})
discovery2ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-2-parent")
discovery2ParentTask := NewInspectionTask(discovery2ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery2ID.Ref()))
discovery2 := NewRelationshipDiscoveryTask(discovery2ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery2 := builder.DiscoveryTask(discovery2ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"bar": {},
}, nil
Expand Down
Loading
Loading