Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions pkg/core/inspection/taskbase/logsorter_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package inspectiontaskbase

import (
"context"
"slices"

inspectionmetadata "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/metadata"
coretask "github.com/GoogleCloudPlatform/khi/pkg/core/task"
"github.com/GoogleCloudPlatform/khi/pkg/core/task/taskid"
"github.com/GoogleCloudPlatform/khi/pkg/model/log"
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

func NewLogSorterByTimeTask(taskID taskid.TaskImplementationID[[]*log.Log], logSource taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log] {
return NewProgressReportableInspectionTask(taskID, []taskid.UntypedTaskReference{logSource}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) ([]*log.Log, error) {
if taskMode != inspectioncore_contract.TaskModeRun {
return []*log.Log{}, nil
}
progress.MarkIndeterminate()
logs := coretask.GetTaskResult(ctx, logSource)
logs = slices.Clone(logs)
slices.SortFunc(logs, func(a, b *log.Log) int {
aFieldSet := log.MustGetFieldSet(a, &log.CommonFieldSet{})
bFieldSet := log.MustGetFieldSet(b, &log.CommonFieldSet{})
return aFieldSet.Timestamp.Compare(bFieldSet.Timestamp)
})
return logs, nil
Comment thread
kyasbal marked this conversation as resolved.
})
}
125 changes: 125 additions & 0 deletions pkg/core/inspection/taskbase/logsorter_task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package inspectiontaskbase

import (
"context"
"testing"
"time"

inspectiontest "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/test"
"github.com/GoogleCloudPlatform/khi/pkg/core/task/taskid"
tasktest "github.com/GoogleCloudPlatform/khi/pkg/core/task/test"
"github.com/GoogleCloudPlatform/khi/pkg/model/log"
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
"github.com/google/go-cmp/cmp"
)

func TestLogSorterByTimeTask(t *testing.T) {
testCases := []struct {
name string
mode inspectioncore_contract.InspectionTaskModeType
logs []*log.CommonFieldSet
want []*log.CommonFieldSet
}{
{
name: "should return an empty list for empty log input on task run mode",
mode: inspectioncore_contract.TaskModeRun,
logs: []*log.CommonFieldSet{},
want: make([]*log.CommonFieldSet, 0),
},
{
name: "should return an empty list for empty log input on task dry run mode",
mode: inspectioncore_contract.TaskModeDryRun,
logs: []*log.CommonFieldSet{
{
DisplayID: "foo",
Timestamp: time.Date(2025, 11, 21, 13, 16, 34, 0, time.UTC),
},
{
DisplayID: "bar",
Timestamp: time.Date(2025, 11, 21, 13, 16, 33, 0, time.UTC),
},
{
DisplayID: "qux",
Timestamp: time.Date(2025, 11, 21, 13, 16, 32, 0, time.UTC),
},
},
want: make([]*log.CommonFieldSet, 0),
},
{
name: "should return sorted logs on task run mode",
mode: inspectioncore_contract.TaskModeRun,
logs: []*log.CommonFieldSet{
{
DisplayID: "foo",
Timestamp: time.Date(2025, 11, 21, 13, 16, 34, 0, time.UTC),
},
{
DisplayID: "bar",
Timestamp: time.Date(2025, 11, 21, 13, 16, 33, 0, time.UTC),
},
{
DisplayID: "qux",
Timestamp: time.Date(2025, 11, 21, 13, 16, 32, 0, time.UTC),
},
},
want: []*log.CommonFieldSet{
{
DisplayID: "qux",
Timestamp: time.Date(2025, 11, 21, 13, 16, 32, 0, time.UTC),
},
{
DisplayID: "bar",
Timestamp: time.Date(2025, 11, 21, 13, 16, 33, 0, time.UTC),
},
{
DisplayID: "foo",
Timestamp: time.Date(2025, 11, 21, 13, 16, 34, 0, time.UTC),
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logs := []*log.Log{}
for _, commonFieldSet := range tc.logs {
l := log.NewLogWithFieldSetsForTest(commonFieldSet)
logs = append(logs, l)
}

testSourceTaskID := taskid.NewDefaultImplementationID[[]*log.Log]("source")
testTaskID := taskid.NewDefaultImplementationID[[]*log.Log]("dest")
task := NewLogSorterByTimeTask(testTaskID, testSourceTaskID.Ref())

ctx := inspectiontest.WithDefaultTestInspectionTaskContext(context.Background())
sortedLogs, _, err := inspectiontest.RunInspectionTask(ctx, task, tc.mode, map[string]any{}, tasktest.NewTaskDependencyValuePair(testSourceTaskID.Ref(), logs))
if err != nil {
t.Fatalf("RunInspectionTask returned an unexpected error: %v", err)
}

got := []*log.CommonFieldSet{}
for _, l := range sortedLogs {
fieldSet := log.MustGetFieldSet(l, &log.CommonFieldSet{})
got = append(got, fieldSet)
}

if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("log sort mismatch (-want +got):\n%s", diff)
}

})
}
}
60 changes: 60 additions & 0 deletions pkg/core/inspection/taskbasetest/logfiltertestutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package inspectiontaskbasetest

