Skip to content

Retrieve dubbo server.address/server.port according to latest SemConv#17244

Open
steverao wants to merge 6 commits intoopen-telemetry:mainfrom
steverao:dubbo-client-lb
Open

Retrieve dubbo server.address/server.port according to latest SemConv#17244
steverao wants to merge 6 commits intoopen-telemetry:mainfrom
steverao:dubbo-client-lb

Conversation

@steverao
Copy link
Copy Markdown
Contributor

@steverao steverao commented Mar 31, 2026

@steverao
Copy link
Copy Markdown
Contributor Author

steverao commented Apr 7, 2026

Implementation description

Problem Statement

How can we accurately capture the registry address associated with each RPC invocation on the consumer side in Dubbo multi-registry scenarios?

Core Principle: Independent Directory and Invoker Chain per Service Reference

Key Insight

Important: In Dubbo's design, each service reference (ReferenceConfig) creates an independent Directory for each registry. Multiple registries are not stored in the same Directory; instead, each registry has its own Directory.

Complete Service Reference Flow in Dubbo

1. Initialization Phase: Service Reference Setup

// User code
@Reference
private UserService userService;

Dubbo's Internal Processing:

// 1. ReferenceConfig.refer() is invoked
// Location: org.apache.dubbo.config.ReferenceConfig#refer()

public synchronized T get() {
    if (ref == null) {
        init();
    }
    return ref;
}

private void init() {
    // ...
    ref = createProxy(map);  // Create proxy object
}

// 2. Generate registry:// URL based on registry configuration
// Example: registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?refer=...

// 3. Protocol.refer() routes to RegistryProtocol based on "registry://" protocol
// Location: org.apache.dubbo.registry.integration.RegistryProtocol#refer()

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // url = registry://127.0.0.1:8848/...
    
    // Normalize registry address
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))  // nacos
            .removeParameter(REGISTRY_KEY)
            .build();
    // url becomes nacos://127.0.0.1:8848/...
    
    // Get Registry instance (e.g., NacosRegistry)
    Registry registry = registryFactory.getRegistry(url);
    
    // ⭐️ Create RegistryDirectory (This is the key!)
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);  // ⭐️ Bind Directory to Registry
    directory.setProtocol(protocol);
    
    // Subscribe to provider information from registry
    directory.subscribe(...);
    
    // ⭐️ Call Cluster.join(directory) to create ClusterInvoker
    // This triggers our RegistryCapturingClusterWrapper
    Invoker<T> invoker = cluster.join(directory);
    
    return invoker;
}

2. Current solution interception Point: Cluster.join()

// RegistryCapturingClusterWrapper.java

@Override
public <T> Invoker<T> join(Directory<T> directory) {
    // 1. First call the original Cluster's join method (e.g., FailoverCluster)
    //    Returns the original ClusterInvoker (e.g., FailoverClusterInvoker)
    Invoker<T> originalInvoker = delegateJoin(cluster, directory, true);
    
    // 2. Wrap if needed
    return wrapIfNeeded(directory, originalInvoker);
}

private static <T> Invoker<T> wrapIfNeeded(Directory<T> directory, Invoker<T> invoker) {
    // Skip StaticDirectory (direct connection scenario, no registry information)
    if (isStaticDirectory(directory)) {
        return invoker;
    }
    
    // ⭐️ Extract registry address from Directory
    // directory is RegistryDirectory, which has a registry field
    String registryAddress = DubboRegistryUtil.tryExtractRegistryAddressFromDirectory(directory);
    
    if (registryAddress == null) {
        return invoker;
    }
    
    // ⭐️ Create RegistryCapturingInvoker wrapper
    // Store registryAddress in this Invoker instance
    return new RegistryCapturingInvoker<>(invoker, registryAddress);
}

3. Key Point: Binding Between Directory and Registry

// org.apache.dubbo.registry.integration.RegistryDirectory

public class RegistryDirectory<T> extends AbstractDirectory<T> {
    
    private Registry registry;  // ⭐️ Each RegistryDirectory holds one Registry instance
    
    public void setRegistry(Registry registry) {
        this.registry = registry;
    }
    
    public Registry getRegistry() {
        return registry;
    }
    
    // ...
}

Important: RegistryDirectory is bound to a specific Registry at creation time. One RegistryDirectory corresponds to one registry.

4. Registry Address Extraction Implementation

// DubboRegistryUtil.java - Reflection logic

private static String extractRegistryAddressFromDirectory(Directory<?> directory) {
    // 1. Get directory.getRegistry() via reflection
    MethodHandle getRegistryHandle = findAccessor(
        directory.getClass(), 
        "getRegistry"
    );
    
    Registry registry = (Registry) getRegistryHandle.invoke(directory);
    
    // 2. Get registry.getUrl() via reflection
    MethodHandle getUrlHandle = findAccessor(
        registry.getClass(), 
        "getUrl"
    );
    
    URL urlObj = (URL) getUrlHandle.invoke(registry);
    
    // 3. Extract protocol and address
    // urlObj = nacos://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?...
    String protocol = urlObj.getProtocol();  // "nacos"
    String address = urlObj.getAddress();    // "127.0.0.1:8848"
    
    return protocol + "://" + address;  // "nacos://127.0.0.1:8848"
}

