Quick Guide to Databricks Delta Lake Using the Python API

Introduction: With the modern data warehouse architecture, we retain the existing data warehouse functionality and add data lake features, along with add-ons such as time travel (travel across different snapshots of data), schema evolution (adapt to fast-changing structures), and governance policies (RBAC, row-level policies).

The Databricks platform is integrated with Delta Lake, providing a seamless experience for developers working with a wide variety (formats of data), volume (data storage), and velocity (engine processing). Its compatibility with Apache Spark allows users to run their existing Spark jobs on Delta Lake with minimal changes, leveraging Spark's powerful analytics capabilities on a more reliable and robust data storage foundation.

Delta Lake is a technology developed by the same developers as Apache Spark. It is an open-source storage layer created to run on top of an existing data lake and improve its reliability, security, and performance. It is designed to bring reliability to your data lakes and to provide Atomicity, Consistency, Isolation, and Durability (ACID) transactions, scalable metadata handling, and unified streaming and batch data processing.

Significance of Delta Lake:

Unified batch and streaming layer for storing data: We can write to the same table from simultaneous streaming and batch jobs, so we do not need to maintain separate architectures for streaming and batch; Spark is already used extensively for both batch and streaming loads (see the short sketch after this list).

Data Governance: Implements RBAC (role-based access control) through a feature called Unity Catalog (which can be integrated with Azure, AWS, and GCP storage layers), includes row- and column-level policies for masking, and provides enhanced data lineage covering both business and technical metadata.

Time Travel (Data Versioning): We can query an older snapshot of our data, giving us data versioning for rollback and auditing. Delta Lake allows users to access and analyze previous versions of data through its time travel capabilities, enabling data exploration and analysis at different points in time.

DML Operations: Data lakes do offer DML operations, but with many limitations (for example, in Hive we can run DML only if the table is stored in the ORC file format and the DbTxnManager is enabled). Delta allows us to do upserts or merges very easily.

Optimized File Management: Delta Lake uses Parquet for data storage and JSON for metadata. It organizes data into optimized Parquet files and maintains metadata to enable efficient file management, and it leverages file-level operations like compaction, partitioning, and indexing to optimize query performance and reduce storage costs.
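
To illustrate the unified batch and streaming point above, here is a minimal sketch (not from the article; the table name, source paths, and checkpoint location are hypothetical placeholders) of the same Delta table being written by a batch job and a streaming job:

# batch write into a (hypothetical) delta table
batchDf = spark.read.format('csv').option('header', True).load('/tmp/events/batch/')
batchDf.write.format('delta').mode('append').saveAsTable('demo.events')

# streaming write into the very same table (streaming file sources need an explicit schema)
streamDf = spark.readStream.format('csv')\
    .option('header', True)\
    .schema(batchDf.schema)\
    .load('/tmp/events/stream/')

streamDf.writeStream.format('delta')\
    .option('checkpointLocation', '/tmp/checkpoints/events/')\
    .toTable('demo.events')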

In this article, we look at how to create Delta tables on Databricks, the various APIs associated with Delta tables, and the different ways to access them.

How a Delta table stores the underlying data:

Under the table's storage location, Delta keeps the data as Parquet files alongside a _delta_log folder that holds JSON commit files (the transaction log).
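
For instance, listing a Delta location shows the Parquet data files plus the _delta_log folder; a minimal sketch (the path shown is the location used later in this article, but any Delta location works):

# point this at an existing delta table or delta file location
deltaPath = 'abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet'

# top-level listing: Parquet data files plus the _delta_log folder
display(dbutils.fs.ls(deltaPath))

# the transaction log itself: one JSON commit file per table version
display(dbutils.fs.ls(deltaPath + '/_delta_log'))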

In this article, we provide some examples of Delta tables built on Azure Databricks; the runtime environment is 14.3 LTS ML (Spark 3.5, Scala 2.12).

Different ways to create a Delta table [using the Python API]:


  1. Create a Delta file:


#read covid19 patient data
patientData = spark.read.format('csv')\
    .option('header', True)\
    .load('/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv')

#write as delta file
patientData.write.format('delta')\
    .mode('overwrite')\
    .save('abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet')        

2. Create a Delta table using the Spark write API: To create a Delta table, we need a catalog and a schema (if it is a catalog hierarchy) or a Hive metastore (for Delta tables created under the Hive metastore).

In the previous example we didn't use partitionBy; in the example below, the table is created with a partition.

create catalog if not exists learning_datamart;
-- use catalog learning_datamart;
create schema if not exists learning_datamart.staging;         
#read covid19 patient data
patientData = spark.read.format('csv')\
    .option('header', True)\
    .load('/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv')

#write as delta table
patientData.write.format('delta')\
    .mode('overwrite')\
    .partitionBy('country')\
    .saveAsTable('learning_datamart.staging.tbl_covid19_patient')        

3. Create a Delta table using the Delta API: use the create or createOrReplace API to create tables. First we need to import the delta module and use the DeltaTable class with the APIs shown below to create a table.

from delta import *
from pyspark.sql.types import *
  
createTbl = DeltaTable.createOrReplace(spark)\
           .tableName('learning_datamart.staging.covid19_patient_data')\
           .addColumns(
               StructType(
                   [
                       StructField(name='patient_id', dataType=StringType()),
                       StructField(name='sex', dataType=StringType()),
                       StructField(name='age', dataType=StringType()),
                       StructField(name='country', dataType=StringType()),
                       StructField(name='province', dataType=StringType())
                   ]
               )         
           )\
           .partitionedBy('country')
           
createTbl.execute()         
#read covid19 patient data
patientData = spark.read.format('csv')\
    .option('header', True)\
    .load('/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv')

patientData = patientData.select('patient_id', 'sex', 'age', 'country', 'province') # column order must match the table definition, because insertInto resolves columns by position

#write as delta table
patientData.write.format('delta')\
    .mode('append')\
    .insertInto('learning_datamart.staging.covid19_patient_data')
    # partitionBy() cannot be used here, because the partitioning was already defined when the table was created

4. Time Travel: Delta Lake stores data in Parquet files and records information about transactions in the _delta_log metadata folder.

We created the table, replaced it, and loaded data across 3 snapshots. Now we use the Python API to retrieve specific snapshots.

#reading a delta table at version 2 (using the @ syntax)
spark.read.table('learning_datamart.staging.covid19_patient_data@v2')\
    .display()

#reading delta file 
spark.read.format('delta')\
        .option('versionAsOf', "0")\
        .load('abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet').display()        
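
Time travel can also be done by timestamp instead of version number; a minimal sketch (the timestamp below is an arbitrary example and must fall within the table's retained history):

#reading the delta file as of a point in time
spark.read.format('delta')\
        .option('timestampAsOf', '2024-06-01 00:00:00')\
        .load('abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet')\
        .display()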

5. Vacuum: removes all files from the table directory that are not managed by Delta, as well as data files that are no longer referenced in the latest state of the transaction log for the table and are older than a retention threshold.

The default retention threshold is 7 days; in any case, vacuum always retains the current version of the dataset.

To delete snapshots younger than the default threshold (7 days), disable the safety check by setting spark.databricks.delta.retentionDurationCheck.enabled = False.

from delta import *
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', False)

tablePatientData = DeltaTable.forName(spark, "learning_datamart.staging.covid19_patient_data") # use forName for using any python api over delta table

filePatientData = DeltaTable.forPath(spark, "abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet") # use forPath for using any python api over delta file


tablePatientData.vacuum(2) # keep only the last 2 hours of unreferenced files (requires the retention check disabled above)

filePatientData.vacuum() # default retention threshold: 7 days

6. Detail: We can retrieve detailed information about a Delta table (for example, number of files, data size) using detail().

from delta import *
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', False)

tablePatientData = DeltaTable.forName(spark, "learning_datamart.staging.covid19_patient_data") # use forName for using any python api over delta table
filePatientData = DeltaTable.forPath(spark, "abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet") # use forPath for using any python api over delta file

tablePatientData.detail().display() # table-level metadata: format, location, partitionColumns, numFiles, sizeInBytes, ...
filePatientData.detail().display()
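
detail() returns a one-row DataFrame, so individual properties can be pulled out directly; for example (numFiles and sizeInBytes are two of the columns it exposes):

# pick out just the file count and total size from the detail() output
tablePatientData.detail().select('numFiles', 'sizeInBytes').display()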

7. History: Each operation that modifies a Delta Lake table creates a new table version. You can use history information to audit operations, roll back a table, or query a table at a specific point in time using time travel.

from delta import *
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', False)

tablePatientData = DeltaTable.forName(spark, "learning_datamart.staging.covid19_patient_data") # use forName for using any python api over delta table
filePatientData = DeltaTable.forPath(spark, "abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet") # use forPath for using any python api over delta file

tablePatientData.history(2).display() # shows only the latest 2 versions
filePatientData.history().display() # shows the full history
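
Since history() is often used to decide which version to roll back to, here is a minimal sketch of a rollback using the Delta Lake Python API (the version number and timestamp below are just examples; pick them from your own history output):

# roll the table back to an earlier version chosen from history()
tablePatientData.restoreToVersion(1)

# or roll a path-based delta file back to a point in time
# filePatientData.restoreToTimestamp('2024-06-01')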

8. Alias: creates an alias for Delta objects, which is mainly used to qualify column references inside merge() and update() conditions.

# alias() returns the same DeltaTable under a new name; the alias is then used to qualify columns
# in merge()/update() conditions, e.g. tablePatientData.alias('covdata').merge(srcDf.alias('src'), 'covdata.patient_id = src.patient_id')
covAlias = tablePatientData.alias('covdata')
fileAlias = filePatientData.alias('covfile')

9. toDF: converts a Delta object to a DataFrame.

covData = tablePatientData.toDF() # DataFrame view of the delta table
covFile = filePatientData.toDF() # DataFrame view of the delta file
type(covData) # both are regular Spark DataFrames
type(covFile)

10. Delete: deletes the rows that match a predicate. When no predicate is provided, all rows are deleted.

from delta import *
from pyspark.sql.functions import  *
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', False)

tablePatientData = DeltaTable.forName(spark, "learning_datamart.staging.covid19_patient_data") # use forName for using any python api over delta table
filePatientData = DeltaTable.forPath(spark, "abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet") # use forPath for using any python api over delta file

tablePatientData.delete("patient_id = '1100000003'") # delete rows matching a SQL-style predicate
tablePatientData.delete(col('patient_id') == lit('1000000004')) # delete rows matching a Column predicate

11. Update: updates the column values for the rows that match a predicate. When no predicate is provided, the column values of all rows are updated.

from delta import *
from pyspark.sql.functions import  *
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', False)

tablePatientData = DeltaTable.forName(spark, "learning_datamart.staging.covid19_patient_data") # use forName for using any python api over delta table
filePatientData = DeltaTable.forPath(spark, "abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/covid19_patients.parquet") # use forPath for using any python api over delta file

tablePatientData.update( set=  {'sex': lit('male'), 'country': lit('Singapore')}, condition="patient_id = '1100000003'") 
tablePatientData.update( set = {'sex': lit('male')},   condition= col('patient_id') == lit('1000000004'))         

12. Merge: We can upsert data from a source table, view, or DataFrame into a target Delta table by using the merge API. Delta Lake supports inserts, updates, and deletes through merge, and it supports extended syntax beyond the SQL standard to facilitate advanced use cases.

# SCD Type 2 load: stream incoming customer files with Auto Loader and merge them into the target delta table
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

spk = SparkSession.builder.appName('loadCustomer').getOrCreate()
cusData = DeltaTable.forName(spk, 'development.institution.customer_data')

cusDF = cusData.toDF()
cusDF = cusDF.filter(cusDF.rec_flag == lit('A'))

incDf = spk.readStream.format('cloudFiles')\
     .option('cloudFiles.format', 'csv')\
     .option("header", True)\
     .option("delimiter", ',')\
     .option("cloudFiles.schemaLocation", 'abfss://schemalocation@mncdatalakehouse.dfs.core.windows.net/customerData/')\
     .load('abfss://landing-zone@exteventsdatalake.dfs.core.windows.net/SCD/')

incDf =  incDf.withColumns(
     {'ref_key' : lit(-1),
      'pk_key' : md5(incDf.empId),
      'row_key' : md5(concat(incDf.empName,incDf.emailId,incDf.PhoneNo,incDf.address,incDf.city,incDf.province,incDf.postalcode)),
      'eff_start_dt' : current_date(),
      'eff_end_dt' : lit('9999-12-31'),
      'rec_flag' : lit('A')})\
     .drop(incDf._rescued_data)

insertDF = incDf.join(cusDF, cusDF.pk_key == incDf.pk_key, 'left_anti')
intersectDF = incDf.join(cusDF, cusDF.pk_key == incDf.pk_key).select(incDf['*'])

updateDF = incDf.join(cusDF, cusDF.pk_key == incDf.pk_key).select(incDf['*'], cusDF.pk_key.alias('cus_pk_key'))\
     .drop(incDf.ref_key)\
     .withColumnRenamed('cus_pk_key', 'ref_key')

inputDF = insertDF.unionByName(updateDF).unionByName(intersectDF)

def implementSCD2(inputDf, batchId):
     cusData.alias('td').merge(inputDf.alias('id'), "td.pk_key = id.ref_key")\
          .whenNotMatchedInsertAll()\
          .whenMatchedUpdate(condition="id.row_key != td.row_key and td.rec_flag = 'A'",
                        set={
                             'rec_flag' : lit('I'),
                             'eff_end_dt' : date_sub(current_date(), 1)})\
          .execute()  

query = inputDF.writeStream.format("delta")\
               .foreachBatch(implementSCD2)\
               .option('checkpointLocation', 'abfss://checkpoints@mncdatalakehouse.dfs.core.windows.net/customerData/')\
               .queryName('loadCustomerSCD2')\
               .start()

query.awaitTermination()        
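
The example above wraps merge() inside a streaming foreachBatch. For reference, a minimal batch-only upsert looks like this (a sketch reusing the covid19_patient_data table from earlier; updatesDf stands for any hypothetical DataFrame with matching columns):

# minimal batch upsert: update matching rows, insert new ones
tablePatientData.alias('t').merge(
        updatesDf.alias('s'),
        't.patient_id = s.patient_id')\
    .whenMatchedUpdateAll()\
    .whenNotMatchedInsertAll()\
    .execute()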

13. isDeltaTable() & convertToDelta(): isDeltaTable() checks whether a given path holds a Delta table, and convertToDelta() performs a one-time conversion of Parquet and Iceberg tables into Delta Lake tables.

from delta import *
from pyspark.sql.functions import  *

df = spark.read.format('csv')\
    .option('multiline', True)\
    .option("quote", "'")\
    .load('abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/data.csv.txt')

df.write.format("parquet")\
    .option('compression','snappy')\
    .mode('overwrite')\
    .save("abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/data.parquet")

status = DeltaTable.isDeltaTable(spark,'abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/data.parquet')
print(status)

DeltaTable.convertToDelta(spark, 'parquet.`abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/data.parquet`')

status = DeltaTable.isDeltaTable(spark,'abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/data.parquet')
print(status)        
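
Once converted, the same path carries a _delta_log and can be read back with the delta format:

# the converted path can now be read as a delta table
spark.read.format('delta')\
    .load('abfss://apac-nas-location@dlakeapacregion.dfs.core.windows.net/deltafiles/data.parquet')\
    .display()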

More to explore → Optimize()
