Skip to content

Commit 8fde121

Browse files
feat: add replay runner and scheduler with CLI integration
Execution engine for the timeseries replay system: - Runner: worker pool with time-bucketed scheduling, per-worker metrics, pool-first WATCH connection assignment with overflow support - Scheduler: orchestrator for both local multi-runner and distributed single-runner modes with configuration validation and warnings - CLI: 'kperf replay run' command for local replay execution and 'kperf runner replay' subcommand for distributed runner pods Signed-off-by: JasonXuDeveloper - 傑 <jason@xgamedev.net>
1 parent ff34842 commit 8fde121

File tree

11 files changed

+1663
-3
lines changed

11 files changed

+1663
-3
lines changed

api/types/replay.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ func (r ReplayRequest) Validate() error {
4646
return fmt.Errorf("apiPath is required")
4747
}
4848

49-
// Name is required for specific operations (GET, DELETE, PATCH)
50-
// CREATE, LIST, WATCH, DELETECOLLECTION, APPLY can have empty names
51-
if (r.Verb == "GET" || r.Verb == "DELETE" || r.Verb == "PATCH") && r.Name == "" {
49+
// Name is required for operations targeting a specific object
50+
// CREATE, LIST, WATCH, DELETECOLLECTION can have empty names
51+
if (r.Verb == "GET" || r.Verb == "DELETE" || r.Verb == "PATCH" || r.Verb == "APPLY") && r.Name == "" {
5252
return fmt.Errorf("name is required for %s operation", r.Verb)
5353
}
5454

api/types/replay_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,17 @@ func TestReplayRequestValidate(t *testing.T) {
112112
},
113113
wantErr: true,
114114
},
115+
{
116+
name: "missing name for APPLY",
117+
req: ReplayRequest{
118+
Timestamp: 0,
119+
Verb: "APPLY",
120+
ResourceKind: "Pod",
121+
APIPath: "/api/v1/pods",
122+
Body: `{"apiVersion":"v1","kind":"Pod"}`,
123+
},
124+
wantErr: true,
125+
},
115126
}
116127