Invocation Phase: Identify the Correct Registry

Invocation Chain Analysis

User code invocation
  userService.getUser(1)
    ↓
InvokerInvocationHandler.invoke()
  → Proxy handler
    ↓
MockClusterInvoker.invoke()  ← ⭐️ This is our wrapped RegistryCapturingInvoker
  → Our interception point
    ↓
  [RegistryCapturingInvoker.invoke()]
    ├─ DubboRegistryUtil.pushCapturedRegistryAddress("nacos://127.0.0.1:8848")
    ├─ try {
    │    return delegate.invoke(invocation);  ← Continue to original Invoker chain
    │      ↓
    │    FailoverClusterInvoker.invoke()
    │      ↓
    │    AbstractClusterInvoker.invoke()
    │      ↓
    │    ProtocolFilterWrapper$1.invoke()  ← Filter chain
    │      ↓
    │    OpenTelemetryClientFilter.invoke()  ← ⭐️ Read registryAddress here
    │      ↓
    │    DubboInvoker.invoke()
    │      ↓
    │    Network request to Provider
    │  }
    └─ finally {
         DubboRegistryUtil.clearCapturedRegistryAddress()
       }

RegistryCapturingInvoker Implementation (Key Component)

// RegistryCapturingInvoker.java

final class RegistryCapturingInvoker<T> implements Invoker<T> {

  private final Invoker<T> delegate;
  private final String registryAddress;  // ⭐️ Registry address stored here

  RegistryCapturingInvoker(Invoker<T> delegate, String registryAddress) {
    this.delegate = delegate;
    this.registryAddress = registryAddress;  // Set at construction time from Directory
  }

  @Override
  public Result invoke(Invocation invocation) {
    // ⭐️ Set registryAddress to ThreadLocal on each invocation
    DubboRegistryUtil.pushCapturedRegistryAddress(registryAddress);
    try {
      // Continue to original Invoker chain
      return delegate.invoke(invocation);
    } finally {
      // Clean up ThreadLocal after invocation
      DubboRegistryUtil.clearCapturedRegistryAddress();
    }
  }
}

ThreadLocal Usage

// DubboRegistryUtil.java

public final class DubboRegistryUtil {
  
  // ThreadLocal stores the registry address for the current thread's invocation
  private static final ThreadLocal<String> CAPTURED_REGISTRY_ADDRESS = new ThreadLocal<>();
  
  static void pushCapturedRegistryAddress(String address) {
    CAPTURED_REGISTRY_ADDRESS.set(address);
  }
  
  public static void clearCapturedRegistryAddress() {
    CAPTURED_REGISTRY_ADDRESS.remove();
  }
  
  @Nullable
  public static String extractRegistryAddress(RpcInvocation invocation) {
    // Filter calls this method to get registry address
    return CAPTURED_REGISTRY_ADDRESS.get();
  }
}

Multi-Registry Scenario Handling

Scenario 1: Single Service Reference with Multiple Registries

<!-- dubbo-consumer.xml -->
<dubbo:registry id="nacos1" address="nacos://127.0.0.1:8848"/>
<dubbo:registry id="nacos2" address="nacos://192.168.1.100:8848"/>

<dubbo:reference interface="com.demo.UserService" registry="nacos1,nacos2"/>

Dubbo's Internal Processing:

// ReferenceConfig.refer() calls RegistryProtocol.refer() for each registry

List<Invoker<?>> invokers = new ArrayList<>();

// First call: nacos1
Invoker invoker1 = registryProtocol.refer(
    UserService.class, 
    URL.valueOf("registry://127.0.0.1:8848/...")
);
// Internally creates:
//   - RegistryDirectory1 (bound to NacosRegistry1)
//   - Cluster.join(directory1) returns ClusterInvoker1
//   - Our wrapper: RegistryCapturingInvoker1("nacos://127.0.0.1:8848", ClusterInvoker1)
invokers.add(invoker1);

// Second call: nacos2
Invoker invoker2 = registryProtocol.refer(
    UserService.class, 
    URL.valueOf("registry://192.168.1.100:8848/...")
);
// Internally creates:
//   - RegistryDirectory2 (bound to NacosRegistry2)
//   - Cluster.join(directory2) returns ClusterInvoker2
//   - Our wrapper: RegistryCapturingInvoker2("nacos://192.168.1.100:8848", ClusterInvoker2)
invokers.add(invoker2);

// Aggregate multiple Invokers (via ZoneAwareCluster or RegistryAwareCluster)
Invoker finalInvoker = cluster.join(new StaticDirectory(invokers));

