Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"path"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/google/uuid"
"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/fs"
"github.com/juicedata/juicefs/pkg/meta"
Expand Down Expand Up @@ -401,6 +403,74 @@ func (j *juiceFS) Readlink(name string) (string, error) {
return string(target), toError(err)
}

func (j *juiceFS) Limits() object.Limits {
return object.Limits{
IsSupportMultipartUpload: true,
IsSupportUploadPartCopy: false,
MinPartSize: 4 << 30,
MaxPartSize: 4 << 30,
MaxPartCount: 10000000,
}
}
Comment thread
zhijian-pro marked this conversation as resolved.
Comment on lines +406 to +414

func (j *juiceFS) CreateMultipartUpload(_ context.Context, key string) (*object.MultipartUpload, error) {
return &object.MultipartUpload{
MinPartSize: j.Limits().MinPartSize,
MaxCount: j.Limits().MaxPartCount,
UploadID: uuid.NewString(),
}, nil
}
Comment thread
zhijian-pro marked this conversation as resolved.
Comment thread
Copilot marked this conversation as resolved.

func (j *juiceFS) partDir(key, uploadID string) string {
name := path.Base(key)
if len(name) > 200 {
name = name[:200]
}
return fmt.Sprintf("%s/.jfs-part-%s-%s/", path.Dir(key), name, uploadID)
}
Comment thread
zhijian-pro marked this conversation as resolved.
Comment on lines +427 to +433

func (j *juiceFS) UploadPartStream(key string, uploadID string, num int, in io.Reader) (*object.Part, error) {
err := j.Put(context.Background(), path.Join(j.partDir(key, uploadID), strconv.Itoa(num)), in)
return &object.Part{
Num: num,
}, err
Comment on lines +435 to +439

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

其他地方实际不使用jfs

}

func (j *juiceFS) AbortUpload(rCtx context.Context, key string, uploadID string) {
ctx := meta.WrapWithoutCancel(rCtx, pid, uid, []uint32{gid})
_ = j.jfs.Rmr(ctx, j.path(j.partDir(key, uploadID)), true, meta.RmrDefaultThreads)
}

func (j *juiceFS) CompleteUpload(rCtx context.Context, key string, uploadID string, parts []*object.Part) (err error) {
Comment thread
zhijian-pro marked this conversation as resolved.
ctx := meta.WrapWithoutCancel(rCtx, pid, uid, []uint32{gid})
tmp := j.path(path.Join(j.partDir(key, uploadID), "complete"))
f, eno := j.jfs.Create(ctx, tmp, 0666, j.umask)
// CompleteUpload needs to implement idempotence
if errors.Is(eno, syscall.EEXIST) {
_ = j.jfs.Delete(ctx, tmp)
f, eno = j.jfs.Create(ctx, tmp, 0666, j.umask)
}
if eno != 0 {
return toError(eno)
}
Comment thread
zhijian-pro marked this conversation as resolved.
defer f.Close(ctx)
var total uint64
for _, part := range parts {
p := j.path(path.Join(j.partDir(key, uploadID), strconv.Itoa(part.Num)))
copied, eno := j.jfs.CopyFileRange(ctx, p, 0, tmp, total, uint64(j.Limits().MaxPartSize))
if eno != 0 {
return toError(eno)
}
total += copied
}
Comment thread
zhijian-pro marked this conversation as resolved.

eno = j.jfs.Rename(ctx, tmp, j.path(key), 0)
if eno == 0 {
_ = j.jfs.Rmr(ctx, j.path(j.partDir(key, uploadID)), true, meta.RmrDefaultThreads)
}
return toError(eno)
}

func getDefaultChunkConf(format *meta.Format) *chunk.Config {
chunkConf := &chunk.Config{
BlockSize: format.BlockSize * 1024,
Expand Down
Loading