Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions services/bump_builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,38 +179,56 @@ func (b *Builder) markMinedAndPublish(ctx context.Context, logger *zap.Logger, b
WithLabelValues(string(prev.Status), string(models.StatusMined)).
Observe(time.Since(prev.Timestamp).Seconds())
}
if len(mined) == 0 {
if len(mined) == 0 || b.publisher == nil {
return
}
// Coalesce the N-per-block MINED fan-out into ONE bulk event. Without
// this, a single BUMP build for a 14k-tx block produced 14k individual
// publish calls, which overran the webhook service's 1024-cap work
// queue and triggered ~185k drops/block. Subscribers (SSE, webhook)
// unfan from the bulk template in their own handlers — they own the
// per-tx delivery cost, but they don't pay the channel-saturation cost.
// Coalesce the N-per-block MINED fan-out into bulk events. Without this, a
// single BUMP build for a 14k-tx block produced 14k individual publish
// calls, which overran the webhook service's 1024-cap work queue and
// triggered ~185k drops/block. Subscribers (SSE, webhook) unfan from the
// bulk template in their own handlers.
//
// Chunk the txids so one event never exceeds the Kafka producer's max
// message size: a ~27k-tx block serialized to ~1.85 MB, over the 1 MiB
// default Producer.MaxMessageBytes, so PublishBulk failed and the MINED
// event was silently dropped for large blocks (the DB status was still
// MINED, but SSE/webhook subscribers never saw it). Each txid is ~67 bytes
// of JSON, so maxTxIDsPerBulkEvent keeps every event well under the limit.
publishTxIDs := make([]string, 0, len(mined))
for _, st := range mined {
publishTxIDs = append(publishTxIDs, st.TxID)
}
template := &models.TransactionStatus{
Status: models.StatusMined,
BlockHash: blockHash,
BlockHeight: blockHeight,
Timestamp: time.Now(),
TxIDs: publishTxIDs,
}
if b.publisher != nil {
for start := 0; start < len(publishTxIDs); start += maxTxIDsPerBulkEvent {
end := start + maxTxIDsPerBulkEvent
if end > len(publishTxIDs) {
end = len(publishTxIDs)
}
template := &models.TransactionStatus{
Status: models.StatusMined,
BlockHash: blockHash,
BlockHeight: blockHeight,
Timestamp: time.Now(),
TxIDs: publishTxIDs[start:end],
}
if pubErr := b.publisher.PublishBulk(ctx, template); pubErr != nil {
logger.Warn(
"failed to publish bulk MINED",
zap.String("block_hash", blockHash),
zap.Int("txid_count", len(publishTxIDs)),
zap.Int("chunk_start", start),
zap.Int("chunk_size", end-start),
zap.Int("txid_total", len(publishTxIDs)),
zap.Error(pubErr),
)
}
}
}

// maxTxIDsPerBulkEvent is the upper bound on the number of txids carried by a
// single bulk MINED event.
//
// Sizing: a txid is 64 hex characters, roughly 67 bytes once JSON-quoted and
// comma-separated, so a full chunk of 5000 serializes to about 335 KB
// (5000 x 67 bytes). That stays well below the 1 MiB default Kafka
// Producer.MaxMessageBytes and leaves headroom for the status envelope.
const maxTxIDsPerBulkEvent = 5_000

// Name returns the constant identifier string "bump-builder" for this Builder.
// It does not read any Builder state.
func (b *Builder) Name() string {
	const builderName = "bump-builder"
	return builderName
}

func (b *Builder) Start(ctx context.Context) error {
Expand Down
73 changes: 73 additions & 0 deletions services/bump_builder/builder_bulk_mined_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package bump_builder

import (
"context"
"errors"
"fmt"
"sync"
"testing"

"go.uber.org/zap"

"github.com/bsv-blockchain/arcade/models"
)

// chunkCapturingPublisher records the TxIDs of each PublishBulk call verbatim
// (without unfanning) so a test can assert on chunk count and per-chunk size.
// It is a test double: Publish and Close are no-ops that return nil, and
// Subscribe always returns an error.
type chunkCapturingPublisher struct {
// mu guards chunks; PublishBulk holds it while appending.
mu sync.Mutex
// chunks has one entry per PublishBulk call, in call order. Each entry is a
// copy of that call's template.TxIDs.
chunks [][]string
}

// Publish is a no-op: it ignores both arguments and always returns nil. Only
// PublishBulk records anything on this test double.
func (p *chunkCapturingPublisher) Publish(_ context.Context, _ *models.TransactionStatus) error {
	return nil
}

// PublishBulk captures the txids of one bulk event and always returns nil.
//
// The txids are copied rather than aliased, so the recorded chunk does not
// share a backing array with template.TxIDs. The context is ignored.
func (p *chunkCapturingPublisher) PublishBulk(_ context.Context, template *models.TransactionStatus) error {
	// Take the snapshot first; only the append to p.chunks needs the lock.
	snapshot := make([]string, len(template.TxIDs))
	copy(snapshot, template.TxIDs)

	p.mu.Lock()
	p.chunks = append(p.chunks, snapshot)
	p.mu.Unlock()

	return nil
}

// Subscribe is not supported by this test double. It ignores its arguments
// and always returns a nil channel together with a descriptive error.
func (p *chunkCapturingPublisher) Subscribe(_ context.Context, _ string) (<-chan *models.TransactionStatus, error) {
	errUnused := errors.New("chunkCapturingPublisher: Subscribe not used in tests")
	return nil, errUnused
}
// Close is a no-op and always returns nil; the test double holds no resources.
func (p *chunkCapturingPublisher) Close() error {
	return nil
}

// TestBuilder_MarkMinedAndPublish_ChunksLargeTxidList verifies that a MINED
// fan-out larger than maxTxIDsPerBulkEvent is split across multiple bulk
// events so no single event exceeds the Kafka producer's max message size. A
// ~27k-tx block previously serialized to ~1.85 MB — over the 1 MiB default
// Producer.MaxMessageBytes — so PublishBulk failed and the MINED event was
// silently dropped for large blocks (the DB status was still MINED).
//
// The test checks three things:
//   - the number of bulk events equals ceil(n / maxTxIDsPerBulkEvent);
//   - every event is non-empty and no larger than maxTxIDsPerBulkEvent;
//   - every input txid is published exactly once across all events.
//
// The last check is order-independent. It catches chunkers that produce the
// right sizes but the wrong contents, such as re-publishing the same window
// or overlapping chunk boundaries, which a count-only check would miss.
func TestBuilder_MarkMinedAndPublish_ChunksLargeTxidList(t *testing.T) {
	ms := newMockStore()
	pub := &chunkCapturingPublisher{}
	b := newTestBuilder(ms, "http://unused.invalid")
	b.publisher = pub

	// Two full chunks plus a short tail, so the final partial chunk is exercised.
	const n = maxTxIDsPerBulkEvent*2 + 37
	txids := make([]string, n)
	for i := range txids {
		txids[i] = fmt.Sprintf("tx%064d", i)
	}

	b.markMinedAndPublish(context.Background(), zap.NewNop(), "blkD", 4242, txids)

	wantChunks := (n + maxTxIDsPerBulkEvent - 1) / maxTxIDsPerBulkEvent
	if len(pub.chunks) != wantChunks {
		t.Fatalf("expected %d bulk events, got %d", wantChunks, len(pub.chunks))
	}

	total := 0
	seen := make(map[string]int, n)
	for i, c := range pub.chunks {
		if len(c) == 0 || len(c) > maxTxIDsPerBulkEvent {
			t.Fatalf("chunk %d has invalid size %d (cap %d)", i, len(c), maxTxIDsPerBulkEvent)
		}
		total += len(c)
		for _, id := range c {
			seen[id]++
		}
	}
	if total != n {
		t.Fatalf("chunks must cover every txid exactly once: got %d, want %d", total, n)
	}

	// With total == n, each input txid appearing exactly once also rules out
	// any unexpected extra txid in the published chunks.
	for _, id := range txids {
		if got := seen[id]; got != 1 {
			t.Fatalf("txid %q published %d times, want exactly once", id, got)
		}
	}
}
Loading