The TiDB Connector for Spark enables you to use TiDB as the data source of Apache Spark, similar to other data sources (PostgreSQL, HDFS, S3, etc.).
The TiDB Connector supports Spark 2.3.0+ and Spark 2.4.0+.
TiDB Connector supports bi-directional data movement between TiDB and Spark clusters.
Using the connector, you can perform the following two operations:
- Populate a Spark DataFrame from a table in TiDB.
- Write the contents of a Spark DataFrame to a table in TiDB.
Because TiDB supports transactions, the TiDB Connector for Spark also writes transactionally, which means that:
- all the data in the DataFrame is written to TiDB if no conflicts exist.
- no data in the DataFrame is written to TiDB if conflicts exist.
- no partial changes are visible to other sessions until the transaction is committed.
TiSpark only supports the Append SaveMode. The behavior of append is controlled by the replace option, whose default value is false.
In addition, if replace is true, the data to be inserted is deduplicated before the insertion.
- If `replace` is `true`:
  - if the primary key or unique index exists in the table, data is updated.
  - if no same primary key or unique index exists, data is inserted.
  - if the authorization check is enabled, users need the insert and delete privileges to write.
- If `replace` is `false`:
  - if the primary key or unique index exists in the table, the conflicting data causes an exception.
  - if no same primary key or unique index exists, data is inserted.
  - if the authorization check is enabled, users need the insert privilege to write.
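
The rules above, together with the all-or-nothing guarantee, can be pictured with a small in-memory model. This is only an illustrative sketch in plain Scala, not TiSpark code: the `ReplaceSemantics` object, its `write` function, and the use of a `Map` keyed by primary key are assumptions made for the example.

```scala
// Toy model of the append semantics described above (hypothetical names, no TiSpark involved).
object ReplaceSemantics {
  // A table is modelled as primary key -> row payload.
  type Table = Map[Int, String]

  /** Applies the whole batch or nothing, mirroring the transactional write. */
  def write(table: Table, batch: Seq[(Int, String)], replace: Boolean): Either[String, Table] =
    if (replace) {
      // replace = true: deduplicate the batch by key, then update existing keys and insert new ones.
      // Which duplicate survives is arbitrary in this model (toMap keeps the last one).
      val deduplicated = batch.toMap
      Right(table ++ deduplicated)
    } else {
      // replace = false: any key that already exists is a conflict, and nothing is written.
      val conflicts = batch.map(_._1).filter(table.contains)
      if (conflicts.nonEmpty) Left(s"conflicting keys: ${conflicts.mkString(", ")}")
      else Right(table ++ batch)
    }
}
```

In this model, writing keys 1 and 2 into a table that already holds key 1 yields an updated table when `replace` is `true`, and a `Left` with the table left untouched when `replace` is `false`.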
The Spark connector adheres to the standard Spark API, but with the addition of TiDB-specific options.
The connector can be used either with or without extensions enabled.
For how to use it with extensions enabled, see the code examples with extensions.
- Initiate `SparkConf`.

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.SparkSession

      // Each setting is applied only if it has not been set elsewhere.
      val sparkConf = new SparkConf()
        .setIfMissing("spark.master", "local[*]")
        .setIfMissing("spark.app.name", getClass.getName)
        .setIfMissing("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
        .setIfMissing("spark.tispark.pd.addresses", "pd0:2379")
        .setIfMissing("spark.tispark.tidb.addr", "tidb")
        .setIfMissing("spark.tispark.tidb.port", "4000")

      val spark = SparkSession.builder.config(sparkConf).getOrCreate()
- Write using Scala.

      /* Create the target table before running the code:
      CREATE TABLE tpch_test.target_table_orders (
        `O_ORDERKEY` int(11) NOT NULL,
        `O_CUSTKEY` int(11) NOT NULL,
        `O_ORDERSTATUS` char(1) NOT NULL,
        `O_TOTALPRICE` decimal(15,2) NOT NULL,
        `O_ORDERDATE` date NOT NULL,
        `O_ORDERPRIORITY` char(15) NOT NULL,
        `O_CLERK` char(15) NOT NULL,
        `O_SHIPPRIORITY` int(11) NOT NULL,
        `O_COMMENT` varchar(79) NOT NULL
      )
      */

      // select data to write
      val df = spark.sql("select * from tpch_test.ORDERS")

      // write data to TiDB
      df.write
        .format("tidb")
        .option("tidb.user", "root")
        .option("tidb.password", "")
        .option("database", "tpch_test")
        .option("table", "target_table_orders")
        .mode("append")
        .save()
The TiDB Connector supports writing data into multiple tables in one transaction using the Scala/Java API.

    val inputDatabase = args(0)
    val inputTable1 = args(1)
    val inputTable2 = args(2)
    val outputDatabase = args(3)
    val outputTable1 = args(4)
    val outputTable2 = args(5)

    val df1 = spark.sql(s"select * from $inputTable1")
    val df2 = spark.sql(s"select * from $inputTable2")

    val data = Map(
      DBTable(outputDatabase, outputTable1) -> df1,
      DBTable(outputDatabase, outputTable2) -> df2)

    // writing data to multiple tables
    TiBatchWrite.write(
      data,
      spark,
      Map(
        "tidb.addr" -> "127.0.0.1",
        "tidb.port" -> "4000",
        "tidb.user" -> "root",
        "tidb.password" -> ""))

The following table shows the TiDB-specific options, which can be passed in through TiDBOptions or SparkConf.
tidb.addr, tidb.port, tidb.user, and tidb.password are needed only when enableUpdateTableStatistics is true.
| Key | Required | Default | Description |
|---|---|---|---|
| pd.addresses | true | - | The addresses of the PD cluster, separated by commas |
| tidb.addr | true | - | TiDB address, which currently only supports one instance |
| tidb.port | true | - | TiDB Port |
| tidb.user | true | - | TiDB User |
| tidb.password | true | - | TiDB Password |
| database | true | - | TiDB Database |
| table | true | - | TiDB Table |
| replace | false | false | To define the behavior of append |
| useTableLock | false | true (3.x) false (4.x) | Whether to lock the table during writing |
| enableRegionSplit | false | true | To split Region to avoid hot Region during insertion |
| scatterWaitMS | false | 300000 | Maximum time, in milliseconds, to wait for Regions to scatter |
| bytesPerRegion | false | 100663296 (96M) | Decrease this parameter to split more regions (increase write concurrency) |
| enableUpdateTableStatistics | false | false | Update statistics in table mysql.stats_meta (if set to true, tidb.user must have the update privilege on table mysql.stats_meta) |
| deduplicate | false | true | Deduplicate data in DataFrame according to primary key and unique key |
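
Since the options can come from either the writer options or `SparkConf`, it can help to check before a job starts that every key marked as required is set somewhere. The following is a minimal sketch in plain Scala, not part of the connector: the `TiDBOptionCheck` object and `missingRequired` function are hypothetical, and the assumption that `SparkConf` entries carry the `spark.tispark.` prefix is inferred only from the `SparkConf` example earlier in this guide. The connector's own validation may differ.

```scala
// Hypothetical pre-flight check; the connector performs its own validation independently.
object TiDBOptionCheck {
  // Keys marked Required = true in the options table.
  val requiredKeys: Seq[String] =
    Seq("pd.addresses", "tidb.addr", "tidb.port", "tidb.user", "tidb.password", "database", "table")

  // Assumption: SparkConf entries use this prefix, as in spark.tispark.pd.addresses.
  val confPrefix = "spark.tispark."

  /** Returns the required keys found neither in the writer options nor in the SparkConf entries. */
  def missingRequired(writerOptions: Map[String, String], sparkConf: Map[String, String]): Seq[String] =
    requiredKeys.filterNot(k => writerOptions.contains(k) || sparkConf.contains(confPrefix + k))

  // Example values taken from the snippets in this guide.
  val exampleConf: Map[String, String] = Map(
    "spark.tispark.pd.addresses" -> "pd0:2379",
    "spark.tispark.tidb.addr" -> "tidb",
    "spark.tispark.tidb.port" -> "4000")

  val exampleOptions: Map[String, String] = Map(
    "tidb.user" -> "root",
    "tidb.password" -> "",
    "database" -> "tpch_test",
    "table" -> "target_table_orders",
    // Half of the 96M default, to split more Regions during the write.
    "bytesPerRegion" -> (48L * 1024 * 1024).toString)
}
```

A map such as `exampleOptions` can then be handed to the writer in one call with `df.write.format("tidb").options(TiDBOptionCheck.exampleOptions)`, which is the standard `DataFrameWriter.options` method.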
TiDB's version must be 3.0.14 or later.
Please use the following command to increase GC life time and make sure that the total time of reading/writing does not exceed the GC life time.
SET GLOBAL tidb_gc_life_time = '6h';
Make sure that the following TiDB configuration items (table-lock) are correctly set if TiDB-3.x is used.
# enable-table-lock is used to control the table lock feature. The default value is false, indicating that the table lock feature is disabled.
enable-table-lock: true
# delay-clean-table-lock is used to control the time (milliseconds) of delay before unlocking the table in abnormal situations.
delay-clean-table-lock: 60000
TiSpark (>= 2.4.2) supports writing data to clustered index tables, which is a new feature in TiDB-5.0.0.
TiSpark does not support writing to the following tables:
- tables with an auto random column
- partitioned tables
- tables with generated columns
The following Spark SQL data types are currently not supported for writing to TiDB:
- BinaryType
- ArrayType
- MapType
- StructType
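
A DataFrame schema can be screened for these types before a write is attempted. The sketch below uses only the standard `org.apache.spark.sql.types` classes; the `WriteTypeCheck` object and its function names are hypothetical helpers introduced for this example, and it only inspects top-level columns.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: lists top-level columns whose Spark SQL type is in the unsupported list above.
object WriteTypeCheck {
  def isUnsupported(dataType: DataType): Boolean = dataType match {
    case BinaryType | _: ArrayType | _: MapType | _: StructType => true
    case _ => false
  }

  /** Names of the columns that would need to be dropped or converted before writing. */
  def unsupportedColumns(schema: StructType): Seq[String] =
    schema.fields.filter(f => isUnsupported(f.dataType)).map(_.name).toSeq
}
```

Calling `WriteTypeCheck.unsupportedColumns(df.schema)` before `df.write` gives the list of offending column names, which is empty when the schema is safe with respect to this list.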
The complete conversion matrix is as follows. Each row is a TiDB column type and each column is a Spark SQL data type.
| Write | Boolean | Byte | Short | Integer | Long | Float | Double | String | Decimal | Date | Timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|
| BIT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ |
| BOOLEAN | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| TINYINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| SMALLINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| MEDIUMINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| INTEGER | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| BIGINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| FLOAT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | ❌ |
| DOUBLE | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | ❌ |
| DECIMAL | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
| DATE | ❌ | ❌ | ❌ | ❌ | ✅ | ✅ | ❌ | ❌ | ❌ | ✅ | ✅ |
| DATETIME | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
| TIMESTAMP | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
| TIME | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
| YEAR | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
| CHAR | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| VARCHAR | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| TINYTEXT | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| TEXT | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| MEDIUMTEXT | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| LONGTEXT | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ | ✅ |
| BINARY | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| VARBINARY | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| TINYBLOB | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| BLOB | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| MEDIUMBLOB | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| LONGBLOB | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| ENUM | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ |
| SET | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
| JSON | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
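
When a combination in the matrix is marked as unsupported, one option is to cast the Spark column to a supported type before writing. For example, the BIGINT row marks Decimal as unsupported but Long as supported. The sketch below uses the standard Spark `Column.cast` API; the `CastBeforeWrite` object and `decimalsToLong` function are hypothetical names, and note that casting a decimal to a long drops any fractional part.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DecimalType, LongType}

// Hypothetical helper: prepares Decimal columns that target BIGINT columns in TiDB.
object CastBeforeWrite {
  /** Casts each listed column to Long if it is currently a Decimal; other columns are left as-is. */
  def decimalsToLong(df: DataFrame, bigintColumns: Seq[String]): DataFrame =
    bigintColumns.foldLeft(df) { (acc, name) =>
      acc.schema(name).dataType match {
        case _: DecimalType => acc.withColumn(name, col(name).cast(LongType))
        case _ => acc
      }
    }
}
```

The returned DataFrame can then be written with `df.write.format("tidb")` as in the earlier example.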