Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type engine interface {
doFindDetachedNodes(t time.Time) []Ino
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno

doScanSustainedInodes(ctx Context, fn func(uid, gid uint32, length uint64) error) error

doGetQuota(ctx Context, qtype uint32, key uint64) (*Quota, error)
// set quota, return true if there is no quota exists before
doSetQuota(ctx Context, qtype uint32, key uint64, quota *Quota) (created bool, err error)
Expand Down
11 changes: 10 additions & 1 deletion pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5546,7 +5546,6 @@ func testHardlinkQuota(t *testing.T, m Meta, ctx Context, parent Ino, uid, gid u
func testSustainedInodeQuotaDecrement(t *testing.T, m Meta, ctx Context, parent Ino, uid, gid uint32) {
format := m.getBase().getFormat()
format.UserGroupQuota = true

uidKey := fmt.Sprintf("%d", uid)
gidKey := fmt.Sprintf("%d", gid)
if err := m.HandleQuota(ctx, QuotaSet, uidKey, UserQuotaType, map[string]*Quota{uidKey: {MaxSpace: 100 << 20, MaxInodes: 100}}, false, false, false); err != nil {
Expand Down Expand Up @@ -5592,6 +5591,16 @@ func testSustainedInodeQuotaDecrement(t *testing.T, m Meta, ctx Context, parent
if st := m.Unlink(ctx, parent, "sustained_inode_quota_file", true); st != 0 {
t.Fatalf("Unlink sustained file: %s", st)
}

uidAfterUnlink := getUsedInodes(UserQuotaType, uidKey)
gidAfterUnlink := getUsedInodes(GroupQuotaType, gidKey)
if uidAfterUnlink != uidBefore+1 {
t.Fatalf("user quota inode should remain increased after unlink (sustained): before=%d after_unlink=%d", uidBefore, uidAfterUnlink)
}
Comment on lines +5599 to +5603

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot add test to cover this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added testSustainedInodeBeforeQuotaEnabled in commit 5ee0acb. The test:

  1. Ensures UserGroupQuota is disabled, then creates+opens+unlinks a file (creating a sustained inode while quota tracking is off)
  2. Calls HandleQuota(QuotaSet) to enable quota for the first time — this triggers ScanUserGroupUsage, which now includes the sustained inode
  3. Asserts UsedInodes > 0 after the scan (the sustained inode is counted)
  4. Calls Close on the inode and asserts UsedInodes >= 0 (no negative — the regression from User/group quota usage can become negative when quota is enabled after sustained inodes already exist #7079)

if gidAfterUnlink != gidBefore+1 {
t.Fatalf("group quota inode should remain increased after unlink (sustained): before=%d after_unlink=%d", gidBefore, gidAfterUnlink)
}

if st := m.Close(ctx, inode); st != 0 {
t.Fatalf("Close sustained file: %s", st)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/meta/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,25 @@ func (m *baseMeta) scanGlobalUserGroupUsage(ctx Context) (map[uint64]*Summary, m

}
}

if err := m.en.doScanSustainedInodes(ctx, func(uid, gid uint32, length uint64) error {
u, g := uint64(uid), uint64(gid)
if userUsage[u] == nil {
userUsage[u] = &Summary{}
}
if groupUsage[g] == nil {
groupUsage[g] = &Summary{}
}
space := align4K(length)
userUsage[u].Size += uint64(space)
userUsage[u].Files++
groupUsage[g].Size += uint64(space)
groupUsage[g].Files++
return nil
}); err != nil {
logger.Warnf("scan sustained inodes for user/group quota: %v", err)
}

return userUsage, groupUsage, nil
}

Expand Down
26 changes: 18 additions & 8 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,15 +842,28 @@ func (m *redisMeta) doSyncVolumeStat(ctx Context, used, inodes int64) error {
if m.conf.ReadOnly {
return syscall.EROFS
}
if err := m.doScanSustainedInodes(ctx, func(uid, gid uint32, length uint64) error {
used += align4K(length)
inodes++
return nil
}); err != nil {
return err
}
logger.Debugf("Used space: %s, inodes: %d", humanize.IBytes(uint64(used)), inodes)
if err := m.rdb.Set(ctx, m.totalInodesKey(), strconv.FormatInt(inodes, 10), 0).Err(); err != nil {
return fmt.Errorf("set total inodes: %s", err)
}
return m.rdb.Set(ctx, m.usedSpaceKey(), strconv.FormatInt(used, 10), 0).Err()
}

func (m *redisMeta) doScanSustainedInodes(ctx Context, fn func(uid, gid uint32, length uint64) error) error {
var inoKeys []string
if err := m.scan(ctx, "session*", func(keys []string) error {
for i := 0; i < len(keys); i += 1 {
key := keys[i]
if key == "sessions" {
continue
}

inodes, err := m.rdb.SMembers(ctx, key).Result()
if err != nil {
logger.Warnf("SMembers %s: %s", key, err)
Expand Down Expand Up @@ -884,16 +897,13 @@ func (m *redisMeta) doSyncVolumeStat(ctx Context, used, inodes int64) error {
for _, v := range values {
if v != nil {
m.parseAttr([]byte(v.(string)), &attr)
used += align4K(attr.Length)
inodes += 1
if err := fn(attr.Uid, attr.Gid, attr.Length); err != nil {
return err
}
}
}
}
logger.Debugf("Used space: %s, inodes: %d", humanize.IBytes(uint64(used)), inodes)
if err := m.rdb.Set(ctx, m.totalInodesKey(), strconv.FormatInt(inodes, 10), 0).Err(); err != nil {
return fmt.Errorf("set total inodes: %s", err)
}
return m.rdb.Set(ctx, m.usedSpaceKey(), strconv.FormatInt(used, 10), 0).Err()
return nil
}

// redisMeta updates the usage in each transaction
Expand Down
51 changes: 35 additions & 16 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ func (m *dbMeta) sqlConv(sql string) string {
}

func (m *dbMeta) initStatement() {
m.statement["SELECT length FROM node WHERE inode IN (SELECT inode FROM sustained)"] =
fmt.Sprintf("SELECT length FROM %snode WHERE inode IN (SELECT inode FROM %ssustained)", m.tablePrefix, m.tablePrefix)
m.statement["SELECT n.uid, n.gid, n.length FROM node n WHERE n.inode IN (SELECT inode FROM sustained)"] =
fmt.Sprintf("SELECT n.uid, n.gid, n.length FROM %snode n WHERE n.inode IN (SELECT inode FROM %ssustained)", m.tablePrefix, m.tablePrefix)
m.statement["update counter set value=value + ? where name='totalInodes'"] =
fmt.Sprintf("update %scounter set value=value + ? where name='totalInodes'", m.tablePrefix)
m.statement["update counter set value= value + ? where name='usedSpace'"] =
Expand Down Expand Up @@ -1417,20 +1417,9 @@ func (m *dbMeta) doSyncVolumeStat(ctx Context, used, inodes int64) error {
if m.conf.ReadOnly {
return syscall.EROFS
}
if err := m.simpleTxn(ctx, func(s *xorm.Session) error {
queryResultMap, err := s.QueryString(m.sqlConv("SELECT length FROM node WHERE inode IN (SELECT inode FROM sustained)"))
if err != nil {
return err
}
for _, v := range queryResultMap {
value, err := strconv.ParseInt(v["length"], 10, 64)
if err != nil {
logger.Warnf("parse sustained length: %s err: %s", v["length"], err)
continue
}
used += align4K(uint64(value))
inodes += 1
}
if err := m.doScanSustainedInodes(ctx, func(uid, gid uint32, length uint64) error {
used += align4K(length)
inodes++
return nil
}); err != nil {
return err
Expand Down Expand Up @@ -3268,6 +3257,36 @@ func (m *dbMeta) doRefreshSession() error {
})
}

func (m *dbMeta) doScanSustainedInodes(ctx Context, fn func(uid, gid uint32, length uint64) error) error {
return m.simpleTxn(ctx, func(s *xorm.Session) error {
rows, err := s.QueryString(m.sqlConv("SELECT n.uid, n.gid, n.length FROM node n WHERE n.inode IN (SELECT inode FROM sustained)"))
if err != nil {
return err
}
for _, row := range rows {
uid, err := strconv.ParseUint(row["uid"], 10, 32)
if err != nil {
logger.Warnf("parse sustained uid: %v", err)
continue
}
gid, err := strconv.ParseUint(row["gid"], 10, 32)
if err != nil {
logger.Warnf("parse sustained gid: %v", err)
continue
}
length, err := strconv.ParseUint(row["length"], 10, 64)
if err != nil {
logger.Warnf("parse sustained length: %v", err)
continue
}
if err := fn(uint32(uid), uint32(gid), length); err != nil {
return err
}
}
return nil
})
}

func (m *dbMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
var n = node{Inode: inode}
var newSpace int64
Expand Down
51 changes: 31 additions & 20 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,31 @@ func (m *kvMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry
return 0
}

func (m *kvMeta) doScanSustainedInodes(ctx Context, fn func(uid, gid uint32, length uint64) error) error {
vals, err := m.scanKeys(ctx, m.fmtKey("SS"))
if err != nil {
return err
}
var attr Attr
for _, k := range vals {
b := utils.FromBuffer(k[2:])
if b.Len() != 16 {
logger.Warnf("Invalid sustainedKey: %v", k)
continue
}
_ = b.Get64()
inode := m.decodeInode(b.Get(8))
if eno := m.doGetAttr(ctx, inode, &attr); eno != 0 {
logger.Warnf("Get attr of sustained inode %d: %s", inode, eno)
continue
}
if err := fn(attr.Uid, attr.Gid, attr.Length); err != nil {
return err
}
}
return nil
}

func (m *kvMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
var attr Attr
var newSpace int64
Expand Down Expand Up @@ -3596,29 +3621,15 @@ func (m *kvMeta) doSyncVolumeStat(ctx Context, used, inodes int64) error {
if m.conf.ReadOnly {
return syscall.EROFS
}
// need add sustained file size
vals, err := m.scanKeys(ctx, m.fmtKey("SS"))
if err != nil {
if err := m.doScanSustainedInodes(ctx, func(uid, gid uint32, length uint64) error {
used += align4K(length)
inodes++
return nil
}); err != nil {
return err
}
var attr Attr
for _, k := range vals {
b := utils.FromBuffer(k[2:])
if b.Len() != 16 {
logger.Warnf("Invalid sustainedKey: %v", k)
continue
}
_ = b.Get64()
inode := m.decodeInode(b.Get(8))
if eno := m.doGetAttr(ctx, inode, &attr); eno != 0 {
logger.Warnf("Get attr of inode %d: %s", inode, eno)
continue
}
used += align4K(attr.Length)
inodes += 1
}
logger.Debugf("Used space: %s, inodes: %d", humanize.IBytes(uint64(used)), inodes)
err = m.setValue(m.counterKey(totalInodes), packCounter(inodes))
err := m.setValue(m.counterKey(totalInodes), packCounter(inodes))
if err != nil {
return fmt.Errorf("set total inodes: %w", err)
}
Expand Down
Loading