Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations.

### Added
- [#256](https://github.com/thanos-io/objstore/pull/256) Add `WithStartAfter` `IterOption` for listing objects after a given key. Supported by S3, GCS, BOS, COS, OSS, OBS, OCI, and Swift providers.
- [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support.
- [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class.
- [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials.
Expand Down
5 changes: 4 additions & 1 deletion inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (b *InMemBucket) genericIter(_ context.Context, dir string, f func(string,
})

for _, k := range keys {
if params.StartAfter != "" && k <= params.StartAfter {
continue
}
var modifiedTS time.Time
if params.LastModified {
modifiedTS = lastModified[k]
Expand All @@ -145,7 +148,7 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error,
}

func (i *InMemBucket) SupportedIterOptions() []IterOptionType {
return []IterOptionType{Recursive, UpdatedAt}
return []IterOptionType{Recursive, UpdatedAt, StartAfter}
}

func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
Expand Down
61 changes: 61 additions & 0 deletions inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,67 @@ import (
"github.com/efficientgo/core/testutil"
)

func TestInMem_StartAfter(t *testing.T) {
ctx := context.Background()
b := NewInMemBucket()

// Upload objects with known keys.
testutil.Ok(t, b.Upload(ctx, "a/file1.txt", strings.NewReader("data1")))
testutil.Ok(t, b.Upload(ctx, "b/file2.txt", strings.NewReader("data2")))
testutil.Ok(t, b.Upload(ctx, "c/file3.txt", strings.NewReader("data3")))
testutil.Ok(t, b.Upload(ctx, "d/file4.txt", strings.NewReader("data4")))

t.Run("recursive", func(t *testing.T) {
var items []string
testutil.Ok(t, b.Iter(ctx, "", func(name string) error {
items = append(items, name)
return nil
}, WithRecursiveIter(), WithStartAfter("b/file2.txt")))

testutil.Equals(t, []string{"c/file3.txt", "d/file4.txt"}, items)
})

t.Run("non-recursive", func(t *testing.T) {
var items []string
testutil.Ok(t, b.Iter(ctx, "", func(name string) error {
items = append(items, name)
return nil
}, WithStartAfter("b/")))

testutil.Equals(t, []string{"c/", "d/"}, items)
})

t.Run("start_after_last_key", func(t *testing.T) {
var items []string
testutil.Ok(t, b.Iter(ctx, "", func(name string) error {
items = append(items, name)
return nil
}, WithRecursiveIter(), WithStartAfter("d/file4.txt")))

testutil.Equals(t, 0, len(items))
})

t.Run("start_after_empty_string", func(t *testing.T) {
var items []string
testutil.Ok(t, b.Iter(ctx, "", func(name string) error {
items = append(items, name)
return nil
}, WithRecursiveIter(), WithStartAfter("")))

testutil.Equals(t, []string{"a/file1.txt", "b/file2.txt", "c/file3.txt", "d/file4.txt"}, items)
})
}

func TestInMem_StartAfter_UnsupportedProvider(t *testing.T) {
// Validate that passing StartAfter to a provider that doesn't support it returns an error.
err := ValidateIterOptions(
[]IterOptionType{Recursive},
WithStartAfter("foo"),
)
testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), "iter option is not supported"), "expected ErrOptionNotSupported")
}

func TestInMem_ReturnsModifiedInIterAttributes(t *testing.T) {
b := NewInMemBucket()
testutil.Ok(t, b.Upload(context.Background(), "test/file1.txt", strings.NewReader("test-data1")))
Expand Down
15 changes: 15 additions & 0 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type IterOptionType int
const (
Recursive IterOptionType = iota
UpdatedAt
StartAfter
)

// IterOption configures the provided params.
Expand Down Expand Up @@ -176,6 +177,20 @@ func WithUpdatedAt() IterOption {
type IterParams struct {
Recursive bool
LastModified bool
StartAfter string
}

// WithStartAfter is an option that can be applied to Iter() to only list objects
// with keys lexicographically after the specified key.
// Supported by: s3, gcs, bos, cos, oss, obs, oci, swift.
// Note: GCS (StartOffset) and OCI (Start) use inclusive (>=) semantics; all others are exclusive (>).
func WithStartAfter(startAfter string) IterOption {
return IterOption{
Type: StartAfter,
Apply: func(params *IterParams) {
params.StartAfter = startAfter
},
}
}

func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error {
Expand Down
4 changes: 2 additions & 2 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMetricBucket_Close(t *testing.T) {
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.metrics.opsDuration))

AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr))
testutil.Equals(t, float64(9), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(11), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpGetRange)))
Expand All @@ -52,7 +52,7 @@ func TestMetricBucket_Close(t *testing.T) {
// Clear bucket, but don't clear metrics to ensure we use same.
bkt.bkt = NewInMemBucket()
AcceptanceTest(t, bkt)
testutil.Equals(t, float64(18), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(22), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.metrics.ops.WithLabelValues(OpGetRange)))
Expand Down
22 changes: 20 additions & 2 deletions prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er

return p.bkt.Iter(ctx, pdir, func(s string) error {
return f(strings.TrimPrefix(s, p.prefix+DirDelim))
}, options...)
}, p.prefixIterOptions(options)...)
}

func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
Expand All @@ -62,7 +62,25 @@ func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f f
return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error {
attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim)
return f(attrs)
}, options...)
}, p.prefixIterOptions(options)...)
}

// prefixIterOptions adjusts any StartAfter option to include the bucket prefix.
func (p *PrefixedBucket) prefixIterOptions(options []IterOption) []IterOption {
params := ApplyIterOptions(options...)
if params.StartAfter == "" {
return options
}

adjusted := make([]IterOption, 0, len(options))
for _, opt := range options {
if opt.Type == StartAfter {
adjusted = append(adjusted, WithStartAfter(withPrefix(p.prefix, params.StartAfter)))
} else {
adjusted = append(adjusted, opt)
}
}
return adjusted
}

