The importance of ETL (Extract, Transform, Load) is self-evident for database users. At its simplest, ETL is the process of importing data into a database; more generally, it moves data between different systems or data organization formats. A well-designed ETL process helps database users manage and optimize data efficiently, ensuring that data entering the database has been carefully processed to meet their needs.
With the advent of the cloud-native era, cost-effective and easily scalable object storage has become the preferred way to meet users' demands for high elasticity and cost efficiency. Traditional ETL extracts, cleanses, transforms, and finally loads data from source systems into target systems for analysis. It offers high throughput and excellent batch-loading performance, but it places a significant load on both source and target systems. It is therefore typically run during off-peak hours, resulting in significant data latency, often a delay of T+1.
Change Data Capture (CDC) refers to capturing data changes in a database or file system in real time or near real time and synchronizing them to other data systems while ensuring consistency and accuracy. CDC is usually implemented by parsing source-side logs, so it has minimal impact on the source system and offers low latency. However, compared with batch mode, CDC imposes higher data-update overhead on the target system, especially on analytical databases. CDC is nevertheless increasingly widely used in data synchronization, while traditional ETL retains irreplaceable advantages in many scenarios.
Both ETL and CDC have data replication as their primary goal, which inevitably introduces some data redundancy and poses risks of inconsistency. Lakehouse-based technologies, such as multiple-read single-write and zero-ETL, can completely eliminate the redundancy and inconsistency risks caused by data replication. The goal of the PieCloudDB ETL solution is to unify ETL, CDC, and lakehouse technologies.
PieCloudDB's storage-compute separation architecture allows different systems to directly share the same underlying data, eliminating the need for cumbersome data extraction, transformation, and loading processes. Currently, PieCloudDB supports reading files in formats like Parquet directly from object storage, providing convenience in terms of data sharing and access.
In some real-world scenarios, ETL requirements may arise, such as when the same underlying raw data is queried by different systems or when specialized systems for different types of queries use different storage formats. Therefore, when designing an ETL solution, several factors need to be considered.
Designing for these factors helps ensure that data receives proper treatment throughout the entire process, from extraction to loading, providing a solid foundation for data-driven decision-making and analysis.
The Overall Design of the PieCloudDB ETL Solution
In consideration of the ETL needs of the cloud-native era, the overall design of PieCloudDB's ETL solution primarily comprises three aspects: task scheduling, data extraction from business systems, and data processing on compute nodes.
Through these three aspects of design, the PieCloudDB ETL solution achieves effective task scheduling, data extraction from business systems, and data processing on compute nodes. The entire process ensures smooth data transfer from business systems to PieCloudDB and provides the necessary foundation for data transformation and processing. This ensures data accuracy and consistency throughout the integration, transformation, and loading processes, offering high-quality data resources for subsequent analysis and applications.
The Execution Process of PieCloudDB ETL
When initiating an ETL task in PieCloudDB, the specific process is as shown in the following diagram:
Based on business requirements, the task scheduler, pdbconduct, triggers data-export tasks on the source system as needed. This step ensures that the required data is available for further processing.
Once data export is completed, pdbconduct sends the corresponding SQL statements to PieCloudDB's control node. These SQL statements may include data transformations, loading, or other operations to prepare data for entry into the PieCloudDB environment.
After the SQL statements execute on PieCloudDB's control node, pdbconduct collects the execution results, records task progress, and captures any error messages, making it possible to monitor the task's status and respond to issues. If errors occur during execution, pdbconduct logs the full error information and takes remedial action as needed.
INSERT/MERGE Mode
PieCloudDB's ETL supports two common data processing modes: INSERT and MERGE. Users can choose between INSERT and MERGE modes based on their business requirements, data update frequency, and data change scenarios.
INSERT Mode
The INSERT mode involves directly inserting data from the source system into PieCloudDB. In this mode, data extracted from the source system is inserted into the corresponding tables in PieCloudDB row by row or in batches. The INSERT mode is suitable for bulk data import or when data changes are minimal, and the primary operation is adding new records. The advantage of the INSERT mode is its simplicity and suitability for pure import scenarios, especially with time-series data streams that have no logical connection to existing data.
To begin with, specific adapters or plugins need to be developed to connect PieCloudDB to the particular data source. This may involve developing PostgreSQL extensions that handle communication with the data source and parse its data formats.
Next, the control node reads data-source information, including connection parameters, authentication details, and data extraction rules. It decides whether to split tasks to improve concurrency and efficiency, and generates task information such as query statements and task dependencies. Finally, the compute nodes, based on this task information, read data from the data source and return raw data and metadata to the control node.
Through these steps, in the INSERT mode of the ETL process, data is obtained from the data source and inserted into PieCloudDB using the Foreign Table approach.
CREATE FOREIGN TABLE foreign_table(meta text, raw bytea);
SELECT meta, raw FROM foreign_table;
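Concretely, the data-source information read by the control node can be attached to the foreign table as options. The FDW name and option keys below are illustrative assumptions, not actual PieCloudDB syntax:

```sql
-- Hypothetical foreign-data wrapper and options, for illustration only
CREATE SERVER source_sys FOREIGN DATA WRAPPER etl_source_fdw
  OPTIONS (host 'source.example.com', port '5432');

CREATE USER MAPPING FOR CURRENT_USER SERVER source_sys
  OPTIONS (user 'etl_user', password '***');

-- meta carries batch/file metadata; raw carries the unparsed payload
CREATE FOREIGN TABLE foreign_table (meta text, raw bytea)
  SERVER source_sys
  OPTIONS (extract_rule 'incremental');
```

Keeping connection and extraction rules in the foreign table's options lets the control node generate per-task queries without hard-coding source details into each ETL job.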
Following Step 1, the raw data obtained from the Foreign Table needs to be parsed and transformed to fit the internal row format. This transformation process is typically carried out by the Formatter.
The PieCloudDB Formatter first parses the raw data obtained from the Foreign Table, breaking it down into manageable data units (fields, rows, columns, etc.) based on the data format (e.g., CSV, JSON, XML).
Subsequently, the PieCloudDB Formatter converts the parsed data to align with the internal row format of PieCloudDB, generating the required columns.
CREATE FUNCTION formatter(input bytea) RETURNS user_type …;
SELECT meta, r.* FROM foreign_table
CROSS JOIN LATERAL formatter(raw) AS r;
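As a concrete sketch of what such a formatter might look like, the following hypothetical CSV formatter (column names and types assumed) parses each line of the raw payload into typed columns; it is an illustration, not PieCloudDB's actual implementation:

```sql
-- Hypothetical CSV formatter: bytea payload -> typed rows (assumed schema)
CREATE FUNCTION csv_formatter(input bytea)
RETURNS TABLE (a int, b numeric, c numeric, d text)
LANGUAGE plpgsql AS $$
DECLARE
  line  text;
  parts text[];
BEGIN
  -- split the decoded payload into lines, then each line into fields
  FOREACH line IN ARRAY string_to_array(convert_from(input, 'UTF8'), E'\n') LOOP
    CONTINUE WHEN line = '';
    parts := string_to_array(line, ',');
    a := parts[1]::int;
    b := parts[2]::numeric;
    c := parts[3]::numeric;
    d := parts[4];
    RETURN NEXT;  -- emit one parsed row per input line
  END LOOP;
END $$;
```

Because the function is set-returning, it composes naturally with a LATERAL join: each raw payload fans out into as many parsed rows as it contains.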
In Step 3, data transformation operations are performed on the columns parsed in Step 2. This is done to ensure data accuracy and consistency, allowing the data to be smoothly inserted into PieCloudDB tables and providing a reliable data foundation for subsequent analysis and applications.
SELECT r.a, r.b + r.c, func(r.d) …
FROM foreign_table CROSS JOIN LATERAL formatter(raw) AS r;
After completing the preceding three steps of data preparation and transformation, in Step 4, the data is ready for insertion into the target table.
INSERT INTO table
SELECT r.a, r.b + r.c, func(r.d) …
FROM foreign_table CROSS JOIN LATERAL formatter(raw) AS r;
Finally, to support resumable data processing, batch metadata is inserted into a historical table, maintaining a record of data change history, including additions, updates, and deletions.
INSERT INTO history
SELECT meta FROM foreign_table;
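With the history table in place, a restarted task can skip batches that were already loaded. A minimal sketch, assuming meta uniquely identifies a batch:

```sql
-- On resume, process only batches not yet recorded in history
SELECT meta, raw FROM foreign_table
WHERE meta NOT IN (SELECT meta FROM history);
```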
MERGE Mode
PieCloudDB's ETL MERGE/UPSERT mode supports CDC scenarios. This mode is designed to handle data carrying operation types, logical keys, and sequence keys in order to achieve data insertion, updating, and deletion.
In MERGE mode, each record should include an operation field (OP, i.e., INSERT/UPDATE/DELETE), a logical key, and a sequence key. When a logical key does not yet exist in the target, the mode performs an insert; when it already exists, an update or delete is executed. Sequence keys determine the order of operations: when multiple operations affect the same row, they are applied in sequence-key order to prevent conflicts. MERGE mode tolerates duplicate data but does not accommodate transactional logic errors.
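As a hypothetical illustration (column names assumed), consider three change records arriving for the same logical key. Deduplication keeps only the record with the maximum sequence key, so the net effect applied to the target is that record's operation:

```sql
-- Change stream for logical key lpk = 101:
--   op = 'INSERT', ok = 1   row created
--   op = 'UPDATE', ok = 2   row modified
--   op = 'DELETE', ok = 3   row removed
-- The survivor is the row with the maximum sequence key (ok = 3),
-- so the net effect on the target table is a DELETE.
SELECT * FROM (
  VALUES ('INSERT', 101, 1),
         ('UPDATE', 101, 2),
         ('DELETE', 101, 3)
) AS changes(op, lpk, ok)
ORDER BY ok DESC LIMIT 1;
```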
First, the raw data obtained from the external data source is parsed to extract fields such as the operation field (OP), the logical primary key (LPK), and the sequence key (OK). The parsed data is then imported into a temporary table with the same structure as the target table, which stages the data to be merged and updated.
CREATE TEMP TABLE parsed AS
SELECT r.a, r.b + r.c, func(r.d) …
FROM foreign_table CROSS JOIN LATERAL formatter(raw) AS r;
Within the temporary table, for rows with the same logical primary key (LPK), the row with the maximum sequence key (OK) is selected to be retained. This ensures that only the unique record with the maximum sequence key is kept.
INSERT INTO temp_table
SELECT all_columns FROM (
  SELECT *, row_number() OVER (PARTITION BY lpk ORDER BY ok DESC) AS rn
  FROM parsed
) AS no_dup
WHERE no_dup.rn = 1;
In the target table, matching is done on the logical primary key (LPK): rows whose logical primary key matches a row in the temporary table are deleted, so that updates can be applied as delete-then-insert.
DELETE FROM table USING temp_table
WHERE table.lpk = temp_table.lpk;
After deduplication and operation processing, the data is inserted into the target table, completing the merge and update. Whether a record results in a net insert, update, or delete depends on the value of its operation field (OP).
INSERT INTO table SELECT all_columns
FROM temp_table;
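The insert above does not show how the operation field is applied. One plausible refinement, assuming temp_table retains an op column, is to skip records whose latest operation is a delete, since those rows were already removed in the previous step:

```sql
-- Re-insert only rows whose net operation is INSERT or UPDATE;
-- rows whose last change was a DELETE stay removed
INSERT INTO table
SELECT all_columns FROM temp_table
WHERE op <> 'DELETE';
```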
After the MERGE completes, historical information is recorded, just as in the INSERT mode.