
Time Travel: Data versioning in Delta Lake

source link: https://blog.knoldus.com/time-travel-data-versioning-in-delta-lake/

Reading Time: 3 minutes

In today’s Big Data world, we continuously process large amounts of data and store the results in a data lake, which keeps changing the state of the lake. But sometimes we would like to access a historical version of our data, and that requires data versioning. This kind of data management simplifies our data pipelines by making it easy for professionals and organizations to audit data changes, roll back to a previous version in case of accidental bad writes or deletes, and so on. Apache Spark alone can’t provide these capabilities, but Databricks Delta, the next-gen unified analytics engine built on top of Apache Spark, introduces such unique Time Travel capabilities.

Time Traveling using Delta Lake

When we write data into a Delta table, every operation is automatically versioned, and we can access any of those versions. This allows us to travel back to an earlier version of the current delta table.

This time-traveling can be achieved using 2 approaches:
1. Using a version number
2. Using a timestamp
Let’s understand these approaches with an example:

//Job-1 writing data into delta lake
Dataset<Long> data = sparkSession.range(100, 200);
data.write().mode("overwrite").format("delta").save(FILE_PATH);
LOGGER.info("records created after job-1: "
        + sparkSession.read().format("delta").load(FILE_PATH).count());
Dataset<Row> job1DeltaTable = sparkSession.read()
        .format("delta").load(FILE_PATH);
job1DeltaTable.show();

In the above code snippet, Job-1 writes 100 integers (100 through 199) into the delta table. Similarly, Job-2 below overwrites the data created by Job-1 with 100 integers (0 through 99).

//Job-2 overwriting the record created by Job-1
sparkSession.range(100).write().mode("overwrite")
        .format("delta").option("overwriteSchema", "true").save(FILE_PATH);
LOGGER.info("records created after job-2: "
        + sparkSession.read().format("delta").load(FILE_PATH).count());
Dataset<Row> job2DeltaTable = sparkSession.read()
        .format("delta").load(FILE_PATH);
job2DeltaTable.show();

Now, let’s find the latest history of the delta table.

DeltaTable deltaTable = DeltaTable.forPath(FILE_PATH);
Dataset<Row> latestHistory = deltaTable.history();
latestHistory.select("version", "timestamp", "operation", "operationParameters").show(false);
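For the two jobs above, show(false) prints something roughly like this (the timestamps, and the exact formatting of operationParameters, will vary with your environment and Delta version):

+-------+---------+---------+--------------------------------------+
|version|timestamp|operation|operationParameters                   |
+-------+---------+---------+--------------------------------------+
|1      |...      |WRITE    |[mode -> Overwrite, partitionBy -> []]|
|0      |...      |WRITE    |[mode -> Overwrite, partitionBy -> []]|
+-------+---------+---------+--------------------------------------+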

As seen above, both operations have unique version numbers and timestamps, which makes it easy to audit the data changes in the data lake.

1. Using a version number

//Read first version of table using version number.
Dataset<Row> version_0_DeltaLakeTable = sparkSession.read()
        .option("versionAsOf", NumberUtils.INTEGER_ZERO)
        .format("delta").load(FILE_PATH);
version_0_DeltaLakeTable.show();

In the above code snippet, the versionAsOf option tells read() to load version 0 of the delta table. In the same manner, we can pass any available version number. Thus, using the version number of an operation, we can read the corresponding version of the Delta table.
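For instance, a minimal sketch for reading version 1, i.e. the state after Job-2’s overwrite:

//Read the second version of the table using its version number.
Dataset<Row> version_1_DeltaLakeTable = sparkSession.read().option("versionAsOf", 1)
        .format("delta").load(FILE_PATH);
version_1_DeltaLakeTable.show(); //prints the integers 0 to 99 written by Job-2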

2. Using a timestamp

First, find the timestamp of Job-1’s commit:

//Find the timestamp for the first version of the delta table.
//FIRST_COMMIT_FILE_PATH points at the first commit file in the table's _delta_log directory.
Dataset<Row> load = sparkSession.read().format("json")
        .option("multiline", true).load(FIRST_COMMIT_FILE_PATH);
Dataset<Row> timestampDataSet = load.select("commitInfo.timestamp");
String timestamp = new Timestamp(timestampDataSet.first().getLong(0)).toString();

Second, using the timestamp, read the historical version of the delta table:

//Read first version of table using its timestamp.
Dataset<Row> version_0_DeltaLakeTable = sparkSession.read().option("timestampAsOf", timestamp)
        .format("delta").load(FILE_PATH);
version_0_DeltaLakeTable.show();
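As an aside, parsing the raw commit file isn’t strictly necessary: the same timestamp can be read from the table history. A minimal sketch, reusing the deltaTable handle created earlier:

//Alternative: fetch the version-0 commit timestamp from the table history.
String version0Timestamp = deltaTable.history()
        .filter("version = 0")
        .select("timestamp")
        .first()
        .getTimestamp(0)
        .toString();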

Time Travel Use Cases

Delta Lake time travel allows us to query an older snapshot of a Delta Lake table. Time travel has many use cases, including:

  • Time travel makes it easy to do rollbacks in case of bad writes, playing an important role in fixing mistakes in our data (see the sketch after this list).
  • It helps in re-creating analysis, reports, or outputs (for example, the output of a machine learning model). This could be useful for debugging or auditing, especially in regulated industries.
  • It also simplifies time-series analytics, for instance, finding out how many new customers were added over the last week.
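To illustrate the rollback case, a table can be restored to an older state by reading that snapshot and overwriting the current data with it. A minimal sketch, using the example table from above:

//Roll back: overwrite the current table with the version-0 snapshot.
sparkSession.read().format("delta").option("versionAsOf", 0).load(FILE_PATH)
        .write().format("delta").mode("overwrite").save(FILE_PATH);

Note that the rollback itself is recorded as a new write, so it also shows up in the table history.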

The source code can be found here.

Thanks for reading!!

