Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/core/inspection/taskbase/loggroup_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ type LogGroup struct {
// LogGroupMap is a map of log groups, where the key is the group identifier.
type LogGroupMap = map[string]*LogGroup

// LogGrouper defines a function that returns a group key for a given log.
type LogGrouper = func(ctx context.Context, log *log.Log) string
// LogGrouperFunc defines a function that returns a group key for a given log.
type LogGrouperFunc = func(ctx context.Context, log *log.Log) string

// NewLogGrouperTask creates a task that groups logs based on a grouper function.
// It processes a list of logs and organizes them into a map of LogGroup,
// where each group contains logs with the same key.
func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouper) coretask.Task[LogGroupMap] {
func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouperFunc) coretask.Task[LogGroupMap] {
return NewProgressReportableInspectionTask(taskId, []taskid.UntypedTaskReference{
logTask,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/inspection/taskbase/loggroup_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestNwewLogGrouperTask(t *testing.T) {
name string
taskMode inspectioncore_contract.InspectionTaskModeType
logYamls []string
logGrouper LogGrouper
logGrouper LogGrouperFunc
resultLogIDs map[string][]string
}{
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"go.opentelemetry.io/otel/trace"
)

// NewLogSerializerTask store its given logs to history to prepare the history type to have ChangeSet associated with the log.
// This must be called before HistoryModifier and Logs must be discarded before this task if it shouldn't be included in the result.
func NewLogSerializerTask(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log] {
// NewLogIngester store its given logs to history to prepare the history type to have ChangeSet associated with the log.
// This must be called before LogToTimelineMapperTask and Logs must be discarded before this task if it shouldn't be included in the result.
func NewLogIngester(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log] {
Comment thread
kyasbal marked this conversation as resolved.
Outdated
return NewProgressReportableInspectionTask(taskID, []taskid.UntypedTaskReference{input}, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) ([]*log.Log, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
return []*log.Log{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"github.com/GoogleCloudPlatform/khi/pkg/testutil/testlog"
)

func TestLogSerializerTask_DryRunMode(t *testing.T) {
func TestLogIngesterTask_DryRunMode(t *testing.T) {
l := testlog.MustLogFromYAML("insertId: foo", &mockCommonLogFieldSetReader{})
ctx := inspectiontest.WithDefaultTestInspectionTaskContext(t.Context())
inputTaskID := taskid.NewDefaultImplementationID[[]*log.Log]("input")
taskID := taskid.NewDefaultImplementationID[[]*log.Log]("test")
task := NewLogSerializerTask(taskID, inputTaskID.Ref())
task := NewLogIngester(taskID, inputTaskID.Ref())

result, _, err := inspectiontest.RunInspectionTask(ctx, task, inspectioncore_contract.TaskModeDryRun, map[string]any{},
tasktest.NewTaskDependencyValuePair(inputTaskID.Ref(), []*log.Log{l}))
Expand All @@ -40,22 +40,22 @@ func TestLogSerializerTask_DryRunMode(t *testing.T) {
}

if len(result) != 0 {
t.Errorf("LogSerializerTask returned a log result for dryrun mode")
t.Errorf("LogIngesterTask returned a log result for dryrun mode")
}

builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentHistoryBuilder)
_, err = builder.GetLog(l.ID)
if err == nil {
t.Errorf("LogSerializerTask must not write serialzied log to the builder when it run for dryrun, but it generated a serialized log.")
t.Errorf("LogIngesterTask must not write log to the builder when it run for dryrun, but it wrote a log.")
}
}

func TestLogSerializerTask_RunMode(t *testing.T) {
func TestLogIngesterTask_RunMode(t *testing.T) {
l := testlog.MustLogFromYAML("insertId: foo", &mockCommonLogFieldSetReader{})
ctx := inspectiontest.WithDefaultTestInspectionTaskContext(t.Context())
inputTaskID := taskid.NewDefaultImplementationID[[]*log.Log]("input")
taskID := taskid.NewDefaultImplementationID[[]*log.Log]("test")
task := NewLogSerializerTask(taskID, inputTaskID.Ref())
task := NewLogIngester(taskID, inputTaskID.Ref())

result, _, err := inspectiontest.RunInspectionTask(ctx, task, inspectioncore_contract.TaskModeRun, map[string]any{},
tasktest.NewTaskDependencyValuePair(inputTaskID.Ref(), []*log.Log{l}))
Expand All @@ -64,12 +64,12 @@ func TestLogSerializerTask_RunMode(t *testing.T) {
}

if len(result) != 1 {
t.Errorf("LogSerializerTask didn't return a log result for run mode")
t.Errorf("LogIngesterTask didn't return a log result for run mode")
}

builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentHistoryBuilder)
_, err = builder.GetLog(l.ID)
if err != nil {
t.Errorf("LogSerializerTask must write serialzied log to the builder when it run. err=%v", err)
t.Errorf("LogIngesterTask must write log to the builder when it run. err=%v", err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,28 @@ import (
"go.opentelemetry.io/otel/trace"
)

// HistoryModifer defines the interface for modifying the History with change sets based on log entries.
// Implementations of this interface can be used to customize how log data is transformed into
// structured history.
// To process data generated from processing the last log in the same group, the method ModifyChangeSetFromLog receive and return a variable typed T.
type HistoryModifer[T any] interface {
// LogSerializerTask is one of prerequiste task of HistoryModifier serializes its logs to history data before processing with this modifier.
LogSerializerTask() taskid.TaskReference[[]*log.Log]
// Dependencies are the additional references used in history modifier.
// LogToTimelineMapper defines the interface for mapping logs to timeline elements(events or revisions).
Comment thread
kyasbal marked this conversation as resolved.
Outdated
// Implementations of this interface can be used to customize how log data is transformed into timeline elements.
// To process data generated from processing the last log in the same group, the method ProcessLogByGroup receive and return a variable typed T.
Comment thread
kyasbal marked this conversation as resolved.
Outdated
type LogToTimelineMapper[T any] interface {
// LogIngesterTask is one of prerequiste task of LogToTimelineMapper ingesting logs to history data before processing with this mapper.
Comment thread
kyasbal marked this conversation as resolved.
Outdated
LogIngesterTask() taskid.TaskReference[[]*log.Log]
// Dependencies are the additional references used in timeline mapper.
Dependencies() []taskid.UntypedTaskReference
// GroupedLogTask returns a reference to the task that provides the grouped logs.
GroupedLogTask() taskid.TaskReference[LogGroupMap]
// ModifyChangeSetFromLog is called for each log entry to modify the corresponding ChangeSet.
// ProcessLogByGroup is called for each log entry to modify the corresponding ChangeSet.
// This method allows for custom logic to be applied during the history building process.
// The prevGroupData is the returned value from the last procesed log in the same group.
ModifyChangeSetFromLog(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
ProcessLogByGroup(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}

// NewHistoryModifierTask creates a task that modifies the history builder based on grouped logs.
// It processes logs in parallel and applies the logic from the provided HistoryModifer
// NewLogToTimelineMapperTask creates a task that modifies the history builder based on grouped logs.
// It processes logs in parallel and applies the logic from the provided LogToTimelineMapper
// to build a comprehensive history of events.
func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], historyModifier HistoryModifer[T], labels ...coretask.LabelOpt) coretask.Task[struct{}] {
groupedLogTaskID := historyModifier.GroupedLogTask()
dependencies := append([]taskid.UntypedTaskReference{historyModifier.LogSerializerTask(), historyModifier.GroupedLogTask()}, historyModifier.Dependencies()...)
func NewLogToTimelineMapperTask[T any](tid taskid.TaskImplementationID[struct{}], mapper LogToTimelineMapper[T], labels ...coretask.LabelOpt) coretask.Task[struct{}] {
groupedLogTaskID := mapper.GroupedLogTask()
dependencies := append([]taskid.UntypedTaskReference{mapper.LogIngesterTask(), mapper.GroupedLogTask()}, mapper.Dependencies()...)
return NewProgressReportableInspectionTask(tid, dependencies, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, tp *inspectionmetadata.TaskProgressMetadata) (struct{}, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
slog.DebugContext(ctx, "Skipping task because this is dry run mode")
Expand Down Expand Up @@ -91,7 +90,7 @@ func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], hi
for _, l := range group.Logs {
cs := history.NewChangeSet(l)
var err error
groupData, err = historyModifier.ModifyChangeSetFromLog(ctx, l, cs, builder, groupData)
groupData, err = mapper.ProcessLogByGroup(ctx, l, cs, builder, groupData)
if err != nil {
var yaml string
yamlBytes, err2 := l.Serialize("", &structured.YAMLNodeSerializer{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,32 @@ import (
inspectioncore_contract "github.com/GoogleCloudPlatform/khi/pkg/task/inspection/inspectioncore/contract"
)

var mockHistoryModifierPrevTaskID = taskid.NewDefaultImplementationID[LogGroupMap]("mock-history-modifier-prev")
var mockLogToTimelineMapperPrevTaskID = taskid.NewDefaultImplementationID[LogGroupMap]("mock-timeline-mapper-prev")

var mockLogSerializerPrevTaskID = taskid.NewDefaultImplementationID[[]*log.Log]("mock-history-modifier-prev-log-serializer")
var mockLogSerializerPrevTaskID = taskid.NewDefaultImplementationID[[]*log.Log]("mock-timeline-mapper-prev-log-serializer")

type mockHistoryModifierGroupData struct {
type mockLogToTimelineMapperGroupData struct {
CurrentGroupLogCount int
}

type mockHistoryModifier struct {
type mockLogToTimelineMapper struct {
}

// GroupedLogTask implements HistoryModifer.
func (m *mockHistoryModifier) GroupedLogTask() taskid.TaskReference[LogGroupMap] {
return mockHistoryModifierPrevTaskID.Ref()
func (m *mockLogToTimelineMapper) GroupedLogTask() taskid.TaskReference[LogGroupMap] {
return mockLogToTimelineMapperPrevTaskID.Ref()
}

func (m *mockHistoryModifier) LogSerializerTask() taskid.TaskReference[[]*log.Log] {
func (m *mockLogToTimelineMapper) LogIngesterTask() taskid.TaskReference[[]*log.Log] {
return mockLogSerializerPrevTaskID.Ref()
}

func (m *mockHistoryModifier) Dependencies() []taskid.UntypedTaskReference {
func (m *mockLogToTimelineMapper) Dependencies() []taskid.UntypedTaskReference {
return []taskid.UntypedTaskReference{}
}

// ModifyChangeSetFromLog implements HistoryModifer.
func (m *mockHistoryModifier) ModifyChangeSetFromLog(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevData mockHistoryModifierGroupData) (mockHistoryModifierGroupData, error) {
// ProcessLogByGroup implements HistoryModifer.
Comment thread
kyasbal marked this conversation as resolved.
Outdated
func (m *mockLogToTimelineMapper) ProcessLogByGroup(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevData mockLogToTimelineMapperGroupData) (mockLogToTimelineMapperGroupData, error) {
// encode current group count to severity to use them assert in tasecases to verify the prevData is correctly handled.
switch prevData.CurrentGroupLogCount {
case 0:
Expand All @@ -70,7 +70,7 @@ func (m *mockHistoryModifier) ModifyChangeSetFromLog(ctx context.Context, l *log
}
shouldErr := l.ReadBoolOrDefault("error", false)
if shouldErr {
return mockHistoryModifierGroupData{
return mockLogToTimelineMapperGroupData{
CurrentGroupLogCount: prevData.CurrentGroupLogCount + 1,
}, fmt.Errorf("test error")
}
Expand All @@ -80,12 +80,12 @@ func (m *mockHistoryModifier) ModifyChangeSetFromLog(ctx context.Context, l *log
l.ReadStringOrDefault("namespace", "unknown"),
l.ReadStringOrDefault("name", "unknown"),
))
return mockHistoryModifierGroupData{
return mockLogToTimelineMapperGroupData{
CurrentGroupLogCount: prevData.CurrentGroupLogCount + 1,
}, nil
}

var _ HistoryModifer[mockHistoryModifierGroupData] = (*mockHistoryModifier)(nil)
var _ LogToTimelineMapper[mockLogToTimelineMapperGroupData] = (*mockLogToTimelineMapper)(nil)

type mockCommonLogFieldSetReader struct {
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func mustNewLogFromYAML(t *testing.T, yaml string) *log.Log {
return l
}

func TestHistoryModifierTask(t *testing.T) {
func TestLogToTimelineMapperTask(t *testing.T) {
testCases := []struct {
desc string
taskMode inspectioncore_contract.InspectionTaskModeType
Expand Down Expand Up @@ -268,11 +268,11 @@ func TestHistoryModifierTask(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
tid := taskid.NewDefaultImplementationID[struct{}]("mock-history-modifier")
tid := taskid.NewDefaultImplementationID[struct{}]("mock-timeline-mapper")

ctx := context.Background()
ctx = inspectiontest.WithDefaultTestInspectionTaskContext(ctx)
task := NewHistoryModifierTask(tid, &mockHistoryModifier{})
task := NewLogToTimelineMapperTask(tid, &mockLogToTimelineMapper{})
builder := khictx.MustGetValue(ctx, inspectioncore_contract.CurrentHistoryBuilder)

for _, group := range testCase.prevLogGroupMap {
Expand All @@ -282,7 +282,7 @@ func TestHistoryModifierTask(t *testing.T) {
}
}

_, _, err := inspectiontest.RunInspectionTask(ctx, task, testCase.taskMode, map[string]any{}, tasktest.NewTaskDependencyValuePair(mockHistoryModifierPrevTaskID.Ref(), testCase.prevLogGroupMap))
_, _, err := inspectiontest.RunInspectionTask(ctx, task, testCase.taskMode, map[string]any{}, tasktest.NewTaskDependencyValuePair(mockLogToTimelineMapperPrevTaskID.Ref(), testCase.prevLogGroupMap))
if (err != nil) != testCase.wantError {
t.Fatalf("RunInspectionTask() error = %v, wantError %v", err, testCase.wantError)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ type ResourceChangeEvent struct {
EventTargetBodyReader *structured.NodeReader
}

// ManifestHistoryModifierTaskSetting is the setting for the manifest history modifier task.
type ManifestHistoryModifierTaskSetting[T any] interface {
// ManifestLogToTimelineMapperTaskSetting is the setting for the manifest timeline mapper task.
type ManifestLogToTimelineMapperTaskSetting[T any] interface {
// TaskID returns the task ID.
TaskID() taskid.TaskImplementationID[struct{}]
// LogSerializerTask returns the task reference for the log serializer task.
LogSerializerTask() taskid.TaskReference[[]*log.Log]
// LogIngesterTask returns the task reference for the log serializer task.
LogIngesterTask() taskid.TaskReference[[]*log.Log]
// GroupedLogTask returns the task reference for the grouped log task.
GroupedLogTask() taskid.TaskReference[ResourceManifestLogGroupMap]
// Dependencies returns the dependencies of the task.
Expand All @@ -104,8 +104,8 @@ type ManifestHistoryModifierTaskSetting[T any] interface {
Process(ctx context.Context, passIndex int, event ResourceChangeEvent, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}

// NewManifestHistoryModifier creates a new manifest history modifier task.
// ManifestHistoryModifier is a task that generates a timeline of resource changes based on the processed manifests.
// NewManifestLogToTimelineMapper creates a new timeline mapper task but from resource logs.
// ManifestLogToTimelineMapper is a task that generates a timeline of resource changes based on the processed manifests.
// It is designed to handle the relationship between two resources (Source and Target) and generate revisions for the Target resource based on the changes in the Source resource.
// For example, it can be used to generate a timeline of Pod status changes based on the Pod resource itself (Source=None, Target=Pod), or to generate a timeline of binding subresource but deleted when its parent Pod is deleted (Source=Pod, Target=Source pod's binding).
// The setting has ResourcePairs method that returns the resource pairs to know these pairs of target and source.
Expand All @@ -116,8 +116,8 @@ type ManifestHistoryModifierTaskSetting[T any] interface {
// Type Parameter T:
// The type parameter T represents the state that is passed between Process calls for the same resource pair.
// This allows the implementation to track the history of the resource and detect changes.
func NewManifestHistoryModifier[T any](setting ManifestHistoryModifierTaskSetting[T]) coretask.Task[struct{}] {
dependencies := append([]taskid.UntypedTaskReference{setting.LogSerializerTask(), setting.GroupedLogTask()}, setting.Dependencies()...)
func NewManifestLogToTimelineMapper[T any](setting ManifestLogToTimelineMapperTaskSetting[T]) coretask.Task[struct{}] {
dependencies := append([]taskid.UntypedTaskReference{setting.LogIngesterTask(), setting.GroupedLogTask()}, setting.Dependencies()...)
return inspectiontaskbase.NewProgressReportableInspectionTask(setting.TaskID(), dependencies, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, tp *inspectionmetadata.TaskProgressMetadata) (struct{}, error) {
if taskMode == inspectioncore_contract.TaskModeDryRun {
slog.DebugContext(ctx, "Skipping task because this is dry run mode")
Expand Down
Loading