
feat:support rocketmq mode in tcc#1125

Open
XiaoFeiASK wants to merge 6 commits into apache:master from XiaoFeiASK:feat--support-rocketmq-mode-in-tcc



XiaoFeiASK commented Jun 13, 2026

  • I have registered the PR changes.

What this PR does:

This PR adds END_TRANSACTION protocol support to the TCC RocketMQ mode: phase two now notifies the Broker of the Commit/Rollback decision immediately, instead of relying on the Broker's 60-second check-back to resolve the half message. The check-back is kept as a fallback for when the notification fails.

  1. EndTransactionSender: core protocol implementation
  • Manually constructs the RocketMQ Remoting END_TRANSACTION request (RequestCode=37) and sends the Commit/Rollback instruction directly to the Broker over TCP.
  • Queries the NameServer for route information (RequestCode=105) to resolve the Broker master address automatically.
  2. TccRocketMQAction: changes TCC phase-two behavior from "no-op" to "active notification + check-back fallback"
  • Commit(): builds the END_TRANSACTION request (commitOrRollback=8) and notifies the Broker to commit the half message via sendEndTransaction(). On failure, it logs a warning, still returns true, and leaves the half message to the Broker check-back.
  • Rollback(): builds the END_TRANSACTION request (commitOrRollback=12) and notifies the Broker to roll back the half message via sendEndTransaction(). On failure, it likewise falls back to the check-back.
  • buildEndTransactionHeader(): extracts metadata such as offsetMsgId, brokerName, topic, and queueOffset from the ActionContext and parses the commitLogOffset.
  3. TransactionListener: corrects the global status mapping
  • Maps GlobalStatusCommitFailed to UnknownState, so the Broker does not roll the message back during check-back and the TC can keep retrying the commit.
  4. TCCServiceProxy: adds delayed reporting of the ActionContext
  • After a successful Prepare phase, writes the message metadata (msgId, brokerName, topic, etc.) back to the TC via BranchReport, so that the phase-two Commit/Rollback can access the Broker address information.

Which issue(s) this PR fixes?
Related to #765

Special notes for your reviewer:

Does this PR introduce a user-facing change?:

Add RocketMQ TCC integration support for distributed transaction messaging

Copilot AI left a comment


Pull request overview

This PR adds active RocketMQ transactional message finalization for Seata TCC by sending END_TRANSACTION directly to the broker (commit/rollback) and by ensuring the TCC phase-two has enough message metadata via delayed ActionContext reporting to the TC. It also adjusts RocketMQ check-back status mapping to avoid premature rollback when the global status is CommitFailed.

Changes:

  • Add an END_TRANSACTION remoting implementation (NameServer route query + TCP sender) and switch TCC RocketMQ phase-two from no-op to best-effort broker notification with check-back fallback.
  • Delay-report updated ActionContext after a successful TCC Prepare so phase-two can access RocketMQ message/broker metadata.
  • Fix GlobalStatusCommitFailed mapping to UnknownState and update related tests; add RocketMQ producer default SendMsgTimeout.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.

Summary per file:
  • pkg/rm/tcc/tcc_service.go: Report updated ActionContext after Prepare (delayed report).
  • pkg/integration/rocketmq/transaction_listener.go: Adjust global-status → RocketMQ transaction-state mapping.
  • pkg/integration/rocketmq/transaction_listener_test.go: Update mapping expectations.
  • pkg/integration/rocketmq/tcc_rocketmq_action.go: Actively send END_TRANSACTION on Commit/Rollback; build request header from ActionContext.
  • pkg/integration/rocketmq/tcc_rocketmq_action_test.go: Add tests for active Commit/Rollback and header building.
  • pkg/integration/rocketmq/seata_producer.go: Add default SendMsgTimeout configuration.
  • pkg/integration/rocketmq/end_transaction_sender.go: New TCP remoting implementation for NameServer route lookup + broker END_TRANSACTION.
  • pkg/integration/rocketmq/end_transaction_sender_test.go: Add unit tests for protocol encoding and helper logic.
  • pkg/integration/rocketmq/constants.go: Add topic ActionContext key.


Comment thread pkg/rm/tcc/tcc_service.go
Comment thread pkg/integration/rocketmq/tcc_rocketmq_action.go
Comment thread pkg/integration/rocketmq/tcc_rocketmq_action.go
Comment thread pkg/integration/rocketmq/end_transaction_sender.go Outdated
Comment thread pkg/integration/rocketmq/end_transaction_sender.go
Comment thread pkg/integration/rocketmq/end_transaction_sender.go
Comment thread pkg/integration/rocketmq/end_transaction_sender.go
Comment thread pkg/integration/rocketmq/end_transaction_sender_test.go
@codecov-commenter


Codecov Report

❌ Patch coverage is 42.28571% with 202 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.10%. Comparing base (84c991f) to head (55bf8b3).

Files with missing lines (patch coverage, lines):
  • pkg/integration/rocketmq/end_transaction_sender.go: 37.30%, 136 missing and 22 partials ⚠️
  • pkg/rm/tcc/tcc_service.go: 0.00%, 30 missing ⚠️
  • pkg/integration/rocketmq/tcc_rocketmq_action.go: 81.25%, 9 missing and 3 partials ⚠️
  • pkg/integration/rocketmq/seata_producer.go: 0.00%, 2 missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1125      +/-   ##
==========================================
- Coverage   58.36%   58.10%   -0.27%     
==========================================
  Files         282      283       +1     
  Lines       19405    19750     +345     
==========================================
+ Hits        11326    11475     +149     
- Misses       7102     7272     +170     
- Partials      977     1003      +26     


thunguo left a comment


Could you check whether these need to be adjusted?

Comment thread pkg/integration/rocketmq/end_transaction_sender.go Outdated
Comment thread pkg/integration/rocketmq/end_transaction_sender.go Outdated
Comment thread pkg/integration/rocketmq/end_transaction_sender.go Outdated
Comment thread pkg/integration/rocketmq/end_transaction_sender.go Outdated
Comment thread pkg/integration/rocketmq/end_transaction_sender.go Outdated
XiaoFeiASK (Author)

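Additional changes

pkg/integration/rocketmq/end_transaction_sender.go

  1. Problem: Every Send opened a new TCP connection, producing a large number of short-lived connections under concurrency.
    Fix: Added connPool + defaultTCPSender, which reuse connections per broker address (4 connections by default), automatically retry once on a new connection if a pooled connection has failed, and lazily clean up idle connections (60s check interval / 10min idle timeout).
    Impact: Avoids connection storms under high concurrency and reduces latency and resource consumption.

  2. Problem: queryBrokerAddrFromNameServer hardcoded a 3s DialTimeout, inconsistent with the outer timeout.
    Fix: The third argument of net.DialTimeout is now the timeout variable passed down from sendEndTransaction.
    Impact: Timeout behavior is consistent and can be controlled globally through configuration.

  3. Problem: A custom readFull wrapper duplicated the standard library's io.ReadFull.
    Fix: Removed readFull; all call sites now use io.ReadFull directly.
    Impact: One less pointless wrapper; simpler code.

  4. Problem: decodeResponseHeader hardcoded 13 with no comment explaining its meaning.
    Fix: Defined the constant rmqResponseHeaderFixedPartLength = 13, with a comment giving the byte size of each field (code 2 + language 1 + version 2 + opaque 4 + flag 4).
    Impact: Better maintainability; later developers can see the meaning at a glance.

  5. Problem: encodeExtFields sorted the keys with sort.Strings, although the Broker's parsing does not depend on key order.
    Fix: Removed the sorting; the map is iterated directly with for range.
    Impact: Eliminates unnecessary sorting overhead; the encoding stays consistent with RocketMQ protocol semantics.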


actionCtx = make(map[string]interface{})
}

offsetMsgID := getStringFromMap(actionCtx, ActionContextKeyOffsetMsgId)


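When offsetMsgId fails to parse, I don't think we should go on and send offset 0.
If offsetMsgId fails to parse here, commitLogOffset defaults to 0 and END_TRANSACTION is still sent. That seems odd, because the broker may then process the half message at the wrong offset. The official client's endTransaction falls back to msgId when offsetMsgId is empty. Either add the same fallback here, or skip the active notification when parsing fails and go through check-back.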

Author


Okay, thanks for the review! Please take another look when you have time.

XiaoFeiASK (Author)

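Additional changes

pkg/integration/rocketmq/tcc_rocketmq_action.go

Problem: When offsetMsgId failed to parse, commitLogOffset defaulted to 0 and END_TRANSACTION was still sent, which could make the broker process the half message at the wrong offset.
Fix: When offsetMsgId is empty or fails to parse, fall back to parsing commitLogOffset from msgId (aligned with the official client's behavior), and update the corresponding unit test to verify this fallback.
Impact: Avoids sending an invalid 0 offset, improves transactional-message reliability, and stays consistent with the semantics of the official RocketMQ client.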


}

// fallback: if offsetMsgId is empty or failed to parse, try msgId (aligned with official client behaviour)
if commitLogOffset == 0 {


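This is better than before, but if offsetMsgId and msgId are both missing or fail to parse, commitLogOffset still stays 0 and END_TRANSACTION is still sent. In that case, shouldn't we skip the active notification and go through check-back? Otherwise an invalid offset may still end up being sent to the broker.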

Comment thread pkg/rm/tcc/tcc_service.go
log.Errorf("[TCC] marshal updated ActionContext failed, xid=%s, branchId=%d, err=%v", bac.Xid, bac.BranchId, err)
return fmt.Errorf("marshal updated ActionContext failed: %w", err)
}
err = rm.GetRMRemotingInstance().BranchReport(rm.BranchReportParam{


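This relies on BranchReport to write the updated ActionContext back to the TC. Can you confirm that the TC actually updates the applicationData of an already-registered branch? If this is only a status report, phase two will still receive the old applicationData, and the active END_TRANSACTION will fall back to check-back. Could you add a test, or briefly document this dependency?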

5 participants