Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions supernode/adaptors/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,28 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action
totalBytesStored := 0
metadataBytesStored := 0
firstBatchProcessed := false
if len(keys) == 0 && len(metadataFiles) > 0 {
logtrace.Info(ctx, "store: batch send (metadata-only)", logtrace.Fields{
"taskID": taskID,
"metadata_count": len(metadataFiles),
"metadata_bytes": metadataBytes,
"metadata_mb_est": utils.BytesIntToMB(metadataBytes),
})
bctx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout)
err = p.p2p.StoreBatch(bctx, metadataFiles, P2PDataRaptorQSymbol, taskID)
cancel()
if err != nil {
return totalSymbols, totalAvailable, fmt.Errorf("p2p store batch (metadata-only): %w", err)
}
logtrace.Info(ctx, "store: batch ok (metadata-only)", logtrace.Fields{
"taskID": taskID,
"metadata_count": len(metadataFiles),
"metadata_bytes": metadataBytes,
})
totalBytesStored += metadataBytes
metadataBytesStored += metadataBytes
firstBatchProcessed = true
}
for start := 0; start < len(keys); {
end := min(start+loadSymbolsBatchSize, len(keys))
batch := keys[start:end]
Expand Down
58 changes: 58 additions & 0 deletions supernode/adaptors/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"testing"

"github.com/LumeraProtocol/supernode/v2/p2p"
p2pmock "github.com/LumeraProtocol/supernode/v2/p2p/mocks"
"github.com/LumeraProtocol/supernode/v2/pkg/codec"
"github.com/LumeraProtocol/supernode/v2/pkg/storage/rqstore"
"github.com/stretchr/testify/mock"
"go.uber.org/mock/gomock"
)

Expand All @@ -18,6 +21,14 @@ type clientWithPeersCount struct {

func (c clientWithPeersCount) PeersCount() int { return c.peers }

type p2pClientWithStreamMock struct {
*p2pmock.Client
}

func (c p2pClientWithStreamMock) BatchRetrieveStream(_ context.Context, _ []string, _ int32, _ string, _ func(string, []byte) error, _ ...bool) (int32, error) {
return 0, nil
}

func TestStoreArtefacts_ZeroPeers_ReturnsError(t *testing.T) {
svc := NewP2PService(clientWithPeersCount{peers: 0}, nil)

Expand Down Expand Up @@ -52,3 +63,50 @@ func TestStoreArtefacts_PeersPresent_DoesNotTripGuard(t *testing.T) {
}
}

func TestStoreCascadeSymbolsAndData_MetadataOnlyBatchWhenNoSymbols(t *testing.T) {
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

store := rqstore.NewMockStore(ctrl)
store.EXPECT().StoreSymbolDirectory("task", "").Return(nil)
store.EXPECT().UpdateIsFirstBatchStored("task").Return(nil)

metadata := [][]byte{[]byte("index-bytes"), []byte("layout-bytes")}
baseClient := p2pmock.NewClient(t)
baseClient.On("StoreBatch", mock.Anything, metadata, P2PDataRaptorQSymbol, "task").Return(nil).Once()

svc := &p2pImpl{p2p: p2pClientWithStreamMock{Client: baseClient}, rqStore: store}
stored, total, err := svc.storeCascadeSymbolsAndData(context.Background(), "task", "action", "", metadata, codec.Layout{Blocks: []codec.Block{{BlockID: 0}}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if stored != 0 {
t.Fatalf("expected 0 stored symbols, got %d", stored)
}
if total != 0 {
t.Fatalf("expected 0 total symbols, got %d", total)
}
}

func TestStoreCascadeSymbolsAndData_MetadataOnlyBatchFailureSkipsFirstBatchFlag(t *testing.T) {
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

store := rqstore.NewMockStore(ctrl)
store.EXPECT().StoreSymbolDirectory("task", "").Return(nil)
store.EXPECT().UpdateIsFirstBatchStored("task").Times(0)

metadata := [][]byte{[]byte("index-bytes")}
baseClient := p2pmock.NewClient(t)
baseClient.On("StoreBatch", mock.Anything, metadata, P2PDataRaptorQSymbol, "task").Return(errors.New("p2p down")).Once()

svc := &p2pImpl{p2p: p2pClientWithStreamMock{Client: baseClient}, rqStore: store}
_, _, err := svc.storeCascadeSymbolsAndData(context.Background(), "task", "action", "", metadata, codec.Layout{Blocks: []codec.Block{{BlockID: 0}}})
if err == nil {
t.Fatalf("expected error, got nil")
}
if !strings.Contains(err.Error(), "metadata-only") {
t.Fatalf("expected metadata-only path error, got: %v", err)
}
}

175 changes: 175 additions & 0 deletions supernode/transport/gateway/recovery_admin_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package gateway

import (
"net/http"
"net/http/httptest"
"testing"
)

func setupRecoveryServer(ra *recoveryAdmin, token string) (*httptest.Server, func()) {
orig := RecoveryAdminToken
RecoveryAdminToken = token

mux := http.NewServeMux()
ra.register(mux)
ts := httptest.NewServer(mux)

cleanup := func() {
ts.Close()
RecoveryAdminToken = orig
}
return ts, cleanup
}

func TestRecoveryAdminIntegration_AuthAndRouting(t *testing.T) {
ra := &recoveryAdmin{
enabled: true,
reseedSem: make(chan struct{}, 1),
statusSem: make(chan struct{}, 1),
}
ts, cleanup := setupRecoveryServer(ra, "secret")
defer cleanup()

t.Run("health unauthorized without header", func(t *testing.T) {
resp, err := http.Get(ts.URL + "/api/v1/recovery/health")
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusUnauthorized)
}
})

t.Run("health authorized returns 200", func(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/v1/recovery/health", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusOK)
}
})

t.Run("action status route missing action id returns 404", func(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/v1/recovery/actions//status", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusNotFound)
}
})
}