117128
for _, tt := range tests {

cmd/kperf/commands/replay/root.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
package replay
5+
6+
import "github.com/urfave/cli"
7+
8+
// Command represents replay subcommand.
9+
var Command = cli.Command{
10+
Name: "replay",
11+
Usage: "Replay captured Kubernetes API requests at their recorded timestamps",
12+
Subcommands: []cli.Command{
13+
runCommand,
14+
},
15+
}

cmd/kperf/commands/replay/run.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
package replay
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"os"
11+
"path/filepath"
12+
13+
"github.com/Azure/kperf/api/types"
14+
"github.com/Azure/kperf/cmd/kperf/commands/utils"
15+
"github.com/Azure/kperf/metrics"
16+
"github.com/Azure/kperf/replay"
17+
18+
"github.com/urfave/cli"
19+
)
20+
21+
var runCommand = cli.Command{
22+
Name: "run",
23+
Usage: "Run a replay test from a profile (local mode)",
24+
Flags: []cli.Flag{
25+
cli.StringFlag{
26+
Name: "kubeconfig",
27+
Usage: "Path to the kubeconfig file",
28+
Value: utils.DefaultKubeConfigPath,
29+
},
30+
cli.StringFlag{
31+
Name: "config",
32+
Usage: "Path to the replay profile file (YAML, supports .yaml.gz for gzip compressed)",
33+
Required: true,
34+
},
35+
cli.StringFlag{
36+
Name: "result",
37+
Usage: "Path to the file which stores results (JSON)",
38+
},
39+
cli.BoolFlag{
40+
Name: "raw-data",
41+
Usage: "Include raw latency data in result",
42+
},
43+
},
44+
Action: func(cliCtx *cli.Context) error {
45+
kubeCfgPath := cliCtx.String("kubeconfig")
46+
configPath := cliCtx.String("config")
47+
48+
// Load the replay profile
49+
profile, err := replay.LoadProfile(context.Background(), configPath)
50+
if err != nil {
51+
return fmt.Errorf("failed to load profile: %w", err)
52+
}
53+
54+
fmt.Printf("Loaded replay profile: %s\n", profile.Description)
55+
fmt.Printf(" Total requests: %d\n", len(profile.Requests))
56+
fmt.Printf(" Duration: %dms\n", profile.Duration())
57+
fmt.Printf(" Runner count: %d\n", profile.Spec.RunnerCount)
58+
59+
// Run the replay
60+
result, err := replay.Schedule(context.Background(), kubeCfgPath, profile)
61+
if err != nil {
62+
return fmt.Errorf("failed to run replay: %w", err)
63+
}
64+
65+
// Print summary to stdout
66+
fmt.Printf("\nReplay completed:\n")
67+
fmt.Printf(" Total requests: %d\n", result.TotalRequests)
68+
fmt.Printf(" Requests run: %d\n", result.TotalRun)
69+
fmt.Printf(" Requests failed: %d\n", result.TotalFailed)
70+
fmt.Printf(" Duration: %s\n", result.Duration)
71+
fmt.Printf(" Bytes received: %d\n", result.Aggregated.TotalReceivedBytes)
72+
73+
// Write result to file or stdout
74+
var f *os.File = os.Stdout
75+
outputFilePath := cliCtx.String("result")
76+
if outputFilePath != "" {
77+
outputFileDir := filepath.Dir(outputFilePath)
78+
79+
if _, err := os.Stat(outputFileDir); os.IsNotExist(err) {
80+
if err := os.MkdirAll(outputFileDir, 0750); err != nil {
81+
return fmt.Errorf("failed to create output directory %s: %w", outputFileDir, err)
82+
}
83+
}
84+
85+
f, err = os.Create(outputFilePath)
86+
if err != nil {
87+
return fmt.Errorf("failed to create result file: %w", err)
88+
}
89+
defer f.Close()
90+
}
91+
92+
rawDataFlagIncluded := cliCtx.Bool("raw-data")
93+
94+
// Build report
95+
report := buildReplayReport(result, rawDataFlagIncluded)
96+
97+
// Write JSON
98+
encoder := json.NewEncoder(f)
99+
encoder.SetIndent("", " ")
100+
if err := encoder.Encode(report); err != nil {
101+
return fmt.Errorf("failed to encode result: %w", err)
102+
}
103+
104+
return nil
105+
},
106+
}
107+
108+
// ReplayReport is the output format for replay results.
109+
type ReplayReport struct {
110+
types.RunnerMetricReport
111+
// RunnerCount is the number of runners used.
112+
RunnerCount int `json:"runnerCount"`
113+
// TotalRun is the number of requests actually executed.
114+
TotalRun int `json:"totalRun"`
115+
// TotalFailed is the number of requests that failed.
116+
TotalFailed int `json:"totalFailed"`
117+
}
118+
119+
// buildReplayReport builds a ReplayReport from ScheduleResult.
120+
func buildReplayReport(result *replay.ScheduleResult, includeRawData bool) ReplayReport {
121+
report := ReplayReport{
122+
RunnerMetricReport: types.RunnerMetricReport{
123+
Total: result.TotalRequests,
124+
ErrorStats: metrics.BuildErrorStatsGroupByType(result.Aggregated.Errors),
125+
Duration: result.Duration.String(),
126+
TotalReceivedBytes: result.Aggregated.TotalReceivedBytes,
127+
PercentileLatenciesByURL: map[string][][2]float64{},
128+
},
129+
RunnerCount: len(result.RunnerResults),
130+
TotalRun: result.TotalRun,
131+
TotalFailed: result.TotalFailed,
132+
}
133+
134+
metrics.BuildPercentileLatenciesReport(&report.RunnerMetricReport, result.Aggregated.LatenciesByURL, includeRawData, result.Aggregated.Errors)
135+
136+
return report
137+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
package replay
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/Azure/kperf/api/types"
11+
"github.com/Azure/kperf/replay"
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestBuildReplayReport(t *testing.T) {
16+
result := &replay.ScheduleResult{
17+
RunnerResults: []*replay.RunnerResult{
18+
{Total: 50, RequestsRun: 48, RequestsFailed: 2, Duration: time.Second},
19+
{Total: 50, RequestsRun: 50, RequestsFailed: 0, Duration: time.Second},
20+
},
21+
Duration: 2 * time.Second,
22+
TotalRequests: 100,
23+
TotalRun: 98,
24+
TotalFailed: 2,
25+
Aggregated: types.ResponseStats{
26+
Errors: []types.ResponseError{
27+
{Method: "GET", URL: "/api/v1/pods/p1", Type: types.ResponseErrorTypeHTTP, Code: 500},
28+
},
29+
LatenciesByURL: map[string][]float64{
30+
"GET /api/v1/pods/:name": {0.1, 0.2, 0.3, 0.4},
31+
"LIST /api/v1/namespaces": {0.5, 0.6},
32+
},
33+
TotalReceivedBytes: 5000,
34+
},
35+
}
36+
37+
report := buildReplayReport(result, false)
38+
39+
assert.Equal(t, 100, report.Total)
40+
assert.Equal(t, 2, report.RunnerCount)
41+
assert.Equal(t, 98, report.TotalRun)
42+
assert.Equal(t, 2, report.TotalFailed)
43+
assert.Equal(t, int64(5000), report.TotalReceivedBytes)
44+
assert.NotEmpty(t, report.PercentileLatencies)
45+
assert.Equal(t, 2, len(report.PercentileLatenciesByURL))
46+
47+
// Raw data should not be included when includeRawData is false
48+
assert.Nil(t, report.LatenciesByURL)
49+
assert.Nil(t, report.Errors)
50+
}
51+
52+
func TestBuildReplayReportWithRawData(t *testing.T) {
53+
result := &replay.ScheduleResult{
54+
RunnerResults: []*replay.RunnerResult{},
55+
Duration: time.Second,
56+
Aggregated: types.ResponseStats{
57+
Errors: []types.ResponseError{
58+
{Method: "GET", URL: "/api/v1/pods/p1", Type: types.ResponseErrorTypeHTTP, Code: 404},
59+
},
60+
LatenciesByURL: map[string][]float64{
61+
"GET /api/v1/pods/:name": {0.1},
62+
},
63+
TotalReceivedBytes: 100,
64+
},
65+
}
66+
67+
report := buildReplayReport(result, true)
68+
69+
assert.NotNil(t, report.LatenciesByURL)
70+
assert.NotNil(t, report.Errors)
71+
}

cmd/kperf/commands/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"strconv"
1111

12+
"github.com/Azure/kperf/cmd/kperf/commands/replay"
1213
"github.com/Azure/kperf/cmd/kperf/commands/runner"
1314
"github.com/Azure/kperf/cmd/kperf/commands/runnergroup"
1415
"github.com/Azure/kperf/cmd/kperf/commands/virtualcluster"
@@ -26,6 +27,7 @@ func App() *cli.App {
2627
runner.Command,
2728
runnergroup.Command,
2829
virtualcluster.Command,
30+
replay.Command,
2931
},
3032
Flags: []cli.Flag{
3133
cli.StringFlag{

cmd/kperf/commands/runner/runner.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/Azure/kperf/api/types"
1616
"github.com/Azure/kperf/cmd/kperf/commands/utils"
1717
"github.com/Azure/kperf/metrics"
18+
"github.com/Azure/kperf/replay"
1819
"github.com/Azure/kperf/request"
1920
"k8s.io/client-go/rest"
2021
"k8s.io/klog/v2"
@@ -29,6 +30,7 @@ var Command = cli.Command{
2930
Usage: "Setup benchmark to kube-apiserver from one endpoint",
3031
Subcommands: []cli.Command{
3132
runCommand,
33+
replayCommand,
3234
},
3335
}
3436

@@ -332,3 +334,107 @@ func buildRunnerMetricReport(stats *request.Result, includeRawData bool) types.R
332334

333335
return output
334336
}
337+
338+
var replayCommand = cli.Command{
339+
Name: "replay",
340+
Usage: "Run a replay test (for distributed mode - used by runner pods)",
341+
Flags: []cli.Flag{
342+
cli.StringFlag{
343+
Name: "kubeconfig",
344+
Usage: "Path to the kubeconfig file",
345+
Value: utils.DefaultKubeConfigPath,
346+
},
347+
cli.StringFlag{
348+
Name: "config",
349+
Usage: "Path or URL to the replay profile (supports .yaml.gz)",
350+
Required: true,
351+
},
352+
cli.IntFlag{
353+
Name: "runner-index",
354+
Usage: "Runner index (0-based). If not set, reads from JOB_COMPLETION_INDEX env var",
355+
Value: -1,
356+
},
357+
cli.StringFlag{
358+
Name: "result",
359+
Usage: "Path to the file which stores results",
360+
},
361+
cli.BoolFlag{
362+
Name: "raw-data",
363+
Usage: "Include raw latency data in result",
364+
},
365+
},
366+
Action: func(cliCtx *cli.Context) error {
367+
kubeCfgPath := cliCtx.String("kubeconfig")
368+
configPath := cliCtx.String("config")
369+
370+
// Load the replay profile
371+
profile, err := replay.LoadProfile(context.Background(), configPath)
372+
if err != nil {
373+
return fmt.Errorf("failed to load replay profile: %w", err)
374+
}
375+
376+
// Determine runner index
377+
runnerIndex := cliCtx.Int("runner-index")
378+
if runnerIndex < 0 {
379+
runnerIndex = replay.GetRunnerIndex(0)
380+
}
381+
382+
klog.V(2).InfoS("Starting replay runner",
383+
"runnerIndex", runnerIndex,
384+
"runnerCount", profile.Spec.RunnerCount,
385+
"totalRequests", len(profile.Requests),
386+
)
387+
388+
// Run the single runner
389+
result, err := replay.ScheduleSingleRunner(context.Background(), kubeCfgPath, profile, runnerIndex)
390+
if err != nil {
391+
return fmt.Errorf("failed to run replay: %w", err)
392+
}
393+
394+
// Write result
395+
var f *os.File = os.Stdout
396+
outputFilePath := cliCtx.String("result")
397+
if outputFilePath != "" {
398+
outputFileDir := filepath.Dir(outputFilePath)
399+
if _, err := os.Stat(outputFileDir); os.IsNotExist(err) {
400+
if err := os.MkdirAll(outputFileDir, 0750); err != nil {
401+
return fmt.Errorf("failed to create output directory: %w", err)
402+
}
403+
}
404+
405+
f, err = os.Create(outputFilePath)
406+
if err != nil {
407+
return fmt.Errorf("failed to create result file: %w", err)
408+
}
409+
defer f.Close()
410+
}
411+
412+
rawDataFlagIncluded := cliCtx.Bool("raw-data")
413+
414+
// Build report using existing metrics infrastructure
415+
report := buildReplayRunnerReport(result, rawDataFlagIncluded, runnerIndex)
416+
417+
encoder := json.NewEncoder(f)
418+
encoder.SetIndent("", " ")
419+
if err := encoder.Encode(report); err != nil {
420+
return fmt.Errorf("failed to encode result: %w", err)
421+
}
422+
423+
return nil
424+
},
425+
}
426+
427+
// buildReplayRunnerReport builds a RunnerMetricReport from replay.RunnerResult.
428+
func buildReplayRunnerReport(result *replay.RunnerResult, includeRawData bool, runnerIndex int) types.RunnerMetricReport {
429+
report := types.RunnerMetricReport{
430+
Total: result.Total,
431+
Duration: result.Duration.String(),
432+
ErrorStats: metrics.BuildErrorStatsGroupByType(result.ResponseStats.Errors),
433+
TotalReceivedBytes: result.ResponseStats.TotalReceivedBytes,
434+
PercentileLatenciesByURL: map[string][][2]float64{},
435+
}
436+
437+
metrics.BuildPercentileLatenciesReport(&report, result.ResponseStats.LatenciesByURL, includeRawData, result.ResponseStats.Errors)
438+
439+
return report
440+
}

0 commit comments

Comments
 (0)