Traditional databases of many kinds often run for years in customer environments, so designing an efficient data migration plan is a significant challenge. The plan must be flexible enough to handle different import scenarios and heterogeneous databases while meeting customer requirements for accuracy, consistency, and timeliness. PieCloudDB Database provides a rich set of data synchronization tools for this purpose. This article focuses on one of them, the PieCloudDB Flink Connector.
PieCloudDB is a cloud-native distributed virtual data warehouse that provides enterprises with a new cloud-based digital solution. It helps businesses build a competitive barrier with data assets and achieve unlimited data computation through optimal allocation of cloud resources. PieCloudDB integrates physical data warehouses into a cloud-native data computation platform through various innovative technologies. It has achieved virtualization of analytical data warehouses on the cloud, creating a new eMPP architecture that separates storage and computation. This breaks through the bottlenecks of traditional MPP databases, eliminates data silos and achieves instant scaling as needed, greatly reducing the waste of storage space.
Apache Flink is a distributed stream processing engine designed for stateful computations over unbounded and bounded data streams. It runs in all common cluster environments and performs computations at in-memory speed and at any scale. Flink grew out of the Stratosphere project at the Technical University of Berlin, was designed to support complex large-scale data analysis tasks, and became a top-level project of the Apache Software Foundation in 2014. Users can build efficient real-time computations with Flink's DataStream API or its Table API and SQL. In addition, Flink's native checkpoint mechanism provides data consistency guarantees.
As a stream processing framework, Apache Flink integrates closely with other open-source projects and tools, and after years of development the community has built a rich ecosystem around it. The PieCloudDB Flink Connector is a Flink connector developed in-house by OpenPie. It efficiently writes data from Flink into PieCloudDB and works with Flink's checkpoint mechanism to ensure exactly-once semantics for data import. This article describes the features and principles of the PieCloudDB Flink Connector in detail and demonstrates them with an example.
PieCloudDB Flink Connector offers various methods for importing data from Flink into PieCloudDB, including Append-Only and Merge, to meet different levels of import semantics.
In terms of access methods, PieCloudDB Flink Connector provides multiple options, including using the Flink DataStream API to integrate this component, or directly using Flink SQL.
PieCloudDB Flink Connector offers a Merge import mode, which uses an idempotent write scheme, in conjunction with Flink's native checkpoint mechanism, to ensure the reliability and consistency of the import results.
Additionally, PieCloudDB Flink Connector supports real-time import not only for a single table but also for an entire database at once. However, full database import is available only through the Flink DataStream API and is not supported in Flink SQL.
Exactly-Once
In PieCloudDB Flink Connector, the PieCloudDBSink and PieCloudDBWriter classes implement the Flink StatefulSink and StatefulSinkWriter interfaces, respectively.
When Flink's checkpoint feature is enabled, PieCloudDBWriter continuously writes the data it receives during each checkpoint period into an in-memory pipeline, while an asynchronous thread copies this data into an in-memory temporary table in PieCloudDB. When the PieCloudDB Flink Connector operator receives the checkpoint signal, it first waits for all data to be copied into PieCloudDB and then executes the second phase, which merges the data and writes it into the physical table. If an exception occurs at any point, Flink automatically recovers the job from the last checkpoint, ensuring that no data is lost.
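The two-phase flow described above can be sketched in plain Python. This is only an illustrative model, not the connector's code: the class `StagingWriter`, its method names, and the use of Python lists as stand-ins for the temporary and physical tables are all assumptions made for the example, and the second phase is reduced to a simple move of the staged rows.

```python
import queue
import threading


class StagingWriter:
    """Toy model: buffer rows, copy them asynchronously, finalize on checkpoint."""

    def __init__(self):
        self.pipeline = queue.Queue()  # stands in for the in-memory pipeline
        self.temp_table = []           # stands in for the temporary table
        self.target_table = []         # stands in for the physical table
        copier = threading.Thread(target=self._copy_loop, daemon=True)
        copier.start()

    def _copy_loop(self):
        # Asynchronous thread: drain the pipeline into the temporary table.
        while True:
            row = self.pipeline.get()
            self.temp_table.append(row)
            self.pipeline.task_done()

    def write(self, row):
        # Phase 1: returns immediately; the copy happens in the background.
        self.pipeline.put(row)

    def on_checkpoint(self):
        # Wait until every buffered row has reached the temporary table.
        self.pipeline.join()
        # Phase 2 (simplified): move the staged rows into the physical table.
        self.target_table.extend(self.temp_table)
        self.temp_table.clear()


writer = StagingWriter()
for i in range(5):
    writer.write({"id": i})
writer.on_checkpoint()
```

The point of the sketch is the ordering: `write` never blocks on the database, and the checkpoint handler does not start the second phase until the background copy has caught up.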
In the second phase, PieCloudDB Flink Connector uses an idempotent write scheme to ensure that no data is duplicated. After all data has been imported into PieCloudDB's temporary table, the connector merges the operations by primary key and write order: it aggregates all insert, delete, and update operations on each primary key during the period into a single final result, which is then written into PieCloudDB.
The aggregation first combines the data by primary key and write order, then deletes the record with that primary key from the target table, and finally writes the latest version into the target table. For example, if the record with primary key 1 is updated and then deleted, the final result is that the record with that primary key is deleted from PieCloudDB. The write order matches the order of the data in Flink and is recorded in a separate bigint column added to the temporary table, which tracks each row. With this idempotent write scheme, exactly-once import semantics are preserved even if duplicate data arrives.
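As a rough illustration of why this merge is idempotent, the following Python sketch models the target table as a dictionary keyed by primary key. The function `merge_batch`, the tuple layout of a change, and the assumption that every insert or update carries the full row are hypothetical choices for the example; the integer sequence number plays the role of the extra bigint column.

```python
from typing import Dict, List, Optional, Tuple

# A staged change: (sequence number, operation, primary key, full row or None).
Change = Tuple[int, str, int, Optional[dict]]


def merge_batch(target: Dict[int, dict], staged: List[Change]) -> None:
    """Apply one checkpoint's staged changes to the target table in place."""
    # Step 1: keep only the latest change for each primary key.
    latest: Dict[int, Change] = {}
    for change in staged:
        seq, _op, pk, _row = change
        if pk not in latest or seq > latest[pk][0]:
            latest[pk] = change

    for pk, (_seq, op, _pk, row) in latest.items():
        # Step 2: delete the existing record for this key, if there is one.
        target.pop(pk, None)
        # Step 3: write the final state unless the last operation was a delete.
        if op != "delete":
            target[pk] = dict(row)


target = {1: {"id": 1, "name": "student1", "score": 65}}
staged = [
    (1, "update", 1, {"id": 1, "name": "student1", "score": 70}),
    (2, "delete", 1, None),
    (3, "insert", 2, {"id": 2, "name": "student2", "score": 75}),
    (4, "update", 2, {"id": 2, "name": "student2", "score": 80}),
]

merge_batch(target, staged)
after_first = dict(target)

# Replaying the same batch, as could happen after a recovery, changes nothing.
merge_batch(target, staged)
after_replay = dict(target)
```

Because each key is first deleted and then rewritten with its final state, applying the same batch twice gives the same table as applying it once.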
The above checkpoint mechanism and idempotent writing process are shown in the figure below:
Checkpoint Mechanism and Idempotent Writing Process
Full Database Synchronization
Full database synchronization raises two problems: writing to multiple tables at the same time, and keeping the connection pool under control. On the first point, several tables may be writing into PieCloudDB Flink Connector simultaneously, and the single-table logic alone could fail to process data for newly seen tables in time, resulting in data loss. To handle this, PieCloudDB Flink Connector maintains a pool of Loaders internally. Within a Flink checkpoint period, it assigns a Loader to each table and caches it in the pool. Data written to a table is routed to that table's Loader, and the processing logic is the same as for single-table import. The only difference is that at the end of the checkpoint period, PieCloudDB Flink Connector flushes every table that received data during that checkpoint into PieCloudDB.
The second problem is keeping the number of database connections under control. In the design above, each instance of PieCloudDB Flink Connector maintains a Loader pool, and each Loader occupies one PieCloudDB database connection. The most direct way to improve import performance is to increase the parallelism of the Flink job, that is, to create multiple instances of PieCloudDB Flink Connector to speed up full database synchronization, which is essential when there is a large amount of historical data. However, the total number of PieCloudDB connections required by the job then becomes uncontrollable, because neither the number of tables nor the job parallelism is known in advance. Simply capping the connections in the internal connection pool would only cause the job to fail when it cannot obtain a new connection.
To solve this problem, PieCloudDB Flink Connector implements an internal queuing algorithm: if all connections in the database connection pool are in use, new tables that appear within a checkpoint are queued. Queued tables are not imported until the end of the checkpoint, when the other tables have been written to the database and their connections released. Until then, the data for the queued tables is held in memory. To avoid memory overflow, the raw data is pre-parsed so that only the key information needed for the import is kept, which greatly reduces memory usage. With this model, the number of database connections stays bounded throughout full database synchronization: it never exceeds the job parallelism multiplied by the maximum number of connections in a single PieCloudDB connection pool.
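The per-table routing can be pictured with a small Python model. The names `Loader`, `LoaderPool`, `write`, and `flush_on_checkpoint` are hypothetical and chosen only for this sketch, and a dictionary of lists stands in for PieCloudDB.

```python
from collections import defaultdict
from typing import Dict, List


class Loader:
    """Hypothetical per-table loader that stages rows for one table."""

    def __init__(self, table: str):
        self.table = table
        self.rows: List[dict] = []

    def add(self, row: dict) -> None:
        self.rows.append(row)


class LoaderPool:
    """Toy model of one connector instance handling many tables."""

    def __init__(self):
        self.loaders: Dict[str, Loader] = {}
        # Stand-in for PieCloudDB: table name -> rows written so far.
        self.database: Dict[str, List[dict]] = defaultdict(list)

    def write(self, table: str, row: dict) -> None:
        # Create the loader on the first row seen for a table in this period.
        loader = self.loaders.get(table)
        if loader is None:
            loader = self.loaders[table] = Loader(table)
        loader.add(row)

    def flush_on_checkpoint(self) -> List[str]:
        # Flush every table that received data, then start a fresh period.
        flushed = sorted(self.loaders)
        for table in flushed:
            self.database[table].extend(self.loaders[table].rows)
        self.loaders.clear()
        return flushed


pool = LoaderPool()
pool.write("student", {"id": 1})
pool.write("course", {"id": 10})
pool.write("student", {"id": 2})
flushed = pool.flush_on_checkpoint()
```

A table that first appears in the middle of a checkpoint period simply gets its own loader at that moment, so its rows are flushed with everything else at the end of the period.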
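A minimal Python sketch of this queuing idea follows. It is a simplified model under stated assumptions, not the connector's algorithm: `BoundedLoaderPool` and its fields are hypothetical names, one "connection" is modeled as one entry in the `active` dictionary, and the pre-parsing step is left out.

```python
from typing import Dict, List


class BoundedLoaderPool:
    """Toy model of one connector instance with a capped connection pool."""

    def __init__(self, max_connections: int):
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        self.max_connections = max_connections
        self.active: Dict[str, List[dict]] = {}    # tables holding a connection
        self.queued: Dict[str, List[dict]] = {}    # tables waiting; rows stay in memory
        self.peak_connections = 0
        self.database: Dict[str, List[dict]] = {}  # stand-in for PieCloudDB

    def write(self, table: str, row: dict) -> None:
        if table in self.active:
            self.active[table].append(row)
        elif table in self.queued:
            self.queued[table].append(row)
        elif len(self.active) < self.max_connections:
            self.active[table] = [row]
            self.peak_connections = max(self.peak_connections, len(self.active))
        else:
            # Pool is full: queue the new table instead of failing the job.
            self.queued[table] = [row]

    def flush_on_checkpoint(self) -> None:
        while self.active or self.queued:
            # Write the active tables and release their connections.
            for table, rows in self.active.items():
                self.database.setdefault(table, []).extend(rows)
            self.active = {}
            # Hand the freed connections to queued tables, oldest first.
            while self.queued and len(self.active) < self.max_connections:
                table = next(iter(self.queued))
                self.active[table] = self.queued.pop(table)


parallelism = 2
max_connections = 2
instances = [BoundedLoaderPool(max_connections) for _ in range(parallelism)]

tables = ["t1", "t2", "t3", "t4", "t5"]
for i, pool in enumerate(instances):
    for table in tables:
        pool.write(table, {"id": i})
    pool.flush_on_checkpoint()

total_peak = sum(pool.peak_connections for pool in instances)
```

With five tables, a parallelism of 2, and at most 2 connections per instance, every table is still written, while the combined peak never exceeds `parallelism * max_connections`.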
The full database synchronization function of PieCloudDB Flink Connector needs to be used in conjunction with PieCloudDB's dynamic job executor, which will be further described in subsequent articles.
Next, we will use MySQL as the data source to demonstrate the process of using PieCloudDB Flink Connector to synchronize data from MySQL to PieCloudDB.
Create a MySQL source table:
create table student (id int primary key, name varchar(32), score int, sex char(1));
mysql> desc student;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| id      | int         | NO   | PRI | NULL    |       |
| name    | varchar(32) | YES  |     | NULL    |       |
| score   | int         | YES  |     | NULL    |       |
| sex     | char(1)     | YES  |     | NULL    |       |
+---------+-------------+------+-----+---------+-------+
4 rows in set (0.03 sec)
Insert data:
insert into student (id, name, score, sex) values (1, 'student1', 65, '1');
insert into student (id, name, score, sex) values (2, 'student2', 75, '0');
insert into student (id, name, score, sex) values (3, 'student3', 85, '1');
insert into student (id, name, score, sex) values (4, 'student4', 95, '0');
mysql> select * from student;
+----+----------+-------+------+
| id | name     | score | sex  |
+----+----------+-------+------+
|  1 | student1 |    65 | 1    |
|  2 | student2 |    75 | 0    |
|  3 | student3 |    85 | 1    |
|  4 | student4 |    95 | 0    |
+----+----------+-------+------+
4 rows in set (0.01 sec)
Create a PieCloudDB target table (this can be done in 'Data Insights' on PieCloudDB's cloud-native platform):
create table student (id int primary key, name varchar(32), score int, sex char(1));
demo=> \d student
                    Table "public.student"
 Column |         Type          | Collation | Nullable | Default
--------+-----------------------+-----------+----------+---------
 id     | integer               |           | not null |
 name   | character varying(32) |           |          |
 score  | integer               |           |          |
 sex    | character(1)          |           |          |
The table is currently empty:
demo=> select * from student;
id | name | score | sex
----+------+-------+-----
(0 rows)
Start the Flink cluster:
ubuntu :: work/flink/flink-1.18.0 >> bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host ubuntu.
Starting taskexecutor daemon on host ubuntu.
[INFO] 1 instance(s) of taskexecutor are already running on ubuntu.
Starting taskexecutor daemon on host ubuntu.
Use the Flink SQL client to connect to the cluster, import the required dependencies, and enable checkpointing:
Flink SQL> add jar '/home/frankie/work/download/flink-sql-connector-mysql-cdc-2.4.0.jar';
[INFO] Execute statement succeed.
Flink SQL> add jar '/home/frankie/work/download/flink-sql-connector-pieclouddb-1.2.0.jar';
[INFO] Execute statement succeed.
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Execute statement succeed.
Create a Flink CDC source table:
Flink SQL> CREATE TABLE source_student_mysql (
    id INT,
    name STRING,
    score INT,
    sex CHAR(1),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'testdb',
    'table-name' = 'student');
[INFO] Execute statement succeed.
Create a Flink PieCloudDB target table:
Flink SQL> CREATE TABLE sink_student_pdb (
    id INT,
    name STRING,
    score INT,
    sex CHAR(1),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'pieclouddb',
    'hostname' = 'pieclouddb-host',
    'port' = 'your-pieclouddb-port',
    'username' = 'your-username',
    'password' = 'your-password',
    'pdb_warehouse' = 'your-pdbwarehouse',
    'database-name' = 'demo',
    'table-name' = 'student',
    'load_mode' = 'merge');
[INFO] Execute statement succeed.
Execute the import:
Flink SQL> INSERT INTO sink_student_pdb SELECT * FROM source_student_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 660b747ef8fb64f95064a461af9924bc
Open the Flink Web UI to confirm that the streaming import job is running continuously:
Apache Flink Dashboard
Query the data in PieCloudDB to confirm that it has been imported correctly:
Data Successfully Imported Into PieCloudDB
In addition to Flink SQL, PieCloudDB Flink Connector can also be used through the Flink DataStream API.
Going forward, OpenPie will continue to enhance and iterate on the PieCloudDB Flink Connector, and plans to introduce advanced features such as schema evolution and dynamic table addition to meet more complex data processing needs.
At the same time, PieCloudDB will continue to expand its data ecosystem by enriching its data synchronization tools. The goal is a more comprehensive and powerful toolset that includes integrations with big data processing frameworks such as Flink and Spark, as well as real-time data synchronization tools such as CDC (Change Data Capture) and Kafka, so that users can achieve efficient data flow and real-time processing and further unleash the potential of their data.