Key Points:

  1. Each registry has an independent Invoker chain
  2. Each Invoker chain determines its corresponding registry at creation time
  3. During invocation, which Invoker is routed to determines which RegistryCapturingInvoker is triggered

Scenario 2: Routing During Invocation

// User invocation
userService.getUser(1);

// Dubbo's internal routing logic
// 1. RegistryAwareClusterInvoker (aggregates multiple registry Invokers)
//    Selects one Invoker based on routing rules
//    Example: selects invoker1 (nacos1)

// 2. Call invoker1.invoke()
//    → RegistryCapturingInvoker1.invoke()
//      ├─ pushCapturedRegistryAddress("nacos://127.0.0.1:8848")  ← nacos1
//      ├─ delegate.invoke() → ClusterInvoker1 → Filter → Provider
//      └─ clearCapturedRegistryAddress()

// If next invocation selects invoker2 (nacos2)
userService.getUser(2);

// 3. Call invoker2.invoke()
//    → RegistryCapturingInvoker2.invoke()
//      ├─ pushCapturedRegistryAddress("nacos://192.168.1.100:8848")  ← nacos2
//      ├─ delegate.invoke() → ClusterInvoker2 → Filter → Provider
//      └─ clearCapturedRegistryAddress()

Why Does This Approach Work?

1. Directory is Bound to Registry at Creation Time

// Inside RegistryProtocol.refer()
RegistryDirectory<T> directory = new RegistryDirectory<>(type, url);
directory.setRegistry(registry);  // ⭐️ Binding
  • One RegistryDirectory corresponds to one Registry
  • directory.getRegistry() always returns the registry bound at creation

2. Cluster.join() Creates Independent Invoker for Each Directory

// Our interception
Invoker<T> invoker = cluster.join(directory);  // Called once per directory
  • In multi-registry scenarios, join() is called multiple times, each with a different directory
  • Each call creates a new RegistryCapturingInvoker storing the corresponding registryAddress

3. Invoker is Stateful

new RegistryCapturingInvoker<>(delegate, "nacos://127.0.0.1:8848");
  • RegistryCapturingInvoker is an object instance
  • registryAddress is an instance field, determined at object creation time
  • Different Invoker instances hold different registryAddress values

4. Invocation Associates Registry Through Invoker Instance

// User invocation
userService.getUser(1);
  ↓
// Dubbo routing selects an Invoker (e.g., invoker1)
invoker1.invoke(invocation)
  ↓
// invoker1 is a RegistryCapturingInvoker instance
// Its registryAddress field = "nacos://127.0.0.1:8848"
RegistryCapturingInvoker.invoke() {
    // Write instance field registryAddress to ThreadLocal
    pushCapturedRegistryAddress(this.registryAddress);
    // ...
}

Direct Connection Mode Handling

When using direct connection mode with @Reference(url = "dubbo://..."):

Single Direct URL

// ReferenceConfig.createProxy():
if (urls.size() == 1) {
    // ⚡️ Directly call refprotocol.refer(), completely bypassing Cluster.join()!
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
  • Protocol routes to DubboProtocol.refer() based on dubbo:// protocol
  • Creates single DubboInvoker directly, no cluster aggregation needed
  • Cluster.join() is never calledisStaticDirectory check is never reached

Multiple Direct URLs

// Configure: url = "dubbo://host1:20880;dubbo://host2:20881;dubbo://host3:20882"

if (urls.size() > 1) {
    List<Invoker<?>> invokers = new ArrayList<>();
    for (URL url : urls) {
        invokers.add(refprotocol.refer(interfaceClass, url));
    }
    // Create StaticDirectory to aggregate multiple direct invokers
    invoker = cluster.join(new StaticDirectory(invokers));  // ← Triggers Cluster.join()
}
  • StaticDirectory is used (not RegistryDirectory)
  • isStaticDirectory() check returns true and skips wrapping
  • No registry information to capture in direct connection mode

Version Compatibility

Dubbo 3.0.4+ Change

Starting from Dubbo 3.0.4, the Cluster interface added a two-parameter join() method:

// Dubbo 2.7.x and 3.0.0-3.0.3
<T> Invoker<T> join(Directory<T> directory);

// Dubbo 3.0.4+
<T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain);

Our implementation uses reflection to support both versions:

static {
    try {
        // Try Dubbo 3.0.4+ method
        Method m = Cluster.class.getMethod("join", Directory.class, boolean.class);
        JOIN_TWO_ARG = LOOKUP.unreflect(m);
    } catch (ReflectiveOperationException ignored) {
        // Fallback to Dubbo 2.7.x / 3.0.0-3.0.3 method
    }
}

@steverao steverao marked this pull request as ready for review April 7, 2026 15:41
@steverao steverao requested a review from a team as a code owner April 7, 2026 15:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant