Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 114 additions & 5 deletions img_tool/cmd/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"sort"
"strings"
"time"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -79,6 +80,10 @@ func DeployProcess(ctx context.Context, args []string) {
flagSet.Usage()
os.Exit(1)
}
if jobs <= 0 {
fmt.Fprintf(os.Stderr, "Error: --jobs must be greater than zero, got %d\n", jobs)
os.Exit(1)
}

if len(requestFiles) == 0 {
fmt.Fprintln(os.Stderr, "Error: at least one --request-file is required")
Expand Down Expand Up @@ -215,10 +220,6 @@ func DeployWithExtras(ctx context.Context, rawRequest []byte, opts DeployOptions
for digest, filePath := range opts.ExplicitLayers {
vfsBuilder = vfsBuilder.WithExplicitLayer(digest, filePath)
}
vfs, err := vfsBuilder.Build()
if err != nil {
return fmt.Errorf("building VFS: %w", err)
}

pushOperations, err := req.PushOperations()
if err != nil {
Expand All @@ -235,6 +236,12 @@ func DeployWithExtras(ctx context.Context, rawRequest []byte, opts DeployOptions
if len(pushOperations) == 0 && len(loadOperations) == 0 && len(registryTagOperations) == 0 {
return fmt.Errorf("no push, load, or registry_tag operations found in deploy manifest")
}
logDeployConfig(req, opts, len(pushOperations), len(loadOperations), len(registryTagOperations))

vfs, err := vfsBuilder.Build()
if err != nil {
return fmt.Errorf("building VFS: %w", err)
}

// check if any operation requires a blob cache endpoint
var blobcacheClient blobcache.BlobsClient
Expand All @@ -257,7 +264,14 @@ func DeployWithExtras(ctx context.Context, rawRequest []byte, opts DeployOptions
var pusher *remote.Pusher
needsPusher := len(pushOperations) > 0 || len(registryTagOperations) > 0
if needsPusher && req.Settings.PushStrategy != "bes" {
pusher, err = remote.NewPusher(registry.WithAuthFromMultiKeychain(), remote.WithJobs(opts.Jobs))
progressUpdates := make(chan registryv1.Update, 1024)
waitForProgressLogger := startPushProgressLogger(progressUpdates)
defer func() {
close(progressUpdates)
waitForProgressLogger()
}()

pusher, err = remote.NewPusher(registry.WithAuthFromMultiKeychain(), remote.WithJobs(opts.Jobs), remote.WithProgress(progressUpdates))
if err != nil {
return fmt.Errorf("creating pusher: %w", err)
}
Expand Down Expand Up @@ -337,6 +351,101 @@ func DeployWithExtras(ctx context.Context, rawRequest []byte, opts DeployOptions
return nil
}

const pushProgressLogInterval = time.Minute

func logDeployConfig(req api.DeployManifest, opts DeployOptions, pushOps int, loadOps int, registryTagOps int) {
fmt.Fprintf(os.Stderr, "deploy config: jobs=%d push_strategy=%s load_strategy=%s push_ops=%d load_ops=%d registry_tag_ops=%d\n", opts.Jobs, req.Settings.PushStrategy, req.Settings.LoadStrategy, pushOps, loadOps, registryTagOps)
if opts.OverrideRegistry != "" {
fmt.Fprintf(os.Stderr, "deploy config: override_registry=%s\n", opts.OverrideRegistry)
}
if opts.OverrideRepository != "" {
fmt.Fprintf(os.Stderr, "deploy config: override_repository=%s\n", opts.OverrideRepository)
}
if len(opts.AdditionalTags) > 0 {
fmt.Fprintf(os.Stderr, "deploy config: additional_tags=%s\n", strings.Join(opts.AdditionalTags, ","))
}
}

func startPushProgressLogger(updates <-chan registryv1.Update) func() {
return startPushProgressLoggerWithInterval(updates, pushProgressLogInterval)
}

func startPushProgressLoggerWithInterval(updates <-chan registryv1.Update, interval time.Duration) func() {
done := make(chan struct{})
go func() {
defer close(done)

ticker := time.NewTicker(interval)
defer ticker.Stop()

var latest registryv1.Update
haveLatest := false
var lastLoggedComplete int64 = -1
var lastLoggedTotal int64 = -1

logLatest := func(final bool) {
if !haveLatest {
return
}
if latest.Complete == lastLoggedComplete && latest.Total == lastLoggedTotal && !final {
return
}
lastLoggedComplete = latest.Complete
lastLoggedTotal = latest.Total

if latest.Total > 0 {
percent := float64(latest.Complete) * 100 / float64(latest.Total)
status := "progress"
if final {
status = "complete"
}
fmt.Fprintf(os.Stderr, "push %s: %s/%s (%.1f%%)\n", status, formatByteCount(latest.Complete), formatByteCount(latest.Total), percent)
return
}
status := "progress"
if final {
status = "complete"
}
fmt.Fprintf(os.Stderr, "push %s: %s transferred\n", status, formatByteCount(latest.Complete))
}

for {
select {
case update, ok := <-updates:
if !ok {
logLatest(true)
return
}
if update.Error != nil {
fmt.Fprintf(os.Stderr, "push progress error: %v\n", update.Error)
continue
}
latest = update
haveLatest = true
case <-ticker.C:
logLatest(false)
}
}
}()
return func() {
<-done
}
}

func formatByteCount(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div := int64(unit)
exp := 0
for n := bytes / unit; n >= unit && exp < len("KMGTPE")-1; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB", float64(bytes)/float64(div), "KMGTPE"[exp])
}

// applyRegistryTagOperations writes the pre-expanded tags from registry_tag
// ops onto manifests already pushed by a preceding push op. Under the `bes`
// strategy the BES syncer is responsible for this, so we no-op.
Expand Down
16 changes: 13 additions & 3 deletions img_tool/pkg/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"slices"
"sort"

Expand All @@ -24,7 +25,7 @@ const defaultJobs = 16
type builder struct {
blobcacheClient blobcache.BlobsClient
vfs vfs
pusher *remote.Pusher
pusher pusher
overrideRegistry string
overrideRepository string
extraTags []string
Expand Down Expand Up @@ -89,7 +90,7 @@ func (b *builder) Build() *uploader {
type uploader struct {
blobcacheClient blobcache.BlobsClient
vfs vfs
pusher *remote.Pusher
pusher pusher
overrideRegistry string
overrideRepository string
extraTags []string
Expand Down Expand Up @@ -144,16 +145,21 @@ func (u *uploader) PushAll(ctx context.Context, ops []api.IndexedPushDeployOpera
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(u.jobs)

fmt.Fprintf(os.Stderr, "push: starting %d refs with jobs=%d\n", len(items), u.jobs)
for _, item := range items {
item := item
g.Go(func() error {
return pusher.Push(ctx, item.ref, item.taggable)
if err := pusher.Push(ctx, item.ref, item.taggable); err != nil {
return fmt.Errorf("pushing %s: %w", item.ref.String(), err)
}
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}
fmt.Fprintf(os.Stderr, "push: completed %d refs\n", len(items))

return allTags, nil
}
Expand Down Expand Up @@ -243,6 +249,10 @@ type vfs interface {
SizeOf(digest registryv1.Hash) (int64, error)
}

type pusher interface {
Push(context.Context, name.Reference, remote.Taggable) error
}

func deduplicateAndSort(tags []string) []string {
if len(tags) == 0 {
return tags
Expand Down
Loading