Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions routes/fiber/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,17 @@ func (r *Routes) handlePostTx(c *fiber.Ctx) error {
return r.handleSubmitError(c, err)
}

status.StatusCode = http.StatusOK
status.Title = "OK"
return c.JSON(status)
// Preserve the StatusCode set by the service (which reflects the actual
// broadcast outcome — 2xx for success, 4xx for rejection, 5xx for service
// failure). Mirror it to the HTTP response so downstream consumers can
// distinguish success from failure without parsing txStatus.
if status.StatusCode == 0 {
status.StatusCode = http.StatusOK
}
if status.Title == "" {
status.Title = defaultTitleForStatus(status.Status)
}
return c.Status(status.StatusCode).JSON(status)
}

// handlePostTxs submits multiple transactions
Expand Down Expand Up @@ -175,13 +183,42 @@ func (r *Routes) handlePostTxs(c *fiber.Ctx) error {
return r.handleSubmitError(c, err)
}

// Per-tx StatusCode is preserved from the service so each entry in the
// array reflects its own outcome. The HTTP envelope itself stays 200 —
// the batch request succeeded, per-tx results are inside.
for _, s := range statuses {
s.StatusCode = http.StatusOK
s.Title = "OK"
if s.StatusCode == 0 {
s.StatusCode = http.StatusOK
}
if s.Title == "" {
s.Title = defaultTitleForStatus(s.Status)
}
}
return c.JSON(statuses)
}

// defaultTitleForStatus returns a human-readable title for a tx outcome when
// the service did not set one. Used as a fallback so clients see something
// meaningful in the response Title field.
func defaultTitleForStatus(s models.Status) string {
switch s {
case models.StatusMined, models.StatusImmutable, models.StatusSeenOnNetwork,
models.StatusAcceptedByNetwork, models.StatusSentToNetwork,
models.StatusReceived:
return "OK"
case models.StatusRejected:
return "Transaction rejected"
case models.StatusDoubleSpendAttempted:
return "Double spend attempted"
case models.StatusServiceError:
return "Service error"
case models.StatusUnknown:
return "Unknown"
default:
return string(s)
}
}

// handleSubmitError returns an appropriate HTTP response for transaction submission errors.
// It checks for ARC-compatible errors and returns the corresponding status code.
func (r *Routes) handleSubmitError(c *fiber.Ctx, err error) error {
Expand Down
140 changes: 63 additions & 77 deletions service/embedded/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo
slog.Duration("timeout", 15*time.Second),
)
resultCh := make(chan *models.TransactionStatus, len(endpoints))
submitCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
broadcastCtx, broadcastCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)

broadcastStart := time.Now()
var wg sync.WaitGroup
Expand All @@ -221,7 +220,7 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo
go func(ep string) {
defer wg.Done()
epStart := time.Now()
status := e.submitToTeranodeSync(submitCtx, ep, tx.Bytes(), txid)
status := e.submitToTeranodeSync(broadcastCtx, ep, tx.Bytes(), txid)
if status != nil {
e.logger.Debug("endpoint responded",
slog.String("txid", txid),
Expand All @@ -230,80 +229,62 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo
slog.Duration("elapsed", time.Since(epStart)),
)
}
select {
case resultCh <- status:
case <-submitCtx.Done():
return
}
resultCh <- status
}(endpoint)
}

// Close channel when all goroutines complete
// Clean up when all goroutines complete
go func() {
wg.Wait()
broadcastCancel()
close(resultCh)
}()

