Data and AI: Day 44 – Implementing SCD in Databricks

When data changes, how we process those changes determines the accuracy and speed of our data warehouses. Whether you are working with historical tracking, real-time feeds, or a mix of both, Slowly Changing Dimensions (SCD) are something every data engineer needs to master, at any scale.

SCD Types Overview:

  • SCD Type 0 (Fixed Dimension): No changes allowed. Once the data is captured, it is never updated. Example: Birth date.
  • SCD Type 1 (Overwrite): Overwrites the old value with the latest one and keeps no history. Example: Correcting misspelled customer names.
  • SCD Type 2 (Add New Row): Maintains full history by adding a new row for every change, tracked with effective dates or version numbers. Example: Recording a change of address.
  • SCD Type 3 (Add New Column): Keeps limited history by storing the previous value in a separate column. Example: Keeping the previous job title.
  • SCD Type 6 (Combined: Type 1 + Type 2 + Type 3): Combines row-level history with current and previous values carried on each row. Example: Tracking department changes as new rows while every row also shows the current department.
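
The difference between overwriting a value and keeping the prior value in an extra column is easiest to see on a single record. The following is a minimal plain-Python sketch, not Databricks code; the function names and the `previous_address` column are hypothetical and chosen only for illustration.

```python
# Illustrative only: plain dicts stand in for dimension rows.

def apply_overwrite(row, new_address):
    """Overwrite approach (Type 1): the old value is replaced and lost."""
    updated = dict(row)
    updated["address"] = new_address
    return updated


def apply_add_column(row, new_address):
    """Add-new-column approach: exactly one prior value is kept alongside the new one."""
    updated = dict(row)
    updated["previous_address"] = row["address"]  # hypothetical column name
    updated["address"] = new_address
    return updated


row = {"customer_id": 1, "address": "12 Oak St"}
overwritten = apply_overwrite(row, "9 Elm Ave")
with_previous = apply_add_column(row, "9 Elm Ave")
```

After the overwrite, nothing in the record shows that the customer ever lived at the old address, while the add-new-column variant can answer "where did they live before?" but only one step back.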

For Databricks and Delta Lake users, this guide shows how to implement these patterns and write better data pipelines.

You Should Know:

Here are some practical commands and code snippets to implement SCD in Databricks:

1. Creating a Delta Table:

CREATE TABLE IF NOT EXISTS customer_data (
    customer_id INT,
    customer_name STRING,
    address STRING,
    effective_date DATE,
    end_date DATE,
    current_flag BOOLEAN
) USING DELTA;
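
With this schema, a row-versioned history is only meaningful if each customer has exactly one row flagged as current, and closed rows carry an end date. A small plain-Python check of that invariant might look like the sketch below. The function name `find_history_violations` is hypothetical, and the rule itself is an assumption about how `current_flag` and `end_date` are meant to be used.

```python
from collections import Counter
from datetime import date


def find_history_violations(rows):
    """Return a sorted list of (customer_id, problem) tuples for rows that break the invariant."""
    problems = set()
    current_counts = Counter()
    for row in rows:
        cid = row["customer_id"]
        # Adding 0 still registers the customer, so "no current row" is detected too.
        current_counts[cid] += int(bool(row["current_flag"]))
        if row["current_flag"] and row["end_date"] is not None:
            problems.add((cid, "current row has an end_date"))
        if not row["current_flag"] and row["end_date"] is None:
            problems.add((cid, "closed row has no end_date"))
    for cid, count in current_counts.items():
        if count != 1:
            problems.add((cid, f"{count} current rows"))
    return sorted(problems)


rows = [
    {"customer_id": 1, "end_date": date(2024, 1, 1), "current_flag": False},
    {"customer_id": 1, "end_date": None, "current_flag": True},
    {"customer_id": 2, "end_date": None, "current_flag": False},
]
violations = find_history_violations(rows)
```

Here customer 1 passes and customer 2 is reported twice: once for having no current row and once for a closed row without an end date. On a small test table, the input rows could be obtained with something like `[r.asDict() for r in df.collect()]`.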

2. SCD Type 1 Implementation (Overwrite):

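from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col

spark = SparkSession.builder.appName("SCD Type 1").getOrCreate()

# Load new data (aliased so its columns can be referenced by name after the join)
new_data = spark.read.format("delta").load("/path/to/new_data").alias("new_data")

# Load existing data
existing_data = spark.read.format("delta").load("/path/to/existing_data").alias("existing_data")

# Perform SCD Type 1: take the incoming value when there is one, otherwise keep the stored value
updated_data = existing_data.join(new_data, "customer_id", "outer") \
    .select(
        col("customer_id"),
        coalesce(col("new_data.customer_name"), col("existing_data.customer_name")).alias("customer_name"),
        coalesce(col("new_data.address"), col("existing_data.address")).alias("address"),
        # Tracking columns are carried over unchanged from the existing table
        col("existing_data.effective_date").alias("effective_date"),
        col("existing_data.end_date").alias("end_date"),
        col("existing_data.current_flag").alias("current_flag")
    )

# Overwrite the table with the merged result; no history is kept
updated_data.write.format("delta").mode("overwrite").save("/path/to/existing_data")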

3. SCD Type 2 Implementation (Add New Row):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, lit

spark = SparkSession.builder.appName("SCD Type 2").getOrCreate()
existing_path = "/path/to/existing_data"

# Load new data (assumed to hold one row per customer_id)
new_data = spark.read.format("delta").load("/path/to/new_data")

# Load existing data as a Delta table so rows can be updated in place
existing_table = DeltaTable.forPath(spark, existing_path)

# Perform SCD Type 2
# Step 1: close the current row of each customer whose name or address changed
existing_table.alias("t").merge(
    new_data.alias("s"),
    "t.customer_id = s.customer_id AND t.current_flag = true"
).whenMatchedUpdate(
    condition="NOT (t.customer_name <=> s.customer_name AND t.address <=> s.address)",
    set={"end_date": "current_date()", "current_flag": "false"}
).execute()

# Step 2: append a new current row for every customer left without one
# (changed customers closed in step 1, plus brand-new customers)
current_rows = spark.read.format("delta").load(existing_path).filter(col("current_flag"))
new_rows = new_data.join(current_rows, "customer_id", "left_anti") \
    .select(
        col("customer_id"),
        col("customer_name"),
        col("address"),
        # Assumption: the change takes effect on the load date
        current_date().alias("effective_date"),
        lit(None).cast("date").alias("end_date"),
        lit(True).alias("current_flag")
    )

new_rows.write.format("delta").mode("append").save(existing_path)
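
The behaviour that SCD Type 2 calls for (close the old version, then add a new current one) can be modelled in plain Python, which is handy for working out the expected result of a small test fixture before running a pipeline. This is a sketch under assumptions: the function name `apply_scd2` and the shape of `updates` are hypothetical, and the change is assumed to take effect on the date passed in as `today`.

```python
from datetime import date


def apply_scd2(history, updates, today):
    """Return a new history list after applying updates with Type 2 semantics.

    history: list of dicts with the customer_data columns.
    updates: dict mapping customer_id -> {"customer_name": ..., "address": ...}.
    """
    result = []
    unchanged = set()
    for row in history:
        incoming = updates.get(row["customer_id"])
        if row["current_flag"] and incoming is not None:
            same = (row["customer_name"] == incoming["customer_name"]
                    and row["address"] == incoming["address"])
            if same:
                unchanged.add(row["customer_id"])
            else:
                # Close the old version instead of overwriting it.
                row = {**row, "end_date": today, "current_flag": False}
        result.append(row)
    for cid, incoming in updates.items():
        if cid not in unchanged:
            # Changed and brand-new customers both get a fresh current row.
            result.append({"customer_id": cid, **incoming,
                           "effective_date": today, "end_date": None,
                           "current_flag": True})
    return result


history = [{"customer_id": 1, "customer_name": "Ann", "address": "12 Oak St",
            "effective_date": date(2023, 1, 1), "end_date": None,
            "current_flag": True}]
updates = {1: {"customer_name": "Ann", "address": "9 Elm Ave"}}
new_history = apply_scd2(history, updates, date(2024, 6, 1))
```

The result holds two rows for customer 1: the Oak St row, now closed, and a current Elm Ave row. Applying the same updates a second time changes nothing, which is the property a rerun of the load should have.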

4. SCD Type 6 Implementation (Combined):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, lit

spark = SparkSession.builder.appName("SCD Type 6").getOrCreate()
existing_path = "/path/to/existing_data"

# Load new data (assumed to hold one row per customer_id)
new_data = spark.read.format("delta").load("/path/to/new_data")

# Load existing data
# Assumption: the table has an extra current_address column,
# which is not part of the CREATE TABLE statement shown earlier
existing_table = DeltaTable.forPath(spark, existing_path)

# Perform SCD Type 6
# Type 2 part, step 1: close the current row when the address changed
existing_table.alias("t").merge(
    new_data.alias("s"),
    "t.customer_id = s.customer_id AND t.current_flag = true"
).whenMatchedUpdate(
    condition="NOT (t.address <=> s.address)",
    set={"end_date": "current_date()", "current_flag": "false"}
).execute()

# Type 2 part, step 2: append a new current row for customers left without one
current_rows = spark.read.format("delta").load(existing_path).filter(col("current_flag"))
new_rows = new_data.join(current_rows, "customer_id", "left_anti") \
    .select(
        col("customer_id"),
        col("customer_name"),
        col("address"),
        current_date().alias("effective_date"),
        lit(None).cast("date").alias("end_date"),
        lit(True).alias("current_flag"),
        col("address").alias("current_address")
    )
new_rows.write.format("delta").mode("append").save(existing_path)

# Type 1 part: overwrite the name and current_address on every version of the customer
existing_table.alias("t").merge(
    new_data.alias("s"),
    "t.customer_id = s.customer_id"
).whenMatchedUpdate(
    set={"customer_name": "s.customer_name", "current_address": "s.address"}
).execute()
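
What sets Type 6 apart from plain row versioning is that every version of a record also carries the latest value, so historical rows can be grouped by the current state without a self-join. A plain-Python model of that idea is sketched below. It assumes a `current_address` column, which is hypothetical and not part of the `customer_data` table defined earlier, and the function name `apply_scd6` is likewise made up for illustration.

```python
from datetime import date


def apply_scd6(history, customer_id, new_address, today):
    """Type 6 sketch: row versioning plus an overwritten current_address on every version."""
    current = [r for r in history
               if r["customer_id"] == customer_id and r["current_flag"]]
    if current and current[0]["address"] == new_address:
        return list(history)  # nothing changed, keep the history as is
    result = []
    for row in history:
        if row["customer_id"] == customer_id:
            # Overwrite part: every version learns the latest address.
            row = dict(row, current_address=new_address)
            if row["current_flag"]:
                # Versioning part: close the row that was current until now.
                row.update(end_date=today, current_flag=False)
        result.append(row)
    result.append({"customer_id": customer_id, "address": new_address,
                   "current_address": new_address, "effective_date": today,
                   "end_date": None, "current_flag": True})
    return result


history = [{"customer_id": 7, "address": "12 Oak St",
            "current_address": "12 Oak St",
            "effective_date": date(2023, 1, 1), "end_date": None,
            "current_flag": True}]
result = apply_scd6(history, 7, "9 Elm Ave", date(2024, 6, 1))
```

In the result, the `address` column still tells the story row by row (Oak St, then Elm Ave), while `current_address` reads Elm Ave on both rows.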

What Undercode Says:

Mastering SCD in Databricks is crucial for maintaining accurate and efficient data pipelines. By understanding and implementing SCD types, you can ensure that your data warehouse remains precise and up-to-date. The provided commands and code snippets should help you get started with SCD in Databricks. For further reading, consider exploring the official Databricks documentation on Delta Lake and SCD implementations.

Reported By: Shwetank Singh – Hackers Feeds