[0.49-topru] topru manager keyspace cluster#82

Merged
zeminzhou merged 29 commits into 0.49 from codex/topru-manager-keyspace-cluster
Apr 9, 2026

Conversation

@zeminzhou
Collaborator

@zeminzhou zeminzhou commented Mar 22, 2026

PR Review Summary

22 files changed, +2510 / -971

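Main changes

  1. Manager-based TiDB topology discovery (tidb_manager.rs, fetch/mod.rs)

    • Adds TiDBManagerTopologyFetcher, which fetches active TiDB instances from the manager's /api/tidb/get_active_tidb endpoint, replacing the previous approach that could only read from etcd.
    • Supports the manager_server_address + tidb_namespace configuration; tidb_namespace is required when a manager is configured (fail-fast ConfigurationError).
    • The response body is capped at 8 MiB to keep an oversized response from exhausting memory.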
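  2. Keyspace shard routing (tidb_manager.rs)

    • Implements keyspace-level StatefulSet sharding through the VECTOR_STS_REPLICA_COUNT / VECTOR_STS_ID environment variables.
    • Uses the FNV-1a hash (stable across languages and processes) and assigns TiDB instances to shards by hashing keyspace_name and taking the remainder.
    • In sharding mode, TiDB instances without a keyspace_name are skipped and a warn log is emitted.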
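  3. PD keyspace → org/cluster routing (keyspace_cluster.rs)

    • Adds PdKeyspaceResolver, which queries the PD /pd/api/v2/keyspaces/{name} endpoint for a keyspace's serverless_tenant_id / serverless_cluster_id.
    • Uses a bounded LRU cache (default capacity 10000) to avoid unbounded growth.
    • Failures and misses are not cached, so later polls can recover.
    • Supports TLS configuration.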
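  4. Keyspace-routed writes in the sink layer (topsql_data_deltalake/processor.rs, topsql_meta_deltalake/processor.rs)

    • Adds the enable_keyspace_cluster_mapping option; when enabled, the org=xxx/cluster=xxx template segments in base_path are replaced with the actual routing values.
    • The writer map changes from HashMap<String, DeltaLakeWriter> to HashMap<WriterKey, DeltaLakeWriter>. WriterKey holds table_name + table_path, so the same table_name can be written to different org/cluster paths.
    • Removes the unsafe block (which took the value out of an Arc through a raw pointer) in favor of #[derive(Clone)] + sink.clone().
    • Adds a retry mechanism: when keyspace routing fails, retry up to 5 times with exponential backoff (5s→60s cap); once the limit is exceeded, the data is dropped and a log is written.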
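  5. Meta sink dedup fix (topsql_meta_deltalake/processor.rs)

    • Fixes a data-loss bug: dedup keys used to be written to the LRU during process_events, so if the subsequent flush_buffer failed, the retry found those keys already in the LRU and skipped them, losing the data permanently.
    • Switches to staged dedup: keys are held in pending_dedup_keys and written to the LRU by commit_dedup_keys only after flush_buffer succeeds.
    • Dedup keys now carry a keyspace prefix ({keyspace}_{digest}_{date}) so that meta from different keyspaces is not deduplicated against each other.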
  6. TiKV TopSQL collection switch (topsql_v2/controller.rs, topsql_v2/mod.rs)

    • Adds the enable_tikv_topsql option (default true); when disabled, only TiDB TopSQL/TopRU data is collected.
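  7. Keyspace propagation to meta events (upstream/tidb/parser.rs)

    • topsql_sql_meta / topsql_plan_meta events now carry a keyspace field (parsed from the proto's keyspace_name).
    • decode_keyspace_name is extracted into a shared method that handles empty values and UTF-8 decoding uniformly.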
  8. Schema evolution code cleanup (deltalake_writer/mod.rs, deltalake_writer/schema.rs, tests/system_tables_integration_test.rs)

    • Removes the unused schema evolution detection logic: check_schema_evolution / reset_schema_cache / EvolutionResult, etc.
    • Deletes the corresponding integration tests (test_sqlstatement_schema_evolution_add_column, etc.).
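
The keyspace sharding in item 2 can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it assumes the 64-bit FNV-1a variant, and `ShardConfig`, `TidbInstance`, `owned_by_shard`, and `addresses_for_shard` are hypothetical names standing in for whatever tidb_manager.rs actually defines. The two config fields mirror VECTOR_STS_REPLICA_COUNT and VECTOR_STS_ID.

```rust
/// Published 64-bit FNV-1a constants.
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// 64-bit FNV-1a: XOR each byte into the hash, then multiply by the prime.
fn fnv1a_64(input: &str) -> u64 {
    input.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}

/// Hypothetical shard settings (assumed to come from the two env vars).
struct ShardConfig {
    replica_count: u64,
    sts_id: u64,
}

/// Hypothetical view of one TiDB instance returned by the manager.
struct TidbInstance {
    address: String,
    keyspace_name: Option<String>,
}

/// True when this replica owns the instance's keyspace.
/// Instances without a keyspace name are never owned in sharding mode
/// (the PR additionally logs a warning for them).
fn owned_by_shard(instance: &TidbInstance, shard: &ShardConfig) -> bool {
    if shard.replica_count == 0 {
        return false; // guard against division by zero on bad config
    }
    match instance.keyspace_name.as_deref() {
        Some(name) if !name.is_empty() => {
            fnv1a_64(name) % shard.replica_count == shard.sts_id
        }
        _ => false,
    }
}

/// Addresses of the instances this replica should scrape.
fn addresses_for_shard<'a>(instances: &'a [TidbInstance], shard: &ShardConfig) -> Vec<&'a str> {
    instances
        .iter()
        .filter(|instance| owned_by_shard(instance, shard))
        .map(|instance| instance.address.as_str())
        .collect()
}
```

Because the hash depends only on the bytes of the keyspace name, every replica (and any other service that implements the same function) agrees on which shard owns a keyspace without coordination.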

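Impact on existing functionality

  • Backward compatible: every new option has a default (enable_keyspace_cluster_mapping=false, enable_tikv_topsql=true, manager_server_address=None); when left unconfigured, behavior is identical to before.
  • TopologyFetcher::new signature change: adds the manager_server_address / tidb_namespace parameters; all callers (keyviz, system_tables, topsql, topsql_v2) have been adapted and pass None.
  • Writer key type change: from String to the WriterKey struct. This changes the key structure of the writer cache, but the logic is equivalent.
  • Schema evolution removal: the schema evolution detection logic in deltalake_writer is deleted; take note if any other sink depends on it (judging from the diff, there are currently no other callers).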
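
To make the writer key change concrete, here is a small sketch of a composite key. `WriterKey` and its two fields come from the description above; `StubWriter`, `writer_for`, and the example paths are hypothetical placeholders, since the real DeltaLakeWriter is not shown in this conversation.

```rust
use std::collections::HashMap;

/// Composite cache key: the same table name may map to several paths.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct WriterKey {
    table_name: String,
    table_path: String,
}

/// Hypothetical stand-in for DeltaLakeWriter; it only counts rows.
#[derive(Debug, Default)]
struct StubWriter {
    rows_written: usize,
}

/// Look up (or lazily create) the writer for a table at a specific path.
fn writer_for<'a>(
    writers: &'a mut HashMap<WriterKey, StubWriter>,
    table_name: &str,
    table_path: &str,
) -> &'a mut StubWriter {
    writers
        .entry(WriterKey {
            table_name: table_name.to_owned(),
            table_path: table_path.to_owned(),
        })
        .or_default()
}
```

With a plain String key, two keyspaces that route the same table name to different org/cluster paths would share one writer; with the composite key each path gets its own entry.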

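Memory leak analysis

No memory leak risk. Details:

  1. Bounded LRU cache: PdKeyspaceResolver's keyspace routing cache uses LruCache with a default capacity of 10000; entries beyond that are evicted automatically.
  2. Manager shard config read once: the environment variables are read once when the fetcher is initialized and cached in the struct, so there is no repeated allocation.
  3. Writer map grows on demand but is bounded: HashMap<WriterKey, DeltaLakeWriter> grows per (table_name, table_path) combination; the actual count is limited by keyspace × component type, so it does not grow without bound.
  4. unsafe removed: the previous unsafe pattern of Arc::into_raw plus manual reconstruction is replaced by #[derive(Clone)], eliminating the potential reference-count leak.
  5. pending_dedup_keys cleaned up promptly: after a successful flush the keys are committed to the LRU and cleared; on failure they are also cleared before the retry.
  6. Response body limit: the manager response is capped at 8 MiB, preventing an abnormal response from blowing up memory.
  7. Meta dedup LRU: seen_keys_sql_meta / seen_keys_plan_meta use LRUs with configurable capacity, so they are bounded.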
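
The response body limit in point 6 can be illustrated with a synchronous sketch over `std::io::Read`. This is an assumption for illustration only: the PR's fetcher is most likely asynchronous and uses an HTTP client, and `read_body_limited` and `MAX_BODY_BYTES` are hypothetical names.

```rust
use std::io::{self, Read};

/// 8 MiB cap, matching the limit described above.
const MAX_BODY_BYTES: u64 = 8 * 1024 * 1024;

/// Read a body but refuse to buffer more than `limit` bytes.
fn read_body_limited<R: Read>(reader: R, limit: u64) -> io::Result<Vec<u8>> {
    let mut body = Vec::new();
    // Allow one byte past the limit: if it arrives, the body is oversized.
    reader.take(limit.saturating_add(1)).read_to_end(&mut body)?;
    if body.len() as u64 > limit {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("response body exceeds {limit} bytes"),
        ));
    }
    Ok(body)
}
```

Reading `limit + 1` bytes distinguishes a body of exactly `limit` bytes from an oversized one while never buffering more than one extra byte.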

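Other observations

  • The retry in process_events_loop clones the entire events_vec (retry_snapshot), which can cause a brief memory spike for large batches; it is released after at most 5 retries, so it is not a leak.
  • join_path / build_table_path are implemented twice, in the data sink and the meta sink; consider extracting them into a common module.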

@pingcap-cla-assistant

pingcap-cla-assistant Bot commented Mar 22, 2026

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ zhoucai-pingcap
❌ zeminzhou

@ti-chi-bot ti-chi-bot Bot added the size/XXL label Mar 22, 2026
Comment thread src/common/keyspace_cluster.rs Outdated
Comment thread src/common/topology/fetch/tidb_manager.rs Outdated
Comment thread src/common/keyspace_cluster.rs
Comment thread src/common/topology/fetch/tidb_manager.rs Outdated
Comment thread src/common/topology/fetch/mod.rs
@zeminzhou
Collaborator Author

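Following up on this round of review comments: they have just been addressed in 167a2de and pushed to this PR:

  1. The keyspace_cluster cache was unbounded
    Changed to a bounded LRU cache with a default capacity of 10000, so memory does not grow without bound as the set of active serverless clusters keeps changing.

  2. No log when the active TiDB set selected by the manager is empty
    Added a log. When the manager's result is empty after filtering, a message carrying manager_server_address / tidb_namespace / shard_config is emitted to help troubleshooting.

  3. resolve_keyspace does not write to the cache on failure/miss; could the first call fail and the second succeed?
    Not caching failures/misses is intentional. If PD metadata shows up late, the network blips briefly, or a lookup misses the first time and hits the second, later polls can still recover; caching failures would instead amplify transient errors. I added a code comment explaining this.

  4. The manager response body may be too large and could exhaust memory
    Added a body size limit, currently 8 MiB. Exceeding it returns an error immediately instead of reading the whole body without limit.

  5. What happens when only manager_server_address is configured and tidb_namespace is not
    Changed to fail-fast: if manager_server_address is configured but tidb_namespace is empty or all whitespace, ConfigurationError is returned directly, avoiding the previous behavior of silently getting no TiDB topology.

Targeted tests have been run locally: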

  • cargo test keyspace_cluster --lib
  • cargo test tidb_manager --lib
  • cargo test normalize_tidb_namespace --lib

If you would rather have point 5 fall back to etcd instead of failing fast, I can change that too.
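
For reference, the fail-fast check from point 5 could look roughly like the sketch below. `ConfigError` and `validate_manager_namespace` are hypothetical names (the PR returns its own ConfigurationError), and treating a blank manager address as "not configured" is an assumption of this sketch.

```rust
/// Hypothetical error type standing in for the PR's ConfigurationError.
#[derive(Debug, PartialEq)]
enum ConfigError {
    MissingTidbNamespace,
}

/// Returns Ok(None) when no manager is configured (the etcd path applies),
/// Ok(Some(namespace)) when both values are usable, and an error when a
/// manager is configured but the namespace is missing or blank.
fn validate_manager_namespace(
    manager_server_address: Option<&str>,
    tidb_namespace: Option<&str>,
) -> Result<Option<String>, ConfigError> {
    // Assumption: an absent or blank address means "manager not configured".
    if manager_server_address.map_or(true, |addr| addr.trim().is_empty()) {
        return Ok(None);
    }
    match tidb_namespace.map(str::trim) {
        Some(namespace) if !namespace.is_empty() => Ok(Some(namespace.to_owned())),
        _ => Err(ConfigError::MissingTidbNamespace),
    }
}
```

Failing at construction time surfaces the misconfiguration immediately, whereas a silent fallback would only show up later as an empty topology.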

Base automatically changed from 0.49-topru to 0.49 March 24, 2026 02:45
Collaborator Author

@zeminzhou zeminzhou left a comment

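claude: Overall the code quality is good, test coverage is thorough, and the review comments were handled well (LRU cache, body limit, fail-fast config validation, etc.).

One more suggestion, about the unsafe block in TopSQLDeltaLakeSink::new: taking the inner value out of an Arc through a raw pointer is fragile, and if someone later changes the code and adds one more clone, it could cause UB. Suggested refactor: first create the channel and shared state (Arc<Mutex<HashMap<...>>>), pass those Arcs directly when spawning the task, then construct and return Self normally, with no unsafe needed at all.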

Comment thread src/sinks/topsql_data_deltalake/processor.rs
Comment thread src/common/keyspace_cluster.rs Outdated
Comment thread src/common/topology/fetch/tidb_manager.rs Outdated
Comment thread src/common/topology/fetch/tidb_manager.rs Outdated
Comment thread src/common/topology/fetch/tidb_manager.rs
Comment thread src/sources/topsql_v2/mod.rs
@zeminzhou
Collaborator Author

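Following up on review 3997482947: I confirmed that every comment in this round needs a code fix; none can be closed with a written reply alone. The changes have been pushed to this PR branch in 54c1fda.

Changes:

  • read_manager_shard_config_from_env is no longer read on every get_up_tidbs call; it is now read once when LegacyTopologyFetcher is initialized and cached.
  • Added a comment to hash_keyspace_name explaining why FNV-1a was chosen and what it requires as an implicit cross-service contract for keyspace sharding.
  • The PLACEHOLDER doc comments on manager_server_address / tidb_namespace in topsql_v2 have been replaced with real descriptions.
  • The unsafe in TopSQLDeltaLakeSink::new has been removed: the sink is now constructed normally and a clone is handed to the background task, instead of taking the inner value out of an Arc through a raw pointer.

Targeted local verification: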

  • cargo test -q --package vector-extensions --lib manager_shard_config -- --nocapture
  • cargo test -q --package vector-extensions --lib test_send_when_batch_size_reached -- --nocapture
  • cargo test -q --package vector-extensions --lib normalize_tidb_namespace -- --nocapture
  • cargo test -q --package vector-extensions --lib hash_keyspace_name_is_stable -- --nocapture

@zeminzhou
Collaborator Author

zeminzhou commented Mar 24, 2026

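codex: Regarding the unsafe suggestion for TopSQLDeltaLakeSink::new in the review body, this was already changed in 54c1fda: Self is now constructed normally and a clone is given to the background task, without taking the inner value out of an Arc through a raw pointer, so later changes to the initialization path will not run into the UB risk.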
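
A rough sketch of the clone-based pattern described here, using only the standard library. The real sink runs on an async runtime and holds DeltaLakeWriter values; `Sink`, its fields, and the use of `std::thread` with `std::sync::mpsc` are assumptions made to keep the example self-contained.

```rust
use std::collections::HashMap;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical sink: cloning it only copies the sender and bumps the Arc
/// reference count, so the clone shares state with the original.
#[derive(Clone)]
struct Sink {
    tx: mpsc::Sender<String>,
    writers: Arc<Mutex<HashMap<String, usize>>>,
}

impl Sink {
    /// Construct the sink normally, then hand a clone to the background
    /// worker. No raw pointers or unsafe are involved.
    fn new() -> (Self, thread::JoinHandle<()>) {
        let (tx, rx) = mpsc::channel::<String>();
        let sink = Sink {
            tx,
            writers: Arc::new(Mutex::new(HashMap::new())),
        };
        let background = sink.clone();
        let handle = thread::spawn(move || {
            // The worker only needs the shared map; drop its sender so the
            // receive loop ends once every external sender is gone.
            let Sink { tx, writers } = background;
            drop(tx);
            for table in rx {
                *writers.lock().unwrap().entry(table).or_insert(0) += 1;
            }
        });
        (sink, handle)
    }
}
```

Adding further clones later is harmless under this design because ownership is tracked by the reference count rather than by manual pointer reconstruction.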

Collaborator Author

@zeminzhou zeminzhou left a comment

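claude: LGTM 🎉

All review comments from the previous round have been fixed:

  1. process_events_loop now has max retry (5 times) + exponential backoff (5s→60s cap) + a log when the limit is exceeded and data is dropped
  2. unsafe block removed in favor of #[derive(Clone)] + sink.clone() (changed in both the data and meta processors)
  3. is_not_found_body tightened to "keyspace not found", supporting both plain-text and JSON formats, with tests added
  4. normalize_namespaces extracted into the shared function normalize_namespace_list
  5. ✅ shard config reads the environment variables once at initialization and caches them in the struct
  6. ✅ doc comment PLACEHOLDER filled in
  7. ✅ FNV-1a hash now has a comment explaining the choice and the cross-service contract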
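
The retry schedule in item 1 can be sketched as below. The 5 retry limit, the 5s starting delay, and the 60s cap come from the description above; doubling on each attempt is an assumption, and `retry_delay` plus the constant names are hypothetical.

```rust
use std::time::Duration;

const MAX_RETRIES: u32 = 5;
const INITIAL_DELAY: Duration = Duration::from_secs(5);
const MAX_DELAY: Duration = Duration::from_secs(60);

/// Delay before retry number `attempt` (1-based), or None once the retry
/// budget is exhausted and the batch should be dropped and logged.
fn retry_delay(attempt: u32) -> Option<Duration> {
    if attempt == 0 || attempt > MAX_RETRIES {
        return None;
    }
    // Assumed doubling schedule: 5s, 10s, 20s, 40s, then capped at 60s.
    let factor = 2u32.saturating_pow(attempt - 1);
    Some(INITIAL_DELAY.saturating_mul(factor).min(MAX_DELAY))
}
```

Under these assumptions the worst case spends 135 seconds waiting across the five retries before the batch is dropped.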

- Replace recursive JSON extraction with direct serde deserialization
  for ActiveTiDBAddress, removing ~100 lines of guessing logic
- Remove redundant normalize_namespaces wrapper method
- Change PdKeyspaceResolver::new to take Option<&TlsConfig> instead
  of owned Option<TlsConfig> to avoid unnecessary clones
- Add debug_assert on replace_keyspace_route_segments to catch
  org_id/cluster_id containing path separators
- Add TODO for resolve_keyspace concurrent cache miss dedup
- Add comment explaining org=/cluster= path segment convention
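
As an illustration of the org=/cluster= segment convention and the debug_assert mentioned in this commit, here is a minimal sketch. The function name `replace_route_segments`, its signature, and the example path are assumptions; the PR's replace_keyspace_route_segments may differ in detail.

```rust
/// Replace the `org=...` and `cluster=...` template segments of a base path
/// with the routing values resolved for a keyspace.
fn replace_route_segments(base_path: &str, org_id: &str, cluster_id: &str) -> String {
    // An id containing a path separator would silently add extra segments,
    // so catch it early in debug builds.
    debug_assert!(
        !org_id.contains('/') && !cluster_id.contains('/'),
        "org_id and cluster_id must not contain path separators"
    );
    base_path
        .split('/')
        .map(|segment| {
            if segment.starts_with("org=") {
                format!("org={org_id}")
            } else if segment.starts_with("cluster=") {
                format!("cluster={cluster_id}")
            } else {
                segment.to_owned()
            }
        })
        .collect::<Vec<_>>()
        .join("/")
}
```

Splitting on `/` and rejoining leaves every other segment, including the empty one inside a `scheme://` prefix, untouched.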
Empty array from manager is a legitimate response (no active TiDB
instances). Previously it was treated as an error while filter-to-empty
after keyspace sharding was treated as Ok — inconsistent behavior.

Now both cases flow through get_up_tidbs which logs and returns Ok,
making the behavior uniform. Also removes the now-unused
InvalidManagerResponse error variant.
… on retry

process_events wrote dedup keys to LRU before flush_buffer persisted
data to Delta Lake. If flush failed (e.g. PD keyspace lookup error),
the retry path would find all keys already in LRU and silently skip
them, causing permanent meta data loss.

Fix: stage dedup keys in a pending_dedup_keys buffer during
process_events (read-only LRU check). Commit to LRU only after
flush_buffer succeeds. On failure, process_events_loop clears
pending state so the cloned retry snapshot is re-processed cleanly.
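
The staged dedup described in this commit message can be sketched as follows. A `HashSet` stands in for the bounded LRU purely to keep the example dependency-free (it is unbounded, unlike the real cache), and `StagedDedup`, `stage`, `commit`, and `rollback` are hypothetical names loosely modeled on pending_dedup_keys and commit_dedup_keys.

```rust
use std::collections::HashSet;

/// Two-phase dedup state: keys only become "seen" after a successful flush.
#[derive(Default)]
struct StagedDedup {
    committed: HashSet<String>, // stand-in for the bounded LRU
    pending: HashSet<String>,   // keys staged by the current batch
}

impl StagedDedup {
    /// Returns true when the event should be written. The committed set is
    /// only read here; the key is staged, not committed.
    fn stage(&mut self, keyspace: &str, digest: &str, date: &str) -> bool {
        let key = format!("{keyspace}_{digest}_{date}");
        if self.committed.contains(&key) {
            return false;
        }
        // insert returns false for a duplicate within the same batch
        self.pending.insert(key)
    }

    /// Call after the flush succeeded: staged keys become permanent.
    fn commit(&mut self) {
        self.committed.extend(self.pending.drain());
    }

    /// Call after the flush failed: forget staged keys so a retry of the
    /// same events is processed again instead of being skipped.
    fn rollback(&mut self) {
        self.pending.clear();
    }
}
```

The keyspace prefix in the key keeps identical digests from different keyspaces from suppressing each other.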
# Conflicts:
#	src/sinks/topsql_data_deltalake/mod.rs
#	src/sinks/topsql_data_deltalake/processor.rs
#	src/sinks/topsql_meta_deltalake/mod.rs
#	src/sinks/topsql_meta_deltalake/processor.rs
…eyspace-cluster

# Conflicts:
#	src/sources/topsql_v2/upstream/tidb/parser.rs
…ster module

Deduplicate identical retry delay function and constants from both
topsql_data_deltalake and topsql_meta_deltalake processors into
common/keyspace_cluster.rs. Move the associated test as well.
Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
zhoucai-pingcap
zhoucai-pingcap previously approved these changes Apr 8, 2026
…king

Replace the single cache-miss-then-fetch pattern with double-check
locking using per-keyspace Mutex guards. Concurrent resolve_keyspace
calls for the same keyspace now serialize behind a shared lock so only
the first caller issues the HTTP request; subsequent callers find the
result in cache after acquiring the lock.

Also extracts the HTTP fetch logic into fetch_keyspace_from_pd for
clarity and removes the TODO comment.
…rowth

Wrap the post-lock logic in an async block so the per-keyspace lock
cleanup runs on every exit path: double-check cache hit, fetch success,
and fetch error. Previously cleanup only ran on the straight-line
success path, leaving map entries permanently for early-return callers.
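
The last two commits can be illustrated with a synchronous sketch of double-check locking with per-keyspace guards and cleanup on every exit path. The actual resolver is async and talks to PD over HTTP; `Resolver`, `cached`, the injected `fetch` closure, and the `fetches` counter are assumptions added so the behavior can be observed without a network.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct Resolver {
    cache: Mutex<HashMap<String, String>>,
    inflight: Mutex<HashMap<String, Arc<Mutex<()>>>>,
    fetches: AtomicUsize, // counts upstream lookups, for illustration
}

impl Resolver {
    fn cached(&self, keyspace: &str) -> Option<String> {
        self.cache.lock().unwrap().get(keyspace).cloned()
    }

    /// `fetch` is a hypothetical stand-in for the PD HTTP request.
    fn resolve(&self, keyspace: &str, fetch: impl Fn(&str) -> Option<String>) -> Option<String> {
        // First check: fast path without any per-keyspace lock.
        if let Some(hit) = self.cached(keyspace) {
            return Some(hit);
        }
        // Get or create the guard shared by all callers for this keyspace.
        let key_lock = Arc::clone(
            self.inflight.lock().unwrap().entry(keyspace.to_owned()).or_default(),
        );
        let result = {
            let _guard = key_lock.lock().unwrap();
            // Second check: an earlier caller may have filled the cache.
            match self.cached(keyspace) {
                Some(hit) => Some(hit),
                None => {
                    self.fetches.fetch_add(1, Ordering::SeqCst);
                    let fetched = fetch(keyspace);
                    // Only successes are cached, so misses can recover later.
                    if let Some(value) = &fetched {
                        self.cache.lock().unwrap().insert(keyspace.to_owned(), value.clone());
                    }
                    fetched
                }
            }
        };
        // Cleanup runs for cache hit, fetch success, and fetch failure alike,
        // so the guard map cannot grow without bound.
        self.inflight.lock().unwrap().remove(keyspace);
        result
    }
}
```

Computing the result inside one block and cleaning up afterwards plays the same role as wrapping the post-lock logic in an async block: there is a single exit point, so no early return can skip the removal.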
@zeminzhou zeminzhou merged commit 01f0354 into 0.49 Apr 9, 2026
1 of 2 checks passed
@zeminzhou zeminzhou deleted the codex/topru-manager-keyspace-cluster branch April 9, 2026 08:05