import (
"testing"

inspectiontest "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/test"
coretask "github.com/GoogleCloudPlatform/khi/pkg/core/task"
"github.com/GoogleCloudPlatform/khi/pkg/core/task/taskid"
tasktest "github.com/GoogleCloudPlatform/khi/pkg/core/task/test"
"github.com/GoogleCloudPlatform/khi/pkg/model/log"
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

// FilterTaskTestCase is a test case for testing a filter task.
// It contains the log fields to be filtered and the expected result.
type FilterTaskTestCase struct {
Description string
LogFields []log.FieldSet
WantIncluded bool
}

// AssertFilterTask asserts that the given filter task behaves as expected for the given test cases.
// It runs the task with the given log fields and checks if the log is included or excluded from the result.
func AssertFilterTask(t *testing.T, task coretask.Task[[]*log.Log], sourceRef taskid.TaskReference[[]*log.Log], testCases []FilterTaskTestCase) {
t.Helper()
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
l := log.NewLogWithFieldSetsForTest(tc.LogFields...)
ctx := inspectiontest.WithDefaultTestInspectionTaskContext(t.Context())

result, _, err := inspectiontest.RunInspectionTask(ctx, task, inspectioncore_contract.TaskModeRun, map[string]any{}, tasktest.NewTaskDependencyValuePair(sourceRef, []*log.Log{l}))
if err != nil {
t.Fatalf("RunInspectionTask failed: %v", err)
}
if tc.WantIncluded {
if len(result) == 0 {
t.Errorf("given log was unexpectedly filtered out. want=1, got=0")
}
} else {
if len(result) != 0 {
t.Errorf("given log was unexpectedly included. want=0, got=1")
}
}
})
}
}
58 changes: 58 additions & 0 deletions pkg/core/inspection/taskbasetest/loggroupertestutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package inspectiontaskbasetest