func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType {
Expand Down
10 changes: 5 additions & 5 deletions providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader, opts ...obj
}

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt, objstore.StartAfter}
}

func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
Expand All @@ -199,7 +199,7 @@ func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attr
delimiter = ""
}

var marker string
marker := params.StartAfter
for {
if err := ctx.Err(); err != nil {
return err
Expand Down Expand Up @@ -249,12 +249,12 @@ func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attr
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
// Only include recursive option since attributes are not used in this method.
// Only include supported options since attributes are not used in this method.
var filteredOpts []objstore.IterOption
for _, opt := range opts {
if opt.Type == objstore.Recursive {
switch opt.Type {
case objstore.Recursive, objstore.StartAfter:
filteredOpts = append(filteredOpts, opt)
break
}
}

Expand Down
7 changes: 4 additions & 3 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (b *Bucket) Delete(ctx context.Context, name string) error {
}

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive}
return []objstore.IterOptionType{objstore.Recursive, objstore.StartAfter}
}

// Iter calls f for each entry in the given directory. The argument to f is the full
Expand Down Expand Up @@ -416,14 +416,15 @@ func (b *Bucket) listObjects(ctx context.Context, objectPrefix string, options .
objectsCh := make(chan objectInfo, 1)

// If recursive iteration is enabled we should pass an empty delimiter.
params := objstore.ApplyIterOptions(options...)
delimiter := dirDelim
if objstore.ApplyIterOptions(options...).Recursive {
if params.Recursive {
delimiter = ""
}

go func(objectsCh chan<- objectInfo) {
defer close(objectsCh)
var marker string
marker := params.StartAfter
for {
result, _, err := b.client.Bucket.Get(ctx, &cos.BucketGetOptions{
Prefix: objectPrefix,
Expand Down
13 changes: 7 additions & 6 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (b *Bucket) Name() string {
}

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt, objstore.StartAfter}
}

func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
Expand All @@ -217,8 +217,9 @@ func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attr
}

query := &storage.Query{
Prefix: dir,
Delimiter: delimiter,
Prefix: dir,
Delimiter: delimiter,
StartOffset: appliedOpts.StartAfter,
}
if appliedOpts.LastModified {
if err := query.SetAttrSelection([]string{"Name", "Updated"}); err != nil {
Expand Down Expand Up @@ -257,12 +258,12 @@ func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attr
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
// Only include recursive option since attributes are not used in this method.
// Only include supported options since attributes are not used in this method.
var filteredOpts []objstore.IterOption
for _, opt := range opts {
if opt.Type == objstore.Recursive {
switch opt.Type {
case objstore.Recursive, objstore.StartAfter:
filteredOpts = append(filteredOpts, opt)
break
}
}

Expand Down
6 changes: 4 additions & 2 deletions providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (b *Bucket) multipartUpload(size int64, key, uploadId string, body io.Reade
func (b *Bucket) Close() error { return nil }

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive}
return []objstore.IterOptionType{objstore.Recursive, objstore.StartAfter}
}

// Iter calls f for each entry in the given directory (not recursive.)
Expand All @@ -255,11 +255,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}

params := objstore.ApplyIterOptions(options...)
input := &obs.ListObjectsInput{}
input.Bucket = b.name
input.Prefix = dir
input.Delimiter = DirDelim
if objstore.ApplyIterOptions(options...).Recursive {
input.Marker = params.StartAfter
if params.Recursive {
input.Delimiter = ""
}
for {
Expand Down
19 changes: 16 additions & 3 deletions providers/oci/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,34 @@ func getObject(ctx context.Context, bkt Bucket, objectName string, byteRange str
}

func listAllObjects(ctx context.Context, bkt Bucket, prefix string, options ...objstore.IterOption) (objectNames []string, err error) {
params := objstore.ApplyIterOptions(options...)

var allObjectNames []string
var nextStartWith *string = nil
var nextStartWith *string
if params.StartAfter != "" {
nextStartWith = &params.StartAfter
}
init := true

// Filter out StartAfter so recursive subdirectory calls don't reapply it.
var subOpts []objstore.IterOption
for _, opt := range options {
if opt.Type != objstore.StartAfter {
subOpts = append(subOpts, opt)
}
}

for init || nextStartWith != nil {
init = false
objectNames, nextStartWith, err = listObjects(ctx, bkt, prefix, nextStartWith)
if err != nil {
return nil, err
}

if objstore.ApplyIterOptions(options...).Recursive {
if params.Recursive {
for _, objectName := range objectNames {
if strings.HasSuffix(objectName, DirDelim) {
subObjectNames, err := listAllObjects(ctx, bkt, objectName, options...)
subObjectNames, err := listAllObjects(ctx, bkt, objectName, subOpts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (b *Bucket) Name() string {
}

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive}
return []objstore.IterOptionType{objstore.Recursive, objstore.StartAfter}
}

// Iter calls f for each entry in the given directory. The argument to f is the full
Expand Down
7 changes: 4 additions & 3 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func validate(config Config) error {
}

func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive}
return []objstore.IterOptionType{objstore.Recursive, objstore.StartAfter}
}

// Iter calls f for each entry in the given directory. The argument to f is the full
Expand All @@ -230,12 +230,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
}

params := objstore.ApplyIterOptions(options...)
delimiter := alioss.Delimiter(objstore.DirDelim)
if objstore.ApplyIterOptions(options...).Recursive {
if params.Recursive {
delimiter = nil
}

marker := alioss.Marker("")
marker := alioss.Marker(params.StartAfter)
for {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context closed while iterating bucket")
Expand Down
Loading
Loading