Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
98ad815
feat: Dynamically generate Cloud Logging resource name input forms ba…
kyasbal Nov 25, 2025
4767e6a
Adding several minor changes to improve testability before migrating …
kyasbal Nov 26, 2025
32d2b0e
Adding tasks for log summary generation and history modifier for erro…
kyasbal Nov 26, 2025
02ab84b
Adding grouping related tasks and tasks for gathering k8s audit logs …
kyasbal Nov 27, 2025
c4e02f8
Adding tasks related to merge manifests from audit logs (#379)
kyasbal Nov 27, 2025
57625a0
Adding testchangeset utility (#382)
kyasbal Dec 2, 2025
0944747
Fix flaky test on commonk8slogaudit tasks (#388)
kyasbal Dec 2, 2025
eae4248
Add task IDs, types used in task output, new revision state types and…
kyasbal Dec 2, 2025
0356c3c
Revision sort criteria wasn't right (#386)
kyasbal Dec 2, 2025
1353f50
Added comments on existing task types (#387)
kyasbal Dec 2, 2025
afba83c
Adding the basic revision recorder for k8s audit log (#389)
kyasbal Dec 2, 2025
1d80473
Migrate pseudo subresource recorder in the previous k8s audit log par…
kyasbal Dec 2, 2025
f1d6573
Adding new PodPhase recorder and register all defined tasks for k8s a…
kyasbal Dec 2, 2025
665016f
Remove unused legacy parsers and migrate OSS log parsers to depend on…
kyasbal Dec 2, 2025
0436439
Refactor inventory task base types (#394)
kyasbal Dec 3, 2025
9742f71
Adding node name inventory and refactored resource grouping logic (#395)
kyasbal Dec 3, 2025
cf43350
Improved container ID discovery tasks and implemented pod uid discove…
kyasbal Dec 3, 2025
961408c
Adding inventory tasks for IP leasing history and NEG names (#397)
kyasbal Dec 4, 2025
0b85a6e
bug: deletionGracePeriodSeconds=0 was always treated as completely re…
kyasbal Dec 4, 2025
2fd2ec6
Merge branch 'main' into epic/issue-373
kyasbal Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/common/patternfinder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package patternfinder

import "fmt"

// PatternMatchResult represents a single match found within a larger text.
// It includes the start and end positions of the match.
type PatternMatchResult[T any] struct {
Expand All @@ -22,6 +24,14 @@ type PatternMatchResult[T any] struct {
End int
}

// GetMatchedString returns the part of original covered by this match,
// that is original[p.Start:p.End].
//
// An error is returned, instead of panicking, when the range cannot be applied
// to original: Start is negative, End is larger than len(original), or Start is
// larger than End.
func (p *PatternMatchResult[T]) GetMatchedString(original string) (string, error) {
	// Start > End must be rejected as well; slicing with an inverted range panics
	// even when both indices are individually within the string.
	if p.Start < 0 || p.End > len(original) || p.Start > p.End {
		return "", fmt.Errorf("invalid match range: start=%d, end=%d", p.Start, p.End)
	}
	return original[p.Start:p.End], nil
}

// FindAllWithStarterRunes finds all occurrences of patterns within a search text.
// The search for a pattern only begins after encountering one of the specified starterRunes.
//
Expand Down
46 changes: 46 additions & 0 deletions pkg/common/patternfinder/finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,52 @@ func TestFindAllWithStarterRunes(t *testing.T) {
}
}

// TestGetMatchedString verifies that GetMatchedString returns the substring
// described by the match range and reports an error for ranges that do not fit
// in the original string.
func TestGetMatchedString(t *testing.T) {
	testCases := []struct {
		desc     string
		result   PatternMatchResult[struct{}]
		original string
		want     string
		wantErr  bool
	}{
		{
			desc: "simple case",
			result: PatternMatchResult[struct{}]{
				Value: struct{}{},
				Start: 0,
				End:   5,
			},
			original: "hello",
			want:     "hello",
		},
		{
			desc: "sub range in the middle of the original",
			result: PatternMatchResult[struct{}]{
				Value: struct{}{},
				Start: 1,
				End:   3,
			},
			original: "hello",
			want:     "el",
		},
		{
			desc: "original message has short length than result",
			result: PatternMatchResult[struct{}]{
				Value: struct{}{},
				Start: 0,
				End:   5,
			},
			original: "",
			want:     "",
			wantErr:  true,
		},
		{
			desc: "negative start",
			result: PatternMatchResult[struct{}]{
				Value: struct{}{},
				Start: -1,
				End:   3,
			},
			original: "hello",
			want:     "",
			wantErr:  true,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			got, err := tc.result.GetMatchedString(tc.original)
			if tc.wantErr {
				if err == nil {
					t.Errorf("GetMatchedString() = %q, want an error", got)
				}
				return
			}
			// An unexpected error must fail the case explicitly; otherwise a case
			// that wants "" would pass even though the call failed.
			if err != nil {
				t.Fatalf("GetMatchedString() returned unexpected error: %v", err)
			}
			if got != tc.want {
				t.Errorf("GetMatchedString() = %q, want %q", got, tc.want)
			}
		})
	}
}

func BenchmarkFindAllWithStarterRunes(b *testing.B) {
finders := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/inspection/taskbase/fieldsetread_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// to each log concurrently. This allows for parallel processing of log entries to extract specific fields needed in later tasks.
// Later parser tasks usually process logs from older to newer with grouped by resource, thus it can't be done in parallel.
// The process of extracting log fields must not depend on the other logs and it can be done in parallel.
func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], logTask taskid.TaskReference[[]*log.Log], fieldSetReaders []log.FieldSetReader) coretask.Task[[]*log.Log] {
func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], logTask taskid.TaskReference[[]*log.Log], fieldSetReaders []log.FieldSetReader, labelOpts ...coretask.LabelOpt) coretask.Task[[]*log.Log] {
return NewProgressReportableInspectionTask(taskId, []taskid.UntypedTaskReference{
logTask,
}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) ([]*log.Log, error) {
Expand Down Expand Up @@ -83,5 +83,5 @@ func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], logTask
}

return logs, nil
})
}, labelOpts...)
}
114 changes: 114 additions & 0 deletions pkg/core/inspection/taskbase/inventory_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Inventory related tasks defined in this file provide a framework for discovering and merging inventory data from various sources.
//
// In many inspection scenarios, it's necessary to associate information across different log sources.
// For example, a log might contain an IP address, while another log maps that IP to a specific VM or container name.
// However, the availability of these log sources is not always guaranteed, and consumers of this inventory
// data should not need to be aware of the specific tasks that provide it.
//
// This framework introduces two main components to address this:
//
// 1. DiscoveryTask: A task responsible for extracting an inventory map from a single data source.
// Providers of a discovery task must ensure it is added to the task graph when a task that may require its
// data is included. This is achieved by using the coretask.NewSubsequentTaskRefsTaskLabel, which links the
// discovery task to the merger task.
//
// 2. InventoryTask: A task that aggregates the results from all relevant DiscoveryTasks.
// Consumers can simply depend on this single merger task to access the complete, consolidated inventory map
// without needing to know about the individual discovery tasks.
//
// This approach decouples data consumers from data providers, allowing for a flexible and extensible inspection system.
package inspectiontaskbase

import (
"context"
"log/slog"
"sync"

inspectionmetadata "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/metadata"
coretask "github.com/GoogleCloudPlatform/khi/pkg/core/task"
"github.com/GoogleCloudPlatform/khi/pkg/core/task/taskid"
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

// InventoryTaskBuilder builds an inventory task and the discovery tasks feeding it.
// The inventory task merges the information found in logs by multiple discovery tasks.
type InventoryTaskBuilder[T any] struct {
	// mu is held while DiscoveryTask updates discoveryTaskRefs.
	mu sync.Mutex
	// inventoryTaskID is the ID of the inventory task built by InventoryTask.
	// Every discovery task is labeled with a reference to this ID.
	inventoryTaskID taskid.TaskImplementationID[T]
	// discoveryTaskRefs holds one reference per discovery task created through DiscoveryTask
	// (duplicated reference IDs are registered only once). The inventory task reads
	// the results of these tasks when it runs.
	discoveryTaskRefs []taskid.TaskReference[T]
}

// NewInventoryTaskBuilder creates an InventoryTaskBuilder whose inventory task uses the given task ID.
// The returned builder has no discovery task registered yet.
func NewInventoryTaskBuilder[T any](inventoryTaskID taskid.TaskImplementationID[T]) *InventoryTaskBuilder[T] {
	builder := new(InventoryTaskBuilder[T])
	builder.inventoryTaskID = inventoryTaskID
	return builder
}

// InventoryTask builds the inventory task that merges the results of the discovery tasks
// registered on this builder, using the given merger strategy.
//
// The returned task declares no dependencies. When it runs, it looks up the result of every
// registered discovery task; results that are not available are skipped (and reported at
// debug level), and the remaining results are handed to strategy.Merge.
// In dry-run mode the task returns the zero value of T without merging anything.
func (s *InventoryTaskBuilder[T]) InventoryTask(strategy InventoryMergerStrategy[T]) coretask.Task[T] {
	// No lock is needed while building the task: nothing mutable is read here.
	// The shared discoveryTaskRefs slice is only read when the task runs, so the lock is taken there.
	return NewInspectionTask(s.inventoryTaskID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (T, error) {
		if taskMode == inspectioncore_contract.TaskModeDryRun {
			var zero T
			return zero, nil
		}

		// Snapshot the registered references under the lock because DiscoveryTask appends to
		// the same slice under s.mu. The lock is released before calling into other code.
		s.mu.Lock()
		refs := append([]taskid.TaskReference[T](nil), s.discoveryTaskRefs...)
		s.mu.Unlock()

		discoveryResults := make([]T, 0, len(refs))
		for _, ref := range refs {
			r, found := coretask.GetTaskResultOptional(ctx, ref)
			if !found {
				slog.DebugContext(ctx, "discovery result not provided", "taskRef", ref.ReferenceIDString())
				continue
			}
			discoveryResults = append(discoveryResults, r)
		}
		return strategy.Merge(discoveryResults)
	})
}

// DiscoveryTask builds a discovery task. The values returned from discovery tasks are
// aggregated by the inventory task built with InventoryTask.
//
// The task is labeled with coretask.NewSubsequentTaskRefsTaskLabel pointing at the inventory
// task, in addition to the given labelOpts. Its reference is registered on the builder so that
// the inventory task can read its result; a reference ID that is already registered is not added twice.
// In dry-run mode the task returns the zero value of T without calling taskFunc.
func (s *InventoryTaskBuilder[T]) DiscoveryTask(taskID taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T] {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Copy the labels into a fresh slice: appending to the variadic parameter directly could
	// write into the caller's backing array when the function is called as DiscoveryTask(..., opts...).
	inventoryTaskRef := s.inventoryTaskID.Ref()
	opts := make([]coretask.LabelOpt, 0, len(labelOpts)+1)
	opts = append(opts, labelOpts...)
	opts = append(opts, coretask.NewSubsequentTaskRefsTaskLabel(inventoryTaskRef))

	// Register the discovery task only once per reference ID.
	registered := false
	for _, ref := range s.discoveryTaskRefs {
		if ref.ReferenceIDString() == taskID.ReferenceIDString() {
			registered = true
			break
		}
	}
	if !registered {
		s.discoveryTaskRefs = append(s.discoveryTaskRefs, taskID.Ref())
	}

	return NewProgressReportableInspectionTask(taskID, dependencies, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (T, error) {
		if taskMode == inspectioncore_contract.TaskModeDryRun {
			var zero T
			return zero, nil
		}
		return taskFunc(ctx, taskMode, progress)
	}, opts...)
}

// InventoryMergerStrategy defines how the inventory task generated by InventoryTaskBuilder
// merges the results received from multiple discovery tasks.
type InventoryMergerStrategy[T any] interface {

	// Merge combines the results produced by discovery tasks into a single, consolidated result.
	// The inventory task passes only the results that were available when it ran, and returns
	// the value and error from Merge as its own result.
	Merge(results []T) (T, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,11 @@ import (
"github.com/google/go-cmp/cmp"
)

type testSimpleStringRelationshipMergerTaskSetting struct {
idSource *RelationshipTaskIDSource[map[string]struct{}]
type testSimpleStringMergerStrategy struct {
}

// IDSource implements RelationshipMergerTaskSetting.
func (t *testSimpleStringRelationshipMergerTaskSetting) IDSource() *RelationshipTaskIDSource[map[string]struct{}] {
return t.idSource
}

// Merge implements RelationshipMergerTaskSetting.
func (t *testSimpleStringRelationshipMergerTaskSetting) Merge(results []map[string]struct{}) (map[string]struct{}, error) {
// Merge implements InventoryMergerTaskSetting.
func (t *testSimpleStringMergerStrategy) Merge(results []map[string]struct{}) (map[string]struct{}, error) {
result := make(map[string]struct{})
for _, r := range results {
for k := range r {
Expand All @@ -46,52 +40,31 @@ func (t *testSimpleStringRelationshipMergerTaskSetting) Merge(results []map[stri
return result, nil
}

var _ RelationshipMergerTaskSetting[map[string]struct{}] = (*testSimpleStringRelationshipMergerTaskSetting)(nil)

// TestRelationshipTaskIDSource tests the basic functionality of the RelationshipTaskIDSource.
// It verifies that:
// - GenerateDefaultRelationshipDiscoveryTaskID correctly creates a task ID.
// - Attempting to generate a task ID with the same reference string multiple times does not result in duplicate entries.
func TestRelationshipTaskIDSource(t *testing.T) {
mergerTaskID := taskid.NewDefaultImplementationID[struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)

wantID := taskid.NewDefaultImplementationID[struct{}]("discovery")
gotID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery")

if diff := cmp.Diff(wantID.String(), gotID.String()); diff != "" {
t.Errorf("GenerateDefaultRelationshipDiscoveryTaskID() mismatch (-want +got):\n%s", diff)
}

ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery")
if len(ids.discoveryTaskRefs) != 1 {
t.Errorf("Expected discoveryTaskRefs to contain 1 element after generating the same ID twice, got %d", len(ids.discoveryTaskRefs))
}
}
var _ InventoryMergerStrategy[map[string]struct{}] = (*testSimpleStringMergerStrategy)(nil)

// TestRelationshipTask_ProvidedFromSingleDiscoveryTask tests a scenario where the merger task
// TestInventoryTask_ProvidedFromSingleDiscoveryTask tests a scenario where the merger task
// receives data from only one of two available discovery tasks.
// This is because the main user task only depends on the parent of the first discovery task.
// The test verifies that only the result from the first discovery task ("foo") is present in the final merged map and the task dependency topology doesn't add the discovery-2 task not intentionally.
func TestRelationshipTask_ProvidedFromSingleDiscoveryTask(t *testing.T) {
func TestInventoryTask_ProvidedFromSingleDiscoveryTask(t *testing.T) {
nop := func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
return struct{}{}, nil
}
mergerTaskID := taskid.NewDefaultImplementationID[map[string]struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)
mergerTask := NewRelationshipMergerTask(&testSimpleStringRelationshipMergerTaskSetting{ids})
discovery1ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-1")
discovery2ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-2")
builder := NewInventoryTaskBuilder(mergerTaskID)
mergerTask := builder.InventoryTask(&testSimpleStringMergerStrategy{})
discovery1ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery1")
discovery2ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery2")
discovery1ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-1-parent")
discovery1ParentTask := NewInspectionTask(discovery1ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery1ID.Ref()))
discovery1 := NewRelationshipDiscoveryTask(discovery1ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery1 := builder.DiscoveryTask(discovery1ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"foo": {},
}, nil
})
discovery2ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-2-parent")
discovery2ParentTask := NewInspectionTask(discovery2ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery2ID.Ref()))
discovery2 := NewRelationshipDiscoveryTask(discovery2ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery2 := builder.DiscoveryTask(discovery2ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"bar": {},
}, nil
Expand All @@ -115,29 +88,29 @@ func TestRelationshipTask_ProvidedFromSingleDiscoveryTask(t *testing.T) {
}
}

// TestRelationshipTask_ProvidedFromMultipleDiscoveryTask tests a scenario where the merger task
// TestInventoryTask_ProvidedFromMultipleDiscoveryTask tests a scenario where the merger task
// receives and merges data from multiple discovery tasks.
// This is because the main user task depends on the parents of both discovery tasks.
// The test verifies that the results from both discovery tasks ("foo" and "bar") are present in the final merged map.
func TestRelationshipTask_ProvidedFromMultipleDiscoveryTask(t *testing.T) {
func TestInventoryTask_ProvidedFromMultipleDiscoveryTask(t *testing.T) {
nop := func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
return struct{}{}, nil
}
mergerTaskID := taskid.NewDefaultImplementationID[map[string]struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)
mergerTask := NewRelationshipMergerTask(&testSimpleStringRelationshipMergerTaskSetting{ids})
discovery1ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-1")
discovery2ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-2")
builder := NewInventoryTaskBuilder(mergerTaskID)
mergerTask := builder.InventoryTask(&testSimpleStringMergerStrategy{})
discovery1ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery1")
discovery2ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery2")
discovery1ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-1-parent")
discovery1ParentTask := NewInspectionTask(discovery1ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery1ID.Ref()))
discovery1 := NewRelationshipDiscoveryTask(discovery1ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery1 := builder.DiscoveryTask(discovery1ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"foo": {},
}, nil
})
discovery2ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-2-parent")
discovery2ParentTask := NewInspectionTask(discovery2ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery2ID.Ref()))
discovery2 := NewRelationshipDiscoveryTask(discovery2ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery2 := builder.DiscoveryTask(discovery2ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"bar": {},
}, nil
Expand All @@ -162,28 +135,28 @@ func TestRelationshipTask_ProvidedFromMultipleDiscoveryTask(t *testing.T) {
}
}

// TestRelationshipTask_ProvidedFromNoDiscoveryTask tests a scenario where the merger task receives no data.
// TestInventoryTask_ProvidedFromNoDiscoveryTask tests a scenario where the merger task receives no data.
// This is because the main user task does not depend on any of the discovery tasks' parents.
// The test verifies that the final merged map is empty.
func TestRelationshipTask_ProvidedFromNoDiscoveryTask(t *testing.T) {
func TestInventoryTask_ProvidedFromNoDiscoveryTask(t *testing.T) {
nop := func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
return struct{}{}, nil
}
mergerTaskID := taskid.NewDefaultImplementationID[map[string]struct{}]("test")
ids := NewRelationshipTaskIDSource(mergerTaskID)
mergerTask := NewRelationshipMergerTask(&testSimpleStringRelationshipMergerTaskSetting{ids})
discovery1ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-1")
discovery2ID := ids.GenerateDefaultRelationshipDiscoveryTaskID("discovery-2")
builder := NewInventoryTaskBuilder(mergerTaskID)
mergerTask := builder.InventoryTask(&testSimpleStringMergerStrategy{})
discovery1ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery1")
discovery2ID := taskid.NewDefaultImplementationID[map[string]struct{}]("discovery2")
discovery1ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-1-parent")
discovery1ParentTask := NewInspectionTask(discovery1ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery1ID.Ref()))
discovery1 := NewRelationshipDiscoveryTask(discovery1ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery1 := builder.DiscoveryTask(discovery1ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"foo": {},
}, nil
})
discovery2ParentTaskID := taskid.NewDefaultImplementationID[struct{}]("discovery-2-parent")
discovery2ParentTask := NewInspectionTask(discovery2ParentTaskID, []taskid.UntypedTaskReference{}, nop, coretask.NewSubsequentTaskRefsTaskLabel(discovery2ID.Ref()))
discovery2 := NewRelationshipDiscoveryTask(discovery2ID, ids, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
discovery2 := builder.DiscoveryTask(discovery2ID, []taskid.UntypedTaskReference{}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (map[string]struct{}, error) {
return map[string]struct{}{
"bar": {},
}, nil
Expand Down
Loading