100100import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES ;
101101import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE ;
102102import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY ;
103+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE ;
104+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY ;
103105import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE ;
104106import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_VALUE_NAME_KEY ;
105107import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE ;
118120import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_SECURITY_DIR_KEY ;
119121import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_SECURITY_POLICY_KEY ;
120122import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_TCP_BIND_PORT_KEY ;
123+ import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_TIMEOUT_SECONDS_KEY ;
121124import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_VALUE_NAME_KEY ;
122125import static org .apache .iotdb .commons .pipe .config .constant .PipeSinkConstant .SINK_OPC_UA_WITH_QUALITY_KEY ;
123126
@@ -137,8 +140,11 @@ public class OpcUaSink implements PipeConnector {
137140
138141 private static final Map <String , Pair <AtomicInteger , OpcUaNameSpace >>
139142 SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap <>();
143+ private static final Map <String , Pair <AtomicInteger , IoTDBOpcUaClient >>
144+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap <>();
140145
141146 private String serverKey ;
147+ private String nodeUrl ;
142148 private boolean isClientServerModel ;
143149 private String databaseName ;
144150 private String placeHolder4NullTag ;
@@ -238,16 +244,15 @@ public void customize(
238244 "When the OPC UA sink sets 'with-quality' to true, the table model data is not supported." );
239245 }
240246
241- final String nodeUrl =
242- parameters .getStringByKeys (CONNECTOR_OPC_UA_NODE_URL_KEY , SINK_OPC_UA_NODE_URL_KEY );
247+ nodeUrl = parameters .getStringByKeys (CONNECTOR_OPC_UA_NODE_URL_KEY , SINK_OPC_UA_NODE_URL_KEY );
243248 if (Objects .isNull (nodeUrl )) {
244249 customizeServer (parameters );
245250 } else {
246251 if (PathUtils .isTableModelDatabase (databaseName )) {
247252 throw new PipeException (
248253 "When the OPC UA sink points to an outer server, the table model data is not supported." );
249254 }
250- customizeClient (nodeUrl , parameters );
255+ customizeClient (parameters );
251256 }
252257 }
253258
@@ -350,7 +355,7 @@ private void customizeServer(final PipeParameters parameters) {
350355 }
351356 }
352357
353- private void customizeClient (final String nodeUrl , final PipeParameters parameters ) {
358+ private void customizeClient (final PipeParameters parameters ) {
354359 final SecurityPolicy policy =
355360 getSecurityPolicy (
356361 parameters
@@ -380,15 +385,39 @@ private void customizeClient(final String nodeUrl, final PipeParameters paramete
380385 + File .separatorChar
381386 + UUID .nameUUIDFromBytes (nodeUrl .getBytes (TSFileConfig .STRING_CHARSET ))));
382387
383- client =
384- new IoTDBOpcUaClient (
385- nodeUrl ,
386- policy ,
387- provider ,
388- parameters .getBooleanOrDefault (
389- Arrays .asList (CONNECTOR_OPC_UA_HISTORIZING_KEY , SINK_OPC_UA_HISTORIZING_KEY ),
390- CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE ));
391- new ClientRunner (client , securityDir , password ).run ();
388+ final long timeoutSeconds =
389+ parameters .getLongOrDefault (
390+ Arrays .asList (CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY , SINK_OPC_UA_TIMEOUT_SECONDS_KEY ),
391+ CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE );
392+
393+ synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP ) {
394+ client =
395+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP
396+ .compute (
397+ nodeUrl ,
398+ (key , oldValue ) -> {
399+ if (Objects .isNull (oldValue )) {
400+ final IoTDBOpcUaClient result =
401+ new IoTDBOpcUaClient (
402+ nodeUrl ,
403+ policy ,
404+ provider ,
405+ parameters .getBooleanOrDefault (
406+ Arrays .asList (
407+ CONNECTOR_OPC_UA_HISTORIZING_KEY ,
408+ SINK_OPC_UA_HISTORIZING_KEY ),
409+ CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE ));
410+ final ClientRunner runner =
411+ new ClientRunner (result , securityDir , password , userName , timeoutSeconds );
412+ runner .run ();
413+ return new Pair <>(new AtomicInteger (0 ), result );
414+ }
415+ oldValue .getRight ().checkEquals (userName , password , securityDir , policy );
416+ return oldValue ;
417+ })
418+ .getRight ();
419+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP .get (nodeUrl ).getLeft ().incrementAndGet ();
420+ }
392421 }
393422
394423 private SecurityPolicy getSecurityPolicy (final String securityPolicy ) {
@@ -521,10 +550,6 @@ public interface ThrowingBiConsumer<T, U, E extends Exception> {
521550
522551 @ Override
523552 public void close () throws Exception {
524- if (Objects .nonNull (client )) {
525- client .disconnect ();
526- }
527-
528553 if (serverKey == null ) {
529554 return ;
530555 }
@@ -544,6 +569,26 @@ public void close() throws Exception {
544569 }
545570 }
546571 }
572+
573+ if (nodeUrl == null ) {
574+ return ;
575+ }
576+
577+ synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP ) {
578+ final Pair <AtomicInteger , IoTDBOpcUaClient > pair =
579+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP .get (nodeUrl );
580+ if (pair == null ) {
581+ return ;
582+ }
583+
584+ if (pair .getLeft ().decrementAndGet () <= 0 ) {
585+ try {
586+ pair .getRight ().disconnect ();
587+ } finally {
588+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP .remove (nodeUrl );
589+ }
590+ }
591+ }
547592 }
548593
549594 /////////////////////////////// Getter ///////////////////////////////
0 commit comments