import (
"testing"

inspectiontaskbase "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/taskbase"
inspectiontest "github.com/GoogleCloudPlatform/khi/pkg/core/inspection/test"
coretask "github.com/GoogleCloudPlatform/khi/pkg/core/task"
"github.com/GoogleCloudPlatform/khi/pkg/core/task/taskid"
tasktest "github.com/GoogleCloudPlatform/khi/pkg/core/task/test"
"github.com/GoogleCloudPlatform/khi/pkg/model/log"
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

// GrouperTaskTestCase is a test case for log grouper task.
type GrouperTaskTestCase struct {
Description string
LogFields []log.FieldSet
WantGroup string
}

// AssertGrouperTask asserts that the given grouper task behaves as expected for the given test cases.
func AssertGrouperTask(t *testing.T, task coretask.Task[inspectiontaskbase.LogGroupMap], sourceRef taskid.TaskReference[[]*log.Log], testCases []GrouperTaskTestCase) {
t.Helper()
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
l := log.NewLogWithFieldSetsForTest(tc.LogFields...)
ctx := inspectiontest.WithDefaultTestInspectionTaskContext(t.Context())

result, _, err := inspectiontest.RunInspectionTask(ctx, task, inspectioncore_contract.TaskModeRun, map[string]any{}, tasktest.NewTaskDependencyValuePair(sourceRef, []*log.Log{l}))
if err != nil {
t.Fatalf("RunInspectionTask failed: %v", err)
}
if len(result) != 1 {
t.Fatalf("unexpected element count found in result: want=1, got=%d", len(result))
}
for _, group := range result {
if group.Group != tc.WantGroup {
t.Fatalf("unexpected group found in result: want=%s, got=%s", tc.WantGroup, group.Group)
}
}
})
}
}
23 changes: 23 additions & 0 deletions pkg/model/history/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/GoogleCloudPlatform/khi/pkg/model/binarychunk"
"github.com/GoogleCloudPlatform/khi/pkg/model/enum"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourceinfo"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath"
"github.com/GoogleCloudPlatform/khi/pkg/model/log"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -360,6 +361,8 @@ func (builder *Builder) sortData() error {
// Finalize flushes the binary chunk data and serialized metadata to the given io.Writer. Returns the written data size in bytes and error.
func (builder *Builder) Finalize(ctx context.Context, serializedMetadata map[string]interface{}, writer io.Writer, progress *inspectionmetadata.TaskProgressMetadata) (int, error) {
fileSize := 0
progress.Update(0, "Verifying orphan logs")
builder.verifyingOrphanLogs()
progress.Update(0, "Sorting log entries")
progress.MarkIndeterminate()
builder.history.Metadata = serializedMetadata
Expand Down Expand Up @@ -401,6 +404,26 @@ func (builder *Builder) Finalize(ctx context.Context, serializedMetadata map[str
return fileSize, nil
}

func (b *Builder) verifyingOrphanLogs() {
logIDs := map[string]struct{}{}
for _, timeline := range b.history.Timelines {
for _, event := range timeline.Events {
logIDs[event.Log] = struct{}{}
}
for _, revision := range timeline.Revisions {
logIDs[revision.Log] = struct{}{}
}
}
for _, log := range b.history.Logs {
if _, found := logIDs[log.ID]; !found {
tb := b.GetTimelineBuilder(resourcepath.NameLayerGeneralItem("@KHI", "error", "orphan-logs", enum.LogTypes[log.Type].EnumKeyName).Path)
tb.AddEvent(&ResourceEvent{
Log: log.ID,
})
}
}
}

// DangerouslyGetRawHistory returns the raw history value written by this builder. This method is only used for testing purpose.
func (b *Builder) DangerouslyGetRawHistory() *History {
return b.history
Expand Down
21 changes: 15 additions & 6 deletions pkg/model/k8s_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,21 @@ func (o *KubernetesObjectOperation) CovertToResourcePath() string {
o.SubResourceName,
}, "#"))
} else {
return strings.ToLower(strings.Join([]string{
o.APIVersion,
o.GetSingularKindName(),
o.Namespace,
o.Name,
}, "#"))
if o.Name == "" {
return strings.ToLower(strings.Join([]string{
o.APIVersion,
o.GetSingularKindName(),
o.Namespace,
"@namespace",
}, "#"))
} else {
return strings.ToLower(strings.Join([]string{
o.APIVersion,
o.GetSingularKindName(),
o.Namespace,
o.Name,
}, "#"))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestDefaultGroupDecider(t *testing.T) {
Name: "",
Verb: enum.RevisionVerbDeleteCollection,
},
wantGroup: "core/v1#pod#default#", // TODO: This is OK for now. This will be fixed with #267 'Changes made by delete collection operation may generate wrong resource timeline'
wantGroup: "core/v1#pod#default#@namespace",
},
}
for _, tc := range testCases {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package googlecloudcommon_impl
import (
"context"
"fmt"
"slices"
"strings"

"github.com/GoogleCloudPlatform/khi/pkg/api/googlecloud"
Expand Down Expand Up @@ -110,13 +111,18 @@ var InputLoggingFilterResourceNameTask = inspectiontaskbase.NewInspectionTask(go
// getCurrentActiveQueryIDsForResourceName returns the query IDs that are currently active with retrieving them from the current task graph.
func getCurrentActiveQueryIDsForResourceName(runner coretask.TaskRunner) []string {
tasks := runner.Tasks()
result := []string{}
resultMap := map[string]struct{}{}
for _, t := range tasks {
requestInput, found := typedmap.Get(t.Labels(), googlecloudcommon_contract.RequestOptionalInputResourceNameTaskLabel)
if !found {
continue
}
result = append(result, requestInput)
resultMap[requestInput] = struct{}{}
}
result := []string{}
for k := range resultMap {
result = append(result, k)
}
slices.Sort(result)
return result
}
Loading