@@ -614,6 +614,74 @@ public void cleanClusterEnvironment() {
614614 clusterConfig = new MppClusterConfig ();
615615 }
616616
617+ private boolean isThriftClientSSLEnabled () {
618+ return Boolean .parseBoolean (getDataNodeCommonConfigProperty ("enable_thrift_ssl" , "false" ));
619+ }
620+
621+ private String getDataNodeCommonConfigProperty (final String key , final String defaultValue ) {
622+ return ((MppCommonConfig ) clusterConfig .getDataNodeCommonConfig ())
623+ .getProperty (key , defaultValue );
624+ }
625+
626+ private Properties constructConnectionProperties (
627+ final String username , final String password , final String sqlDialect ) {
628+ final Properties info = BaseEnv .constructProperties (username , password , sqlDialect );
629+ if (isThriftClientSSLEnabled ()) {
630+ info .put (Config .USE_SSL , Boolean .TRUE .toString ());
631+ putIfPresent (
632+ info , Config .TRUST_STORE , getDataNodeCommonConfigProperty ("trust_store_path" , "" ));
633+ putIfPresent (
634+ info , Config .TRUST_STORE_PWD , getDataNodeCommonConfigProperty ("trust_store_pwd" , "" ));
635+ }
636+ return info ;
637+ }
638+
639+ private void putIfPresent (final Properties properties , final String key , final String value ) {
640+ if (value != null && !value .isEmpty ()) {
641+ properties .put (key , value );
642+ }
643+ }
644+
645+ private Session .Builder configureClientSSL (final Session .Builder builder ) {
646+ if (isThriftClientSSLEnabled ()) {
647+ builder
648+ .useSSL (true )
649+ .trustStore (getDataNodeCommonConfigProperty ("trust_store_path" , "" ))
650+ .trustStorePwd (getDataNodeCommonConfigProperty ("trust_store_pwd" , "" ));
651+ }
652+ return builder ;
653+ }
654+
655+ private TableSessionBuilder configureClientSSL (final TableSessionBuilder builder ) {
656+ if (isThriftClientSSLEnabled ()) {
657+ builder
658+ .useSSL (true )
659+ .trustStore (getDataNodeCommonConfigProperty ("trust_store_path" , "" ))
660+ .trustStorePwd (getDataNodeCommonConfigProperty ("trust_store_pwd" , "" ));
661+ }
662+ return builder ;
663+ }
664+
665+ private SessionPool .Builder configureClientSSL (final SessionPool .Builder builder ) {
666+ if (isThriftClientSSLEnabled ()) {
667+ builder
668+ .useSSL (true )
669+ .trustStore (getDataNodeCommonConfigProperty ("trust_store_path" , "" ))
670+ .trustStorePwd (getDataNodeCommonConfigProperty ("trust_store_pwd" , "" ));
671+ }
672+ return builder ;
673+ }
674+
675+ private TableSessionPoolBuilder configureClientSSL (final TableSessionPoolBuilder builder ) {
676+ if (isThriftClientSSLEnabled ()) {
677+ builder
678+ .useSSL (true )
679+ .trustStore (getDataNodeCommonConfigProperty ("trust_store_path" , "" ))
680+ .trustStorePwd (getDataNodeCommonConfigProperty ("trust_store_pwd" , "" ));
681+ }
682+ return builder ;
683+ }
684+
617685 @ Override
618686 public Connection getConnection (
619687 final String username , final String password , final String sqlDialect ) throws SQLException {
@@ -695,7 +763,8 @@ public ISession getSessionConnection() throws IoTDBConnectionException {
695763 final DataNodeWrapper dataNode =
696764 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
697765 final Session session =
698- new Session .Builder ().host (dataNode .getIp ()).port (dataNode .getPort ()).build ();
766+ configureClientSSL (new Session .Builder ().host (dataNode .getIp ()).port (dataNode .getPort ()))
767+ .build ();
699768 session .open ();
700769 return session ;
701770 }
@@ -705,10 +774,11 @@ public ISession getSessionConnection(ZoneId zoneId) throws IoTDBConnectionExcept
705774 final DataNodeWrapper dataNode =
706775 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
707776 final Session session =
708- new Session .Builder ()
709- .host (dataNode .getIp ())
710- .port (dataNode .getPort ())
711- .zoneId (zoneId )
777+ configureClientSSL (
778+ new Session .Builder ()
779+ .host (dataNode .getIp ())
780+ .port (dataNode .getPort ())
781+ .zoneId (zoneId ))
712782 .build ();
713783 session .open ();
714784 return session ;
@@ -720,11 +790,12 @@ public ISession getSessionConnection(final String userName, final String passwor
720790 final DataNodeWrapper dataNode =
721791 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
722792 final Session session =
723- new Session .Builder ()
724- .host (dataNode .getIp ())
725- .port (dataNode .getPort ())
726- .username (userName )
727- .password (password )
793+ configureClientSSL (
794+ new Session .Builder ()
795+ .host (dataNode .getIp ())
796+ .port (dataNode .getPort ())
797+ .username (userName )
798+ .password (password ))
728799 .build ();
729800 session .open ();
730801 return session ;
@@ -734,16 +805,17 @@ public ISession getSessionConnection(final String userName, final String passwor
734805 public ISession getSessionConnection (final List <String > nodeUrls )
735806 throws IoTDBConnectionException {
736807 final Session session =
737- new Session .Builder ()
738- .nodeUrls (nodeUrls )
739- .username (SessionConfig .DEFAULT_USER )
740- .password (SessionConfig .DEFAULT_PASSWORD )
741- .fetchSize (SessionConfig .DEFAULT_FETCH_SIZE )
742- .zoneId (null )
743- .thriftDefaultBufferSize (SessionConfig .DEFAULT_INITIAL_BUFFER_CAPACITY )
744- .thriftMaxFrameSize (SessionConfig .DEFAULT_MAX_FRAME_SIZE )
745- .enableRedirection (SessionConfig .DEFAULT_REDIRECTION_MODE )
746- .version (SessionConfig .DEFAULT_VERSION )
808+ configureClientSSL (
809+ new Session .Builder ()
810+ .nodeUrls (nodeUrls )
811+ .username (SessionConfig .DEFAULT_USER )
812+ .password (SessionConfig .DEFAULT_PASSWORD )
813+ .fetchSize (SessionConfig .DEFAULT_FETCH_SIZE )
814+ .zoneId (null )
815+ .thriftDefaultBufferSize (SessionConfig .DEFAULT_INITIAL_BUFFER_CAPACITY )
816+ .thriftMaxFrameSize (SessionConfig .DEFAULT_MAX_FRAME_SIZE )
817+ .enableRedirection (SessionConfig .DEFAULT_REDIRECTION_MODE )
818+ .version (SessionConfig .DEFAULT_VERSION ))
747819 .build ();
748820 session .open ();
749821 return session ;
@@ -753,8 +825,9 @@ public ISession getSessionConnection(final List<String> nodeUrls)
753825 public ITableSession getTableSessionConnection () throws IoTDBConnectionException {
754826 final DataNodeWrapper dataNode =
755827 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
756- return new TableSessionBuilder ()
757- .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
828+ return configureClientSSL (
829+ new TableSessionBuilder ()
830+ .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ())))
758831 .build ();
759832 }
760833
@@ -763,10 +836,11 @@ public ITableSession getTableSessionConnection(String userName, String password)
763836 throws IoTDBConnectionException {
764837 final DataNodeWrapper dataNode =
765838 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
766- return new TableSessionBuilder ()
767- .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
768- .username (userName )
769- .password (password )
839+ return configureClientSSL (
840+ new TableSessionBuilder ()
841+ .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
842+ .username (userName )
843+ .password (password ))
770844 .build ();
771845 }
772846
@@ -775,61 +849,66 @@ public ITableSession getTableSessionConnectionWithDB(final String database)
775849 throws IoTDBConnectionException {
776850 final DataNodeWrapper dataNode =
777851 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
778- return new TableSessionBuilder ()
779- .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
780- .database (database )
852+ return configureClientSSL (
853+ new TableSessionBuilder ()
854+ .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
855+ .database (database ))
781856 .build ();
782857 }
783858
784859 public ITableSession getTableSessionConnection (List <String > nodeUrls )
785860 throws IoTDBConnectionException {
786- return new TableSessionBuilder ()
787- .nodeUrls (nodeUrls )
788- .username (SessionConfig .DEFAULT_USER )
789- .password (SessionConfig .DEFAULT_PASSWORD )
790- .fetchSize (SessionConfig .DEFAULT_FETCH_SIZE )
791- .zoneId (null )
792- .thriftDefaultBufferSize (SessionConfig .DEFAULT_INITIAL_BUFFER_CAPACITY )
793- .thriftMaxFrameSize (SessionConfig .DEFAULT_MAX_FRAME_SIZE )
794- .enableRedirection (SessionConfig .DEFAULT_REDIRECTION_MODE )
861+ return configureClientSSL (
862+ new TableSessionBuilder ()
863+ .nodeUrls (nodeUrls )
864+ .username (SessionConfig .DEFAULT_USER )
865+ .password (SessionConfig .DEFAULT_PASSWORD )
866+ .fetchSize (SessionConfig .DEFAULT_FETCH_SIZE )
867+ .zoneId (null )
868+ .thriftDefaultBufferSize (SessionConfig .DEFAULT_INITIAL_BUFFER_CAPACITY )
869+ .thriftMaxFrameSize (SessionConfig .DEFAULT_MAX_FRAME_SIZE )
870+ .enableRedirection (SessionConfig .DEFAULT_REDIRECTION_MODE ))
795871 .build ();
796872 }
797873
798874 @ Override
799875 public ISessionPool getSessionPool (final int maxSize ) {
800876 final DataNodeWrapper dataNode =
801877 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
802- return new SessionPool .Builder ()
803- .host (dataNode .getIp ())
804- .port (dataNode .getPort ())
805- .user (SessionConfig .DEFAULT_USER )
806- .password (SessionConfig .DEFAULT_PASSWORD )
807- .maxSize (maxSize )
878+ return configureClientSSL (
879+ new SessionPool .Builder ()
880+ .host (dataNode .getIp ())
881+ .port (dataNode .getPort ())
882+ .user (SessionConfig .DEFAULT_USER )
883+ .password (SessionConfig .DEFAULT_PASSWORD )
884+ .maxSize (maxSize ))
808885 .build ();
809886 }
810887
811888 @ Override
812889 public ITableSessionPool getTableSessionPool (final int maxSize ) {
813890 final DataNodeWrapper dataNode =
814891 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
815- return new TableSessionPoolBuilder ()
816- .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
817- .user (SessionConfig .DEFAULT_USER )
818- .password (SessionConfig .DEFAULT_PASSWORD )
819- .maxSize (maxSize )
892+ return configureClientSSL (
893+ new TableSessionPoolBuilder ()
894+ .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
895+ .user (SessionConfig .DEFAULT_USER )
896+ .password (SessionConfig .DEFAULT_PASSWORD )
897+ .maxSize (maxSize ))
820898 .build ();
821899 }
822900
823901 @ Override
824902 public ITableSessionPool getTableSessionPool (final int maxSize , final String database ) {
825903 DataNodeWrapper dataNode =
826904 this .dataNodeWrapperList .get (rand .nextInt (this .dataNodeWrapperList .size ()));
827- return new TableSessionPoolBuilder ()
828- .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
829- .user (SessionConfig .DEFAULT_USER )
830- .password (SessionConfig .DEFAULT_PASSWORD )
831- .database (database )
832- .maxSize (maxSize )
905+ return configureClientSSL (
906+ new TableSessionPoolBuilder ()
907+ .nodeUrls (Collections .singletonList (dataNode .getIpAndPortString ()))
908+ .user (SessionConfig .DEFAULT_USER )
909+ .password (SessionConfig .DEFAULT_PASSWORD )
910+ .database (database )
911+ .maxSize (maxSize ))
833912 .build ();
834913 }
835914
@@ -853,7 +932,7 @@ protected NodeConnection getWriteConnectionWithSpecifiedDataNode(
853932 Config .IOTDB_URL_PREFIX
854933 + endpoint
855934 + getParam (version , NODE_NETWORK_TIMEOUT_MS , ZERO_TIME_ZONE ),
856- BaseEnv . constructProperties (username , password , sqlDialect ));
935+ constructConnectionProperties (username , password , sqlDialect ));
857936 return new NodeConnection (
858937 endpoint ,
859938 NodeConnection .NodeRole .DATA_NODE ,
@@ -910,7 +989,7 @@ protected List<NodeConnection> getReadConnections(
910989 Config .IOTDB_URL_PREFIX
911990 + endpoint
912991 + getParam (version , NODE_NETWORK_TIMEOUT_MS , ZERO_TIME_ZONE ),
913- BaseEnv . constructProperties (username , password , sqlDialect ))));
992+ constructConnectionProperties (username , password , sqlDialect ))));
914993 });
915994 return readConnRequestDelegate .requestAll ();
916995 }
@@ -957,7 +1036,7 @@ protected List<NodeConnection> getReadConnections(
9571036 Config .IOTDB_URL_PREFIX
9581037 + dataNode .getIpAndPortString ()
9591038 + getParam (version , NODE_NETWORK_TIMEOUT_MS , ZERO_TIME_ZONE ),
960- BaseEnv . constructProperties (username , password , sqlDialect ))));
1039+ constructConnectionProperties (username , password , sqlDialect ))));
9611040
9621041 return readConnRequestDelegate .requestAll ();
9631042 }
@@ -988,8 +1067,10 @@ protected void testJDBCConnection() {
9881067 Config .IOTDB_URL_PREFIX
9891068 + dataNodeEndpoint
9901069 + getParam (null , NODE_NETWORK_TIMEOUT_MS , ZERO_TIME_ZONE ),
991- System .getProperty ("User" , "root" ),
992- System .getProperty ("Password" , "root" ))) {
1070+ constructConnectionProperties (
1071+ System .getProperty ("User" , "root" ),
1072+ System .getProperty ("Password" , "root" ),
1073+ TREE_SQL_DIALECT ))) {
9931074 logger .info ("Successfully connecting to DataNode: {}." , dataNodeEndpoint );
9941075 return null ;
9951076 } catch (final Exception e ) {
0 commit comments