func TestRecoveryAdminIntegration_MethodAndDependencyBehavior(t *testing.T) {
ra := &recoveryAdmin{
enabled: true,
reseedSem: make(chan struct{}, 1),
statusSem: make(chan struct{}, 1),
}
ts, cleanup := setupRecoveryServer(ra, "secret")
defer cleanup()

t.Run("reseed rejects GET", func(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/v1/recovery/reseed?action_id=a1", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusMethodNotAllowed)
}
})

t.Run("status rejects POST", func(t *testing.T) {
req, _ := http.NewRequest(http.MethodPost, ts.URL+"/api/v1/recovery/actions/a1/status", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusMethodNotAllowed)
}
})

t.Run("reseed returns 503 when dependencies missing", func(t *testing.T) {
req, _ := http.NewRequest(http.MethodPost, ts.URL+"/api/v1/recovery/reseed?action_id=a1", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusServiceUnavailable)
}
})

t.Run("status returns 503 when dependencies missing", func(t *testing.T) {
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/v1/recovery/actions/a1/status", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusServiceUnavailable)
}
})
}

func TestRecoveryAdminIntegration_SemaphoreThrottling(t *testing.T) {
ra := &recoveryAdmin{
enabled: true,
cascadeFactory: stubCascadeFactory{},
p2pClient: stubP2PClient{},
reseedSem: make(chan struct{}, 1),
statusSem: make(chan struct{}, 1),
}
ts, cleanup := setupRecoveryServer(ra, "secret")
defer cleanup()

t.Run("reseed returns 429 while in-progress", func(t *testing.T) {
ra.reseedSem <- struct{}{}
defer func() { <-ra.reseedSem }()

req, _ := http.NewRequest(http.MethodPost, ts.URL+"/api/v1/recovery/reseed?action_id=a1", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusTooManyRequests {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusTooManyRequests)
}
})

t.Run("status returns 429 while in-progress", func(t *testing.T) {
ra.statusSem <- struct{}{}
defer func() { <-ra.statusSem }()

req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/v1/recovery/actions/a1/status", nil)
req.Header.Set(recoveryHeaderToken, "secret")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusTooManyRequests {
t.Fatalf("status=%d want=%d", resp.StatusCode, http.StatusTooManyRequests)
}
})
}
Loading