// Collect results:
// - Success (ACCEPTED/SENT): return immediately
// - Success (ACCEPTED/SENT): return immediately, goroutines continue propagating
// - Rejected (4xx): return immediately, tx is invalid
// - Service error (5xx): wait for other endpoints
// - All service errors / timeout: return last error
// - All failed: return last error
var lastError *models.TransactionStatus
for {
select {
case status, ok := <-resultCh:
if !ok {
// All endpoints responded, none succeeded
if lastError == nil {
lastError, _ = e.store.GetStatus(ctx, txid)
}
e.logger.Debug("broadcast failed on all endpoints",
slog.String("txid", txid),
slog.String("status", string(lastError.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, lastError)
return lastError, nil
}
if status == nil {
continue
}
switch status.Status {
case models.StatusAcceptedByNetwork, models.StatusSentToNetwork:
e.logger.Debug("broadcast complete",
slog.String("txid", txid),
slog.String("status", string(status.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusRejected:
e.logger.Debug("transaction rejected by network",
slog.String("txid", txid),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusServiceError:
lastError = status
case models.StatusUnknown, models.StatusReceived, models.StatusSeenOnNetwork,
models.StatusDoubleSpendAttempted, models.StatusMined, models.StatusImmutable:
lastError = status
}
case <-submitCtx.Done():
if lastError == nil {
lastError, _ = e.store.GetStatus(ctx, txid)
}
e.logger.Warn("broadcast timeout",
for status := range resultCh {
if status == nil {
continue
}
switch status.Status {
case models.StatusAcceptedByNetwork, models.StatusSentToNetwork:
e.logger.Debug("broadcast complete",
slog.String("txid", txid),
slog.Int("endpoints", len(endpoints)),
slog.String("status", string(status.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, lastError)
return lastError, nil
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusRejected:
e.logger.Debug("transaction rejected by network",
slog.String("txid", txid),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusServiceError:
lastError = status
case models.StatusUnknown, models.StatusReceived, models.StatusSeenOnNetwork,
models.StatusDoubleSpendAttempted, models.StatusMined, models.StatusImmutable:
lastError = status
}
}

// All endpoints responded, none succeeded
if lastError == nil {
lastError, _ = e.store.GetStatus(ctx, txid)
}
e.logger.Debug("broadcast failed on all endpoints",
slog.String("txid", txid),
slog.String("status", string(lastError.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, lastError)
return lastError, nil
}

// SubmitTransactions submits multiple transactions for broadcast.
Expand Down Expand Up @@ -410,10 +391,6 @@ func (e *Embedded) SubmitTransactions(ctx context.Context, rawTxs [][]byte, opts
txInfos = append(txInfos, txInfo{tx: tx, rawTx: rawTx, txid: txid, isNew: isNew, status: existingStatus})
}

// Submit all to teranode synchronously with timeout
submitCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

var responses []*models.TransactionStatus
for _, info := range txInfos {
// Skip rebroadcast if already confirmed on network or rejected
Expand All @@ -433,22 +410,20 @@ func (e *Embedded) SubmitTransactions(ctx context.Context, rawTxs [][]byte, opts

endpoints := e.teranodeClient.GetEndpoints()
resultCh := make(chan *models.TransactionStatus, len(endpoints))
broadcastCtx, broadcastCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
var wg sync.WaitGroup
for _, endpoint := range endpoints {
wg.Add(1)
go func(ep string) {
defer wg.Done()
status := e.submitToTeranodeSync(submitCtx, ep, rawTx, info.txid)
select {
case resultCh <- status:
case <-submitCtx.Done():
return
}
status := e.submitToTeranodeSync(broadcastCtx, ep, rawTx, info.txid)
resultCh <- status
}(endpoint)
}

go func() {
wg.Wait()
broadcastCancel()
close(resultCh)
}()

Expand Down Expand Up @@ -557,6 +532,15 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra
if statusCode >= 400 && statusCode < 500 {
status = models.StatusRejected
}
// Map missing HTTP status to appropriate code
switch {
case statusCode != 0:
// pass through actual HTTP status
case ctx.Err() != nil:
statusCode = http.StatusGatewayTimeout
default:
statusCode = http.StatusServiceUnavailable
}
e.logger.Debug("endpoint broadcast failed",
slog.String("txid", txid),
slog.String("endpoint", endpoint),
Expand All @@ -565,10 +549,11 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra
slog.String("error", err.Error()),
)
return &models.TransactionStatus{
TxID: txid,
Status: status,
Timestamp: time.Now(),
ExtraInfo: err.Error(),
TxID: txid,
Status: status,
StatusCode: statusCode,
Timestamp: time.Now(),
ExtraInfo: err.Error(),
}
}

Expand All @@ -588,8 +573,9 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra
}

return &models.TransactionStatus{
TxID: txid,
Status: txStatus,
Timestamp: time.Now(),
TxID: txid,
Status: txStatus,
StatusCode: http.StatusOK,
Timestamp: time.Now(),
}
}