WHITE PAPER

Rediscovering CDC: A Concise Overview for Modern Data Engineers

In the rapidly evolving landscape of data management, Change Data Capture (CDC) has emerged as a critical strategy for organizations looking to maintain real-time data accuracy and integrity. However, implementing a CDC system is not without its challenges. In this article, we will explore the common problems faced during CDC implementation and propose potential solutions to navigate these hurdles effectively.

Change Data Capture Essentials

Before delving into the challenges of implementing Change Data Capture (CDC), let's briefly refresh the concept. CDC is a technique for tracking and capturing modifications to a source database, including inserts, updates, and deletes, in real time, so that they can be applied to a downstream target. The baseline below uses Spark Structured Streaming's foreachBatch to MERGE each micro-batch into a Delta table.

[Figure: input and output]
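import org.apache.spark.sql.DataFrame

// Apply one micro-batch to the Delta table: update matching keys, insert new ones.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
	microBatchOutputDF.createOrReplaceTempView("updates")

	// Run the MERGE on the micro-batch's own session, where the "updates" view is registered.
	microBatchOutputDF.sparkSession.sql(s"""
		MERGE INTO cdc_data t
		USING updates s
		ON s.key = t.key
		WHEN MATCHED THEN UPDATE SET *
		WHEN NOT MATCHED THEN INSERT *
	""")
}

cdcData.writeStream
	.foreachBatch(upsertToDelta _)
	.outputMode("append")
	.start()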
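
As a rough illustration of what the MERGE does to the target table, the following sketch models upsert semantics on an in-memory map. It is plain Scala, not Spark or Delta code; the `Row` shape and the `UpsertSketch` object are hypothetical names chosen for this example.

```scala
// Hypothetical in-memory model of an upsert; not Spark or Delta code.
object UpsertSketch {
  // Assumed row shape: a key plus a single payload column.
  final case class Row(key: String, value: String)

  // WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT:
  // every incoming row replaces or adds the entry for its key.
  def upsert(target: Map[String, Row], updates: Seq[Row]): Map[String, Row] =
    updates.foldLeft(target)((table, row) => table.updated(row.key, row))

  def main(args: Array[String]): Unit = {
    val target = Map("k1" -> Row("k1", "old"))
    val batch = Seq(Row("k1", "new"), Row("k2", "first"))
    // k1 is overwritten, k2 is inserted.
    println(upsert(target, batch))
  }
}
```

The fold simply applies rows in arrival order, which is exactly the assumption that the challenges in the following sections break.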

The Challenges of Implementing CDC

Multiple Updates to the Same Key

One of the primary challenges in CDC is managing multiple updates to the same key within a short time frame. When several changes to the same record land in the same micro-batch, the merge has no way to tell which one is the most recent. This can lead to data inconsistencies, newer changes being overwritten by older ones, and significant errors in the final dataset.

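[Figure: micro-batch input]
// Needs: import org.apache.spark.sql.functions.{col, max_by, struct} (max_by: Spark 3.3+)
microBatchOutputDF
	.groupBy("key")
	// max_by(value, ordering): keep the whole row with the largest ts (change timestamp) per key
	.agg(max_by(struct("*"), col("ts")).alias("row"))
	.select("row.*")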

To handle multiple updates to the same key, group the data by the primary key and select the row with the maximum timestamp for each key. This approach ensures that only the most recent update is retained, effectively resolving conflicts caused by multiple changes to the same record within a batch.
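
The same idea can be sketched on plain Scala collections, which makes the "latest row per key" rule easy to check by hand. The `Change` case class and its field names are assumptions made for this example, not part of the pipeline above.

```scala
// Hypothetical in-memory model of per-batch deduplication; not Spark code.
object DedupeSketch {
  // Assumed change record: key, change timestamp, payload.
  final case class Change(key: String, ts: Long, value: String)

  // Group by key and keep only the change with the largest timestamp.
  def latestPerKey(batch: Seq[Change]): Seq[Change] =
    batch.groupBy(_.key).values.map(_.maxBy(_.ts)).toSeq.sortBy(_.key)

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      Change("k1", 1L, "first"),
      Change("k1", 3L, "latest"),
      Change("k1", 2L, "middle"),
      Change("k2", 5L, "only")
    )
    // One row per key survives: k1 at ts 3 and k2 at ts 5.
    latestPerKey(batch).foreach(println)
  }
}
```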

With the deduplication step added, the code looks like:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max_by, struct}

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
	// Keep only the latest change per key within this micro-batch (max_by: Spark 3.3+).
	microBatchOutputDF
		.groupBy("key")
		.agg(max_by(struct("*"), col("ts")).alias("row"))
		.select("row.*")
		.createOrReplaceTempView("updates")

	microBatchOutputDF.sparkSession.sql(s"""
		MERGE INTO cdc_data t
		USING updates s
		ON s.key = t.key
		WHEN MATCHED THEN UPDATE SET *
		WHEN NOT MATCHED THEN INSERT *
	""")
}

cdcData.writeStream
	.foreachBatch(upsertToDelta _)
	.outputMode("append")
	.start()

Out of Order Updates Across Batches

In a streaming environment, data can arrive out of sequence, leading to out-of-order updates. This issue can disrupt the logical flow of data processing and cause confusion when trying to reconcile changes. Maintaining the correct sequence of updates is crucial for ensuring data accuracy.

[Figure: Micro Batch 1 and Micro Batch 2]
WHEN MATCHED THEN UPDATE SET
	TS = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END,
	A = CASE WHEN s.ts > t.ts THEN s.a ELSE t.a END,
	B = CASE WHEN s.ts > t.ts THEN s.b ELSE t.b END,
	... for every column ...

To handle out-of-order updates across batches in CDC, store a timestamp for each row's last update. When processing new data, compare the incoming timestamp with the stored one. If the new timestamp is greater, use the new data; otherwise, retain the existing data. This approach ensures that only the most recent updates are applied, maintaining data consistency even when updates arrive out of sequence.
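
The timestamp comparison can be modeled in a few lines of plain Scala. This is only a sketch under assumed names (`Row`, `OutOfOrderSketch`) with two payload columns `a` and `b`; it is not Spark code, but it follows the same rule as the CASE expressions: take the incoming row only when its timestamp is strictly greater.

```scala
// Hypothetical in-memory model of a timestamp-guarded merge; not Spark code.
object OutOfOrderSketch {
  // Assumed row shape: key, last-update timestamp, two payload columns.
  final case class Row(key: String, ts: Long, a: String, b: String)

  def merge(target: Map[String, Row], batch: Seq[Row]): Map[String, Row] =
    batch.foldLeft(target) { (table, incoming) =>
      table.get(incoming.key) match {
        // Stale or equal timestamp: keep the stored row untouched.
        case Some(existing) if incoming.ts <= existing.ts => table
        // Newer timestamp, or key not present yet: take the incoming row.
        case _ => table.updated(incoming.key, incoming)
      }
    }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(Row("k1", 20L, "a2", "b2"))
    val batch2 = Seq(Row("k1", 10L, "a1", "b1")) // arrives late
    val table = merge(merge(Map.empty, batch1), batch2)
    // The row written at ts 20 survives the late batch.
    println(table("k1"))
  }
}
```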

With the timestamp comparison added to the MERGE, the code looks like:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max_by, struct}

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
	// Keep only the latest change per key within this micro-batch (max_by: Spark 3.3+).
	microBatchOutputDF
		.groupBy("key")
		.agg(max_by(struct("*"), col("ts")).alias("row"))
		.select("row.*")
		.createOrReplaceTempView("updates")

	microBatchOutputDF.sparkSession.sql(s"""
		MERGE INTO cdc_data t
		USING updates s
		ON s.key = t.key
		WHEN MATCHED THEN UPDATE SET
			TS = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END,
			A = CASE WHEN s.ts > t.ts THEN s.a ELSE t.a END,
			B = CASE WHEN s.ts > t.ts THEN s.b ELSE t.b END
			-- repeat the same CASE for every remaining column
		WHEN NOT MATCHED THEN INSERT *
	""")
}

cdcData.writeStream
	.foreachBatch(upsertToDelta _)
	.outputMode("append")
	.start()

Handling Deletes

Managing deletions in a CDC system presents its own set of challenges. When a record is deleted from the source database, it must be accurately reflected in the target system. Failure to do so can result in discrepancies between datasets, leading to confusion and potential data integrity issues.

With delete handling added, the code looks like:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max_by, struct}

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
	// Keep only the latest change per key within this micro-batch (max_by: Spark 3.3+).
	microBatchOutputDF
		.groupBy("key")
		.agg(max_by(struct("*"), col("ts")).alias("row"))
		.select("row.*")
		.createOrReplaceTempView("updates")

	microBatchOutputDF.sparkSession.sql(s"""
		MERGE INTO cdc_data t
		USING updates s
		ON s.key = t.key
		WHEN MATCHED AND s.is_delete THEN DELETE
		WHEN MATCHED THEN UPDATE SET
			TS = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END,
			A = CASE WHEN s.ts > t.ts THEN s.a ELSE t.a END,
			B = CASE WHEN s.ts > t.ts THEN s.b ELSE t.b END
			-- repeat the same CASE for every remaining column
		WHEN NOT MATCHED THEN INSERT *
	""")
}

cdcData.writeStream
	.foreachBatch(upsertToDelta _)
	.outputMode("append")
	.start()
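
To see what a hard delete does and where it falls short, here is a small in-memory sketch in plain Scala. The `Change` and `Row` types are hypothetical, and the choice to drop a delete for a key that is not in the table is an assumption of this sketch rather than something the MERGE above does.

```scala
// Hypothetical in-memory model of hard deletes in a merge; not Spark code.
object DeleteSketch {
  final case class Change(key: String, ts: Long, value: String, isDelete: Boolean)
  final case class Row(key: String, ts: Long, value: String)

  def merge(target: Map[String, Row], batch: Seq[Change]): Map[String, Row] =
    batch.foldLeft(target) { (table, change) =>
      table.get(change.key) match {
        // Matched delete: remove the row entirely.
        case Some(_) if change.isDelete => table - change.key
        // Matched but stale update: keep the stored row.
        case Some(existing) if change.ts <= existing.ts => table
        // Assumption of this sketch: a delete for an unknown key is dropped.
        case None if change.isDelete => table
        // Newer update or unknown key: write the incoming row.
        case _ => table.updated(change.key, Row(change.key, change.ts, change.value))
      }
    }

  def main(args: Array[String]): Unit = {
    val start = Map("k1" -> Row("k1", 10L, "v1"))
    val afterDelete = merge(start, Seq(Change("k1", 30L, "", isDelete = true)))
    println(afterDelete) // k1 is gone
    // An update stamped before the delete arrives afterwards and is re-inserted.
    val afterLate = merge(afterDelete, Seq(Change("k1", 20L, "late", isDelete = false)))
    println(afterLate)
  }
}
```

Once the row is physically removed, nothing remains to compare a late update against, so the deleted key reappears.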

Update Arrives Late After Delete

Another common issue arises when an update for a record arrives after that record has already been deleted from the source database. This late-arriving update can create ambiguity about whether to restore the deleted record or ignore the update entirely, complicating data reconciliation efforts.

To handle a late update that arrives after a delete, use soft deletes with tombstones. Instead of removing a record immediately, mark it with a deletion timestamp and expose a view that filters tombstoned rows out for users. Because the tombstone stays in the table, a late update still matches it and the row remains hidden instead of being re-inserted. A separate clean-up process removes tombstones once they are old enough that late updates are no longer expected.
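
A minimal in-memory sketch of the tombstone idea, in plain Scala, is shown below. The types, the `deletedAt` field, and the explicit `now` parameter (standing in for the current time) are assumptions made for illustration. The point is that a late update never clears the deletion marker.

```scala
// Hypothetical in-memory model of soft deletes (tombstones); not Spark code.
object TombstoneSketch {
  final case class Change(key: String, ts: Long, value: String, isDelete: Boolean)
  // deletedAt is None for live rows and Some(time) for tombstones.
  final case class Row(key: String, ts: Long, value: String, deletedAt: Option[Long])

  def merge(target: Map[String, Row], batch: Seq[Change], now: Long): Map[String, Row] =
    batch.foldLeft(target) { (table, c) =>
      table.get(c.key) match {
        // Matched delete: mark the row instead of removing it.
        case Some(existing) if c.isDelete =>
          table.updated(c.key, existing.copy(deletedAt = Some(now)))
        // Matched but stale update: keep the stored row.
        case Some(existing) if c.ts <= existing.ts => table
        // Matched newer update: refresh the data, leave deletedAt untouched.
        case Some(existing) =>
          table.updated(c.key, existing.copy(ts = c.ts, value = c.value))
        // Not matched: insert, as a tombstone if the change is a delete.
        case None =>
          val marker = if (c.isDelete) Some(now) else None
          table.updated(c.key, Row(c.key, c.ts, c.value, marker))
      }
    }

  def main(args: Array[String]): Unit = {
    val start = Map("k1" -> Row("k1", 10L, "v1", None))
    val deleted = merge(start, Seq(Change("k1", 30L, "", isDelete = true)), now = 1000L)
    val late = merge(deleted, Seq(Change("k1", 20L, "late", isDelete = false)), now = 1001L)
    // The late update is applied to the data, but the row stays tombstoned.
    println(late("k1"))
  }
}
```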

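With tombstones in place of hard deletes, the code looks like:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max_by, struct}

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
	// Keep only the latest change per key within this micro-batch (max_by: Spark 3.3+).
	microBatchOutputDF
		.groupBy("key")
		.agg(max_by(struct("*"), col("ts")).alias("row"))
		.select("row.*")
		.createOrReplaceTempView("updates")

	// Write to the raw table; users read through the filtered cdc_data view.
	microBatchOutputDF.sparkSession.sql(s"""
		MERGE INTO cdc_data_raw t
		USING updates s
		ON s.key = t.key
		WHEN MATCHED AND s.is_delete THEN
			UPDATE SET __DELETED_AT = now()
		WHEN MATCHED THEN UPDATE SET
			TS = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END,
			A = CASE WHEN s.ts > t.ts THEN s.a ELSE t.a END,
			B = CASE WHEN s.ts > t.ts THEN s.b ELSE t.b END
			-- repeat the same CASE for every remaining column
		WHEN NOT MATCHED THEN INSERT *
	""")
}

cdcData.writeStream
	.foreachBatch(upsertToDelta _)
	.outputMode("append")
	.start()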

REDIRECT USERS TO A VIEW

CREATE VIEW cdc_data AS
SELECT * FROM cdc_data_raw WHERE __DELETED_AT IS NULL

RUN CLEAN UP PERIODICALLY

DELETE FROM cdc_data_raw WHERE __DELETED_AT < now() - INTERVAL 1 DAY 
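
The view and the clean-up job can also be expressed as two small filters over an in-memory table. The sketch below is plain Scala with hypothetical names; `now` and `retention` are passed in as plain numbers (in the same unit as `deletedAt`) so the behaviour is easy to verify.

```scala
// Hypothetical in-memory model of the user-facing view and the clean-up job.
object CleanupSketch {
  final case class Row(key: String, value: String, deletedAt: Option[Long])

  // Mirrors the view: only rows without a deletion timestamp are visible.
  def visible(raw: Seq[Row]): Seq[Row] =
    raw.filter(_.deletedAt.isEmpty)

  // Mirrors the clean-up: drop tombstones older than the retention window.
  def purge(raw: Seq[Row], now: Long, retention: Long): Seq[Row] =
    raw.filterNot(_.deletedAt.exists(_ < now - retention))

  def main(args: Array[String]): Unit = {
    val day = 86400L
    val now = 10 * day
    val raw = Seq(
      Row("live", "v", None),
      Row("recent", "v", Some(now - 10)),
      Row("old", "v", Some(now - 2 * day))
    )
    println(visible(raw).map(_.key))         // only the live row
    println(purge(raw, now, day).map(_.key)) // the old tombstone is removed
  }
}
```

The retention window should be longer than the latest delay expected for out-of-order updates. Otherwise a purged key can be re-inserted by a late change.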

The Importance of Implementing CDC

Despite these challenges, implementing a robust CDC system is essential for organizations that rely on accurate and timely data. The benefits include:

  • Real-Time Tracking: CDC allows organizations to track changes as they happen, ensuring that their data is always up-to-date.
  • Improved Inventory Management: For e-commerce businesses, having accurate inventory data is crucial for operational efficiency.
  • Enhanced Customer Experience: Timely updates enable businesses to provide better service and personalized experiences based on current data.
  • Data Integrity: Maintaining an accurate and consistent data pipeline is vital for business operations.

Conclusion

Implementing a Change Data Capture system comes with its share of challenges, but with careful planning and strategic solutions, organizations can successfully navigate these issues. By addressing problems like multiple updates, out-of-order changes, handling deletes, and late-arriving updates, businesses can harness the power of real-time data to drive decision-making and enhance operational efficiency. 
