In the Cloud-Native Era, What Kind of ETL Solution Is Needed? A Look at PieCloudDB Database's ETL Solution
SEPTEMBER 27TH, 2023

The importance of ETL (Extract, Transform, Load) is self-evident for database users. At its simplest, ETL is the process of importing data into a database; more generally, it moves data between different systems or data organization formats. A well-designed ETL process helps database users manage and optimize data efficiently and ensures that the data in the database is carefully processed to meet users' needs.

 

ETL in a Cloud-Native Environment

 

With the advent of the cloud-native era, inexpensive and easily scalable object storage has become the preferred choice for users who demand high elasticity and low cost. Traditional ETL extracts, cleanses, and transforms data from source systems and finally loads it into target systems for analysis. It offers high throughput and excellent batch-loading performance, but it also puts a heavy load on both source and target systems, so it is typically run outside business peak hours, which leads to significant data latency, often on the order of T+1.

 

Change Data Capture (CDC) captures data changes in a database or file system in real time or near real time and synchronizes them to other data systems while ensuring consistency and accuracy. CDC is usually implemented by parsing logs on the source side, so it has minimal impact on the source system and offers low latency. Compared with batch mode, however, CDC imposes a higher data-update overhead on the target system, especially on analytical databases. Even so, CDC is increasingly widely used for data synchronization, while the traditional ETL mode still has irreplaceable advantages in many scenarios.

 

Whether in ETL or CDC, the primary goal is data replication, which inevitably introduces some data redundancy and a risk of inconsistency. Lakehouse-based techniques, such as single-write/multi-read data sharing and zero-ETL, can eliminate the redundancy and inconsistency risks that replication brings. The goal of the PieCloudDB Database ETL solution is to unify ETL, CDC, and lakehouse technologies.

 

PieCloudDB's storage-compute separation architecture allows different systems to share the same underlying data directly, eliminating cumbersome data extraction, transformation, and loading steps. PieCloudDB currently supports reading files in formats such as Parquet directly from object storage, which simplifies data sharing and access.
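As a rough illustration of what such direct access could look like, the sketch below uses generic PostgreSQL-style foreign table syntax; the server name, option keys, and path are hypothetical placeholders and not necessarily PieCloudDB's actual interface.

-- Hypothetical foreign table over Parquet files in object storage
CREATE FOREIGN TABLE sales_parquet (
    order_id   bigint,
    amount     numeric,
    order_date date
)
SERVER object_storage_server                              -- placeholder server
OPTIONS (path 's3://my-bucket/sales/', format 'parquet'); -- placeholder options

-- Query the files in place, without a separate load step
SELECT order_date, sum(amount) FROM sales_parquet GROUP BY order_date;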

 

ETL requirements still arise in some real-world scenarios, for example when the same underlying raw data is queried by different systems, or when systems specialized for different query types require different storage formats. An ETL solution therefore needs to take the following factors into account:

 

  • Multiple Data Sources: It's necessary to consider the diversity of different systems and data sources, such as production IoT data, to ensure the ability to extract data from various origins (transactional databases, HDFS, Kafka, etc.) and meet the data access requirements of different systems. 

 

  • Various Data Formats: Data may exist in various formats, such as CSV, JSON, Parquet, binary, etc. Ensuring that the ETL process can handle different data formats is crucial. It should be capable of parsing, transforming, and standardizing this data to meet the requirements of the target systems. 

 

  • Universal Data Processing/Transformation: Make sure that data can be consistently cleaned, processed, and transformed to meet the needs of different systems. This will enhance data quality and reduce redundant transformation logic. 

 

  • Uniqueness and Transactional Assurance: Ensure data uniqueness and transactional integrity during the data loading process. Avoid importing duplicate data and implement transaction control within the ETL process to maintain data integrity (a generic sketch of this pattern appears below).

 

  • Resumable Data Transfer: Implement mechanisms for resumable data transfer during the ETL process by recording and recovering processing states, thereby avoiding data loss or redundant processing. 

 

  • Error Handling: The ability to capture, log, and handle errors that occur during the ETL process, including data format errors, connection issues, etc., is essential to ensure data integrity and reliability. 

 

Designing these elements will help ensure that data receives proper treatment throughout the entire process, from extraction to loading, providing a solid foundation for data-driven decision-making and analysis. 
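To make the uniqueness and transactional requirements above concrete, here is a generic SQL sketch (not PieCloudDB-specific; the table, column, and key names are placeholders): a batch is loaded inside a single transaction, and rows whose keys already exist in the target are skipped, so re-running the same batch does not create duplicates.

BEGIN;
-- Insert only rows whose key is not already present, keeping the load idempotent
INSERT INTO target_table
SELECT s.* FROM staging_batch s
WHERE NOT EXISTS (SELECT 1 FROM target_table t WHERE t.id = s.id);
COMMIT;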

 

Overall Design and Implementation of the PieCloudDB ETL Solution 

 

The Overall Design of the PieCloudDB ETL Solution 

 

Considering the ETL needs of the cloud-native era, the overall design of PieCloudDB's ETL solution comprises three main aspects:

 

  • Task Scheduling Control with pdbconduct: In the ETL process, task scheduling and coordination are managed by pdbconduct. pdbconduct acts as the central control point, overseeing task scheduling, execution order, and dependencies. Through pdbconduct, different ETL tasks can be intelligently scheduled, ensuring the effective operation of the entire data workflow. 

 

  • Data Source Extraction (Plugins/Client Tools): The data source extraction phase involves retrieving data from the original databases of business systems. This requires the development of plugins and tools to efficiently export data from these business systems. These plugins and tools can connect to different business systems, extract data from them, and then transform it into a format suitable for the ETL process. 

 

  • Decoupling on Compute Nodes via Foreign Table and Formatter: Running Foreign Tables on the compute nodes is the core of the ETL process. This step transfers the data extracted from business systems into PieCloudDB and handles the various data formats on the compute nodes. Foreign Tables map external data to database tables, creating an environment for data transformation and processing.

 

Through these three aspects of design, PieCloudDB ETL solution achieves the objectives of effective task scheduling, data extraction from business systems, and data processing on compute nodes. The entire process ensures smooth data transfer from business systems to PieCloudDB and provides the necessary foundation for data transformation and processing. This ensures data accuracy and consistency throughout the integration, transformation, and loading processes, offering high-quality data resources for subsequent analysis and applications. 

 

The Execution Process of PieCloudDB ETL 

 

When an ETL task is initiated in PieCloudDB, the process proceeds through the following steps:



  • Source System Connection and Data Extraction: First, a connection is established with the source system, and SQL queries or other extraction operations are executed to retrieve the data that needs to undergo ETL.

 

  • Data Transfer to Intermediate System: The extracted data can be directly transferred to an intermediate system, which can be the local disk of the source system, PieCloudDB's disk, or another intermediate storage location. This step aids in temporarily storing the data for further processing. 

 

  • Intermediate System Processing: The intermediate system could be cloud storage or a server (e.g., Kafka), and the choice depends on the needs of the business scenario. In the intermediate system, data is prepared for the subsequent Foreign Table. 

 

  • Foreign Table Connection: The prepared data is then mapped into PieCloudDB through the Foreign Table's connection mechanism. This step allows the data to be further processed and analyzed within the PieCloudDB environment.

 

  • Data Loading and Verification: The data is transformed and processed while the files on cloud storage are verified against expectations. The necessary validations and checks are performed to ensure data integrity and correctness.

 

Based on business requirements, the task scheduler, pdbconduct, triggers ETL tasks from the source system as needed. This step ensures that the required data is available for further processing. 

 

Once data export is completed, pdbconduct sends the corresponding SQL statements to PieCloudDB's control node. These SQL statements may include data transformations, loading, or other operations to prepare data for entry into the PieCloudDB environment. 

 

After the SQL statements are executed on PieCloudDB's control node, pdbconduct collects the execution results and records task progress along with any error messages. This makes it possible to monitor the task's status and take appropriate action when issues arise. If errors occur during execution, pdbconduct logs all error information and takes remedial action as needed.

 

INSERT/MERGE Mode 

 

PieCloudDB's ETL supports two common data processing modes: INSERT and MERGE. Users can choose between INSERT and MERGE modes based on their business requirements, data update frequency, and data change scenarios. 

 

INSERT Mode 

 

The INSERT mode inserts data from the source system directly into PieCloudDB: data extracted from the source system is written into the corresponding PieCloudDB tables row by row or in batches. It is suitable for bulk data imports and for workloads where data changes little and the primary operation is adding new records. Its advantage is simplicity, which makes it a good fit for pure import scenarios, especially time-series data streams that have no logical connection to existing data.



  • Step 1: Obtain Raw Data 

 

To begin with, specific adapters or plugins need to be developed for connecting PieCloudDB to the particular data source. This may involve developing PostgreSQL extensions to support communication with the data source and parsing data formats. 

 

Next, the control node will read data source information, including connection parameters, authentication details, data extraction rules, etc. It decides whether to split tasks to improve concurrency and efficiency and generates task information such as query statements and task dependencies. Finally, the compute nodes, based on task information, read data from the data source and return raw data and metadata to the control node. 

 

Through these steps, in the INSERT mode of the ETL process, data is obtained from the data source and inserted into PieCloudDB using the Foreign Table approach. 

 

-- SERVER names a previously defined foreign server (placeholder name here)
CREATE FOREIGN TABLE foreign_table(meta text, raw bytea) SERVER source_server;
SELECT meta, raw FROM foreign_table;

 

  • Step 2: Data Preparation and Parsing 

 

Following Step 1, the raw data obtained from the Foreign Table needs to be parsed and transformed to fit the internal row format. This transformation process is typically carried out by the Formatter. 

 

The PieCloudDB Formatter first parses the raw data obtained from the Foreign Table, breaking it down into manageable data units (such as fields, rows, columns, etc.) based on the data format (e.g., CSV, JSON, XML). 

 

Subsequently, the PieCloudDB Formatter converts the parsed data to align with the internal row format of PieCloudDB, generating the required columns. 

 

CREATE FUNCTION formatter(input bytea) RETURNS user_type …;
SELECT meta, r.* FROM foreign_table
CROSS JOIN LATERAL formatter(raw) AS r;

 

  • Step 3: Data Transformation 

 

In Step 3, data transformation operations are performed on the columns parsed in Step 2. This is done to ensure data accuracy and consistency, allowing the data to be smoothly inserted into PieCloudDB tables and providing a reliable data foundation for subsequent analysis and applications. 

 

SELECT r.a, r.b + r.c, func(r.d) … FROM foreign_table
CROSS JOIN LATERAL formatter(raw) AS r;


  • Step 4: Insertion into Target Table 

 

After completing the preceding three steps of data preparation and transformation, in Step 4, the data is ready for insertion into the target table. 


INSERT INTO table
SELECT r.a, r.b + r.c, func(r.d) … FROM foreign_table
CROSS JOIN LATERAL formatter(raw) AS r;

 

  • Step 5: Insertion into Historical Table with Resumable Capability 

 

Finally, to support resumable processing, the batch metadata is inserted into a history table, which maintains a record of the data change history, including additions, updates, and deletions.

 

INSERT INTO history  
SELECT meta FROM foreign_table; 
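Assuming that meta identifies a source batch or file (its exact contents are not specified here), the history table can be consulted on a retry to skip work that has already completed; a minimal sketch:

-- Only batches not yet recorded in history still need to be processed
SELECT meta FROM foreign_table
EXCEPT
SELECT meta FROM history;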

 

MERGE Mode 

 

PieCloudDB's ETL MERGE/UPSERT mode supports CDC (Change Data Capture) scenarios. This mode is designed to handle data with operation types, logical keys, and sequence keys to achieve data insertion, updating, and deletion operations. 

 

In MERGE mode, the data should include operation fields (OP, i.e., INSERT/UPDATE/DELETE), logical keys, and sequence keys. When a logical key doesn't exist, the mode performs an INSERT operation. When the logical key already exists, it executes an update or delete operation. Sequence keys are used to determine the order of operations. When dealing with multiple operations, the order of execution is determined based on sequence keys to prevent conflicts between operations. MERGE mode allows for handling duplicate data but does not accommodate transactional logic errors. 
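To make this concrete, the change rows consumed in MERGE mode might look roughly like the following; the table and column names (op, lpk, ok, payload) are illustrative placeholders matching the abbreviations used in this section, and the real schema depends on the source system and the Formatter.

CREATE TEMP TABLE cdc_example (op text, lpk bigint, ok bigint, payload text);
INSERT INTO cdc_example VALUES
    ('INSERT', 101, 1, 'v1'),    -- row 101 is created
    ('UPDATE', 101, 2, 'v2'),    -- later change to the same lpk; ok = 2 wins during deduplication
    ('DELETE', 205, 7, NULL);    -- row 205 should end up removed from the target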



  • Step 1: Data Parsing and Import into Temporary Table 

 

First, the raw data obtained from external data sources is parsed to extract data fields such as operation fields (OP), logical primary keys (LPK), and sequence keys (OK). Then, the parsed data is imported into a temporary table that has the same structure as the target table. This temporary table is used to store data that is to be merged and updated. 

 

-- The result below is the "parsed" data set referenced in Step 2
SELECT r.a, r.b + r.c, func(r.d) … FROM foreign_table
CROSS JOIN LATERAL formatter(raw) AS r;

 

  • Step 2: Deduplication Within the Temporary Table 

 

Within the temporary table, for rows with the same logical primary key (LPK), the row with the maximum sequence key (OK) is selected to be retained. This ensures that only the unique record with the maximum sequence key is kept. 

 

INSERT INTO temp_table
SELECT all_columns FROM (
    SELECT *, row_number() OVER (PARTITION BY lpk ORDER BY ok DESC) AS rn
    FROM parsed
) AS no_dup
WHERE no_dup.rn = 1;

 

  • Step 3: Deleting PK Matching Rows in the Target Table 

 

In the target table, matching is done based on the logical primary key (LPK), and rows with the same logical primary key as the data in the temporary table are deleted. This ensures that data updates are applied. 

 

DELETE FROM table USING temp_table  
WHERE table.pk = temp_table.pk; 

 

  • Step 4: Inserting into the Target Table to Complete the Merge 

 

The data, after deduplication and operation processing, is inserted into the target table to complete the data merge and update. The insertion operation may involve INSERT, UPDATE, or DELETE operations, depending on the value of the operation field (OP) in the data. 

 

INSERT INTO table SELECT all_columns  
FROM temp_table; 
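The statement above inserts every retained row; in a CDC scenario the operation field would typically also be consulted, for example to avoid re-inserting rows whose final operation was a delete. A possible variant, assuming an op column is carried through the temporary table:

-- Skip rows whose latest recorded operation is a DELETE
INSERT INTO table
SELECT all_columns FROM temp_table WHERE op <> 'DELETE';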

 

After completing the MERGE, just like in the INSERT mode, historical information is recorded. 
