Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/control/cmd/dmg/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// (C) Copyright 2019-2022 Intel Corporation.
// (C) Copyright 2026 Hewlett Packard Enterprise Development LP
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -154,6 +155,24 @@ func TestStorageCommands(t *testing.T) {
printRequest(t, nvmeAddDeviceReq().WithStorageTierIndex(0)),
nil,
},
{
"Format with replace; no hosts in hostlist",
"storage format --replace",
"",
errors.New("expects a single host"),
},
{
"Format with replace; multiple hosts in hostlist",
"storage format --replace -l foo[1,2].com",
"",
errors.New("expects a single host"),
},
{
"Format with replace and force",
"storage format --replace --force",
"",
errors.New("may not be mixed with --force"),
},
{
"Nonexistent subcommand",
"storage quack",
Expand Down
15 changes: 5 additions & 10 deletions src/control/server/ctl_firmware_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// (C) Copyright 2020-2024 Intel Corporation.
// (C) Copyright 2026 Hewlett Packard Enterprise Development LP
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -15,10 +16,8 @@ import (
"github.com/daos-stack/daos/src/control/common/proto/convert"
ctlpb "github.com/daos-stack/daos/src/control/common/proto/ctl"
"github.com/daos-stack/daos/src/control/common/test"
"github.com/daos-stack/daos/src/control/lib/ranklist"
"github.com/daos-stack/daos/src/control/logging"
"github.com/daos-stack/daos/src/control/server/config"
"github.com/daos-stack/daos/src/control/server/engine"
"github.com/daos-stack/daos/src/control/server/storage"
"github.com/daos-stack/daos/src/control/server/storage/bdev"
"github.com/daos-stack/daos/src/control/server/storage/scm"
Expand Down Expand Up @@ -803,14 +802,10 @@ func TestCtlSvc_FirmwareUpdate(t *testing.T) {
cfg := config.DefaultServer()
cs := mockControlService(t, log, cfg, tc.bmbc, tc.smbc, nil)
for i := 0; i < 2; i++ {
rCfg := new(engine.TestRunnerConfig)
rCfg.Running.Store(tc.enginesRunning)
runner := engine.NewTestRunner(rCfg, engine.MockConfig())
instance := NewEngineInstance(log, nil, nil, runner, nil)
if !tc.noRankEngines {
instance._superblock = &Superblock{}
instance._superblock.ValidRank = true
instance._superblock.Rank = ranklist.NewRankPtr(uint32(i))
instance := NewEngineInstance(log, nil, nil, nil, nil)
setupTestEngine(t, instance, uint32(i), !tc.enginesRunning)
if tc.noRankEngines {
instance._superblock = nil
}
if err := cs.harness.AddInstance(instance); err != nil {
t.Fatal(err)
Expand Down
4 changes: 3 additions & 1 deletion src/control/server/ctl_ranks_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ const (
instanceUpdateDelay = 500 * time.Millisecond
)

// pollValidateFn is the predicate type accepted by pollInstanceState: it is
// handed an Engine and reports whether that engine satisfies the condition
// being waited on.
type pollValidateFn func(Engine) bool

// pollInstanceState waits for either context to be cancelled/timeout or for the
// provided validate function to return true for each of the provided instances.
//
// Returns nil once all instances return true from the validate function, or a
// non-nil error if the context is cancelled or times out before that happens.
func pollInstanceState(ctx context.Context, instances []Engine, validate func(Engine) bool) error {
func pollInstanceState(ctx context.Context, instances []Engine, validate pollValidateFn) error {
ready := make(chan struct{})
go func() {
for {
Expand Down
11 changes: 0 additions & 11 deletions src/control/server/ctl_ranks_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,6 @@ func checkUnorderedRankResults(t *testing.T, expResults, gotResults []*sharedpb.
}
}

// setupTestEngine configures ei for use in a unit test. It stores rank in the
// instance's existing superblock and installs a test runner built from the
// mock engine config.
//
// The optional stopped argument controls the simulated state: unless its first
// element is true, the runner is marked as running and the instance as ready.
// Only the first element of stopped is consulted.
//
// ei._superblock is dereferenced unconditionally, so it must already be set by
// the caller. The t parameter is currently unused.
func setupTestEngine(t *testing.T, ei *EngineInstance, rank uint32, stopped ...bool) {
	ei._superblock.Rank = ranklist.NewRankPtr(rank)

	// Only an explicit leading "true" leaves the engine in the stopped state.
	isStopped := len(stopped) > 0 && stopped[0]

	runnerCfg := new(engine.TestRunnerConfig)
	if !isStopped {
		runnerCfg.Running.SetTrue()
		ei.ready.SetTrue()
	}

	ei.runner = engine.NewTestRunner(runnerCfg, engine.MockConfig())
}

func TestServer_CtlSvc_PrepShutdownRanks(t *testing.T) {
for name, tc := range map[string]struct {
missingSB bool
Expand Down
23 changes: 12 additions & 11 deletions src/control/server/ctl_storage_rpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//
// (C) Copyright 2019-2024 Intel Corporation.
// (C) Copyright 2025 Hewlett Packard Enterprise Development LP
// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
// (C) Copyright 2025 Google LLC
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
Expand Down Expand Up @@ -856,20 +856,20 @@ type formatScmReq struct {
}

func formatScm(ctx context.Context, req formatScmReq, resp *ctlpb.StorageFormatResp) (map[int]string, map[int]bool, error) {
needFormat := make(map[int]bool)
needScmFormat := make(map[int]bool)
emptyTmpfs := make(map[int]bool)
scmCfgs := make(map[int]*storage.TierConfig)
allNeedFormat := true
allNeedScmFormat := true

for idx, ei := range req.instances {
needs, err := ei.GetStorage().ScmNeedsFormat()
if err != nil {
return nil, nil, errors.Wrap(err, "detecting if SCM format is needed")
}
if needs {
needFormat[idx] = true
needScmFormat[idx] = true
} else {
allNeedFormat = false
allNeedScmFormat = false
}

scmCfg, err := ei.GetStorage().GetScmConfig()
Expand All @@ -882,19 +882,20 @@ func formatScm(ctx context.Context, req formatScmReq, resp *ctlpb.StorageFormatR
if scmCfg.Class == storage.ClassRam && !needs {
info, err := ei.GetStorage().GetScmUsage()
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to check SCM usage for instance %d", idx)
return nil, nil, errors.Wrapf(err,
"failed to check SCM usage for instance %d", idx)
}
emptyTmpfs[idx] = info.TotalBytes-info.AvailBytes == 0
}
}

if req.replace && len(needFormat) == 0 {
if req.replace && len(needScmFormat) == 0 {
// Only valid if at least one engine requires format.
return nil, nil, errors.New("format replace option only valid if at " +
"least one engine requires format but no engines need format")
return nil, nil, errors.New("format replace option only valid if at least one " +
"engine requires scm-format but currently no engines need scm-format")
}

if allNeedFormat {
if allNeedScmFormat {
// Check available RAM is sufficient before formatting SCM on engines.
if err := checkTmpfsMem(req.log, scmCfgs, req.getSysMemInfo); err != nil {
return nil, nil, err
Expand All @@ -907,7 +908,7 @@ func formatScm(ctx context.Context, req formatScmReq, resp *ctlpb.StorageFormatR
formatting := 0

for idx, ei := range req.instances {
if needFormat[idx] || req.reformat {
if needScmFormat[idx] || req.reformat {
formatting++
go func(e Engine) {
scmChan <- e.StorageFormatSCM(ctx, req.reformat)
Expand Down
2 changes: 1 addition & 1 deletion src/control/server/ctl_storage_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2360,7 +2360,7 @@ func TestServer_CtlSvc_StorageFormat(t *testing.T) {
},
},
},
expErr: errors.New("only valid if at least one engine requires format"),
expErr: errors.New("only valid if at least one engine requires scm-format"),
expResp: &ctlpb.StorageFormatResp{
Crets: []*ctlpb.NvmeControllerResult{
{
Expand Down
14 changes: 2 additions & 12 deletions src/control/server/ctl_svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/daos-stack/daos/src/control/common/test"
"github.com/daos-stack/daos/src/control/events"
"github.com/daos-stack/daos/src/control/lib/ranklist"
"github.com/daos-stack/daos/src/control/logging"
"github.com/daos-stack/daos/src/control/provider/system"
"github.com/daos-stack/daos/src/control/server/config"
Expand Down Expand Up @@ -72,19 +71,10 @@ func newMockControlServiceFromBackends(t *testing.T, log logging.Logger, cfg *co
}

for idx, ec := range cfg.Engines {
trc := new(engine.TestRunnerConfig)
if started[idx] {
trc.Running.SetTrue()
}
runner := engine.NewTestRunner(trc, ec)
storProv := storage.MockProvider(log, 0, &ec.Storage, syp, sp, bp, nil)

ei := NewEngineInstance(log, storProv, nil, runner, nil)
ei.setSuperblock(&Superblock{
Rank: ranklist.NewRankPtr(uint32(idx)),
})
ei := NewEngineInstance(log, storProv, nil, nil, nil)
setupTestEngineWithConfig(t, ei, uint32(idx), ec, !started[idx])
if started[idx] {
ei.ready.SetTrue()
ei.setDrpcSocket("/dontcare")
}
if err := cs.harness.AddInstance(ei); err != nil {
Expand Down
20 changes: 16 additions & 4 deletions src/control/server/instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//
// (C) Copyright 2019-2024 Intel Corporation.
// (C) Copyright 2025 Hewlett Packard Enterprise Development LP
// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -216,9 +216,24 @@ func (ei *EngineInstance) determineRank(ctx context.Context, ready *srvpb.Notify
Replace: ei.replaceRank.Load(),
}

// Reset replaceRank state for instance after joinSystem() has been attempted.
defer ei.replaceRank.SetFalse()

resp, err := ei.joinSystem(ctx, joinReq)
if err != nil {
ei.log.Errorf("join failed: %s", err)

// If this is a replace operation and join failed, clean up the formatted storage to
// prevent leaving the rank in a formatted state. This prevents the engine
// inadvertently being joined later with a new rank.
if ei.replaceRank.Load() {
ei.log.Infof("cleaning up after join failure during replace")
if cleanupErr := ei.cleanupFailedJoinReplace(ctx); cleanupErr != nil {
ei.log.Errorf("failed to cleanup after join failure: %v", cleanupErr)
// Don't override the original join error
}
}

return ranklist.NilRank, false, 0, err
}
switch resp.State {
Expand All @@ -237,9 +252,6 @@ func (ei *EngineInstance) determineRank(ctx context.Context, ready *srvpb.Notify
}
r = ranklist.Rank(resp.Rank)

// Reset replaceRank state for instance after joinSystem() has returned.
ei.replaceRank.SetFalse()

if !superblock.ValidRank || ready.Uri != superblock.URI {
ei.log.Noticef("updating rank %d URI to %s", resp.Rank, ready.Uri)
superblock.Rank = new(ranklist.Rank)
Expand Down
16 changes: 5 additions & 11 deletions src/control/server/instance_drpc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//
// (C) Copyright 2020-2024 Intel Corporation.
// (C) Copyright 2025 Hewlett Packard Enterprise Development LP
// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -26,7 +26,6 @@ import (
"github.com/daos-stack/daos/src/control/lib/daos"
. "github.com/daos-stack/daos/src/control/lib/ranklist"
"github.com/daos-stack/daos/src/control/logging"
"github.com/daos-stack/daos/src/control/server/engine"
. "github.com/daos-stack/daos/src/control/system"
)

Expand Down Expand Up @@ -90,10 +89,8 @@ func TestEngineInstance_CallDrpc(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)

trc := engine.TestRunnerConfig{}
trc.Running.Store(!tc.notStarted)
runner := engine.NewTestRunner(&trc, engine.MockConfig())
instance := NewEngineInstance(log, nil, nil, runner, nil)
instance := NewEngineInstance(log, nil, nil, nil, nil)
setupTestEngine(t, instance, 0, tc.notStarted)
instance.ready.Store(!tc.notReady)

if !tc.noSocket {
Expand Down Expand Up @@ -188,11 +185,8 @@ func TestEngineInstance_CallDrpc_Parallel(t *testing.T) {
}(t)

t.Log("setting up engine...")
trc := engine.TestRunnerConfig{}
trc.Running.Store(true)
runner := engine.NewTestRunner(&trc, engine.MockConfig())
instance := NewEngineInstance(log, nil, nil, runner, nil)
instance.ready.Store(true)
instance := NewEngineInstance(log, nil, nil, nil, nil)
setupTestEngine(t, instance, 0)

instance.getDrpcClientFn = func(s string) drpc.DomainSocketClient {
t.Log("fetching drpc client")
Expand Down
58 changes: 58 additions & 0 deletions src/control/server/instance_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"
"os"
"syscall"

"github.com/dustin/go-humanize"
"github.com/pkg/errors"
Expand Down Expand Up @@ -76,6 +77,63 @@ func (ei *EngineInstance) NotifyStorageReady(replaceRank bool) {
}()
}

// clearFormat removes the instance's superblock after a failed join during a
// replace operation, so that the engine cannot subsequently join without being
// reformatted.
//
// Steps, in order:
//   - fetch the SCM config from the storage provider; a fetch error is returned
//     wrapped, and a nil config means there is nothing to clean (returns nil);
//   - if the instance reports itself as started, stop it via stopEngineFn and
//     return that function's error unchanged on failure;
//   - remove the superblock, returning any error unchanged.
//
// stopEngineFn is injected by the caller; cleanupFailedJoinReplace passes
// stopEngine.
func (ei *EngineInstance) clearFormat(ctx context.Context, stopEngineFn func(context.Context, *EngineInstance) error) error {
	instIdx := ei.Index()
	ei.log.Infof("instance %d: cleaning up after join failure during replace", instIdx)

	// The SCM config is only used here to decide whether any cleanup is needed.
	scmCfg, cfgErr := ei.GetStorage().GetScmConfig()
	switch {
	case cfgErr != nil:
		return errors.Wrap(cfgErr, "failed to get SCM config")
	case scmCfg == nil:
		ei.log.Debugf("instance %d: no SCM config, nothing to clean", instIdx)
		return nil
	}

	// A running engine has to be brought down before its superblock is removed.
	if ei.IsStarted() {
		ei.log.Infof("instance %d: stopping engine before cleanup", instIdx)
		stopErr := stopEngineFn(ctx, ei)
		if stopErr != nil {
			return stopErr
		}
		ei.log.Debugf("instance %d: engine stopped successfully", instIdx)
	}

	// On RAM-based SCM (tmpfs) an explicit unmount is unnecessary here because
	// it will be done on engine exit.

	// Removing the superblock prevents a subsequent join without a reformat.
	rmErr := ei.RemoveSuperblock()
	if rmErr == nil {
		ei.log.Infof("instance %d: cleanup after join failure complete", instIdx)
	}

	return rmErr
}

// stopEngine is the production implementation of the stopEngineFn argument
// taken by clearFormat. It calls ei.Stop with SIGKILL and then polls, via
// pollInstanceState, until the instance no longer reports itself as started.
//
// A failure from Stop is returned wrapped with "failed to stop engine"; an
// error from the poll is returned wrapped with "waiting for engine to stop".
func stopEngine(ctx context.Context, ei *EngineInstance) error {
	killErr := ei.Stop(syscall.SIGKILL)
	if killErr != nil {
		return errors.Wrap(killErr, "failed to stop engine")
	}

	// The predicate is satisfied once the engine is no longer started.
	waitErr := pollInstanceState(ctx, []Engine{ei}, func(inst Engine) bool {
		return !inst.IsStarted()
	})

	// errors.Wrap returns nil for a nil error, so the success path yields nil.
	return errors.Wrap(waitErr, "waiting for engine to stop")
}

// cleanupFailedJoinReplace is invoked when a replace operation formatted the
// storage successfully but the following join to the system failed. It
// delegates to clearFormat, supplying stopEngine as the routine used to stop
// the engine, and returns clearFormat's result unchanged.
func (ei *EngineInstance) cleanupFailedJoinReplace(ctx context.Context) error {
	cleanupErr := ei.clearFormat(ctx, stopEngine)
	return cleanupErr
}

func (ei *EngineInstance) checkScmNeedFormat() (bool, error) {
msgIdx := fmt.Sprintf("instance %d", ei.Index())

Expand Down
Loading
Loading