27

Understanding the Spark insertInto function

 4 years ago
source link: https://www.tuicool.com/articles/6Z7bMrA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Mz6Jn2j.jpg!web

Photo by @marcusloke on Unsplash

Raw Data Ingestion into a Data Lake with spark is a common currently used ETL approach. In some cases, the raw data is cleaned, serialized and exposed as Hive tables used by the analytics team to perform SQL like operations. Thus, spark provides two options for tables creation: managed and external tables. The difference between these is that unlike the manage tables where spark controls the storage and the metadata, on an external table spark does not control the data location and only manages the metadata.

In addition, often a retry strategy to overwrite some failed partitions is needed. For instance, a batch job (timestamp partitioned) failed for the partition 22/10/2019 and we need to re-run the job writing the correct data. Therefore, there are two options: a) regenerate and overwrite all the data, or b) process and overwrite the data for the needed partition. Option two is discarded due to performance issues, imagine that you have to process an entire month of data.

Consequently, the option first option is used and fortunately spark has the option dynamic partitionOverwriteMode that overwrites data only for partitions present in the current batch. This option works perfectly while writing data to an external data store like HDFS or S3; cases, where is possible to reload the external table metadata by a simple, CREATE EXTERNAL TABLE command .

However, while working with managed tables the dynamic partition option has some behaviors that we need to understand in order to keep the data quality and consistency. First of all, even when spark provides two functions to store data in a table saveAsTable and insertInto, there is an important difference between them:

  • SaveAsTable: creates the table structure and stores the first version of the data. However, the overwrite save mode works over all the partitions even when dynamic is configured.
  • insertInto: does not create the table structure, however, the overwrite save mode works only the needed partitions when dynamic is configured.

So, SaveAsTable could be used to create the table from a raw dataframe definition and then after the table is created, overwrites are done using the insertInto function in a straightforward pattern. Nevertheless, the insertInto presents some not well-documented behaviors while writing the partitioned data and some challenges while working with data that contains schema changes.

Order of the Columns Problem

Let’s write a simple unit test where a table is created from a data frame.

it should "Store table and insert into new record on new partitions" in {
val spark = ss
import spark.implicits._
val targetTable = "companies_table"
val companiesDF = Seq(("A", "Company1"), ("B", "Company2")).toDF("id", "company")
companiesDF.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

val companiesHiveDF = ss.sql(s"SELECT * FROM ${targetTable}")

BFZ3m2V.png!web

So far, the table was created correctly. Then, let’s overwrite some data using insertInto and perform some asserts.

 val secondCompaniesDF = Seq(("C", "Company3"), ("D", "Company4"))
.toDF("id", "company")

secondCompaniesDF.write.mode(SaveMode.Append).insertInto(targetTable)
val companiesHiveAfterInsertDF = ss.sql(s"SELECT * FROM ${targetTable}")

companiesDF.count() should equal(2)
companiesHiveAfterInsertDF.count() should equal(4)
companiesHiveDF.select("id").collect().map(_.get(0)) should contain allOf("A", "B")
companiesHiveAfterInsertDF.select("id").collect() should contain allOf("A", "B", "C", "D")

}

This should work properly. However, look at the following data print:

fMJBjiF.png!web

As you can see the asserts failed due to the positions of the columns. There are two reasons: a) saveAsTable uses the partition column and adds it at the end. b) insertInto works using the order of the columns (exactly as calling an SQL insertInto) instead of the columns name. In consequence, adding the partition column at the end fixes the issue as shown here:

 //partition column should be at the end to match table schema.
val secondCompaniesDF = Seq(("Company3", "C"), ("Company4", "D"))
.toDF("company", "id")

secondCompaniesDF.write.mode(SaveMode.Append).insertInto(targetTable)
val companiesHiveAfterInsertDF = ss.sql(s"SELECT * FROM ${targetTable}") companiesHiveAfterInsertDF.printSchema()
companiesHiveAfterInsertDF.show(false)

companiesDF.count() should equal(2)
companiesHiveAfterInsertDF.count() should equal(4)
companiesHiveDF.select("id").collect().map(_.get(0)) should contain allOf("A", "B")
companiesHiveAfterInsertDF.select("id").collect().map(_.get(0)) should contain allOf("A", "B", "C", "D")

}

niIRN3i.png!web

Now the tests pass and the data is overwritten properly.

Matching the Table Schema

As described previously the order of the columns is important for the insertInto function. Besides, let’s image you are ingesting data that has a changing schema and you receive a new batch with a different number of columns.

New Batch With Extra Columns

Let’s test first the case when more columns are added.

//again adding the partition column at the end and trying to overwrite partition C.
val thirdCompaniesDF = Seq(("Company4", 10, "C"), ("Company5", 20, "F"))
.toDF("company", "size", "id")

thirdCompaniesDF.write.mode(SaveMode.Overwrite).insertInto(targetTable)

While trying to call insertInto the following error is shown:

Hence, a function that returns the missing columns in the table is needed:

def getMissingTableColumnsAgainstDataFrameSchema(df: DataFrame, tableDF: DataFrame): Set[String] = {
val dfSchema = df.schema.fields.map(v => (v.name, v.dataType)).toMap
val tableSchema = tableDF.schema.fields.map(v => (v.name, v.dataType)).toMap
val columnsMissingInTable = dfSchema.keys.toSet.diff(tableSchema.keys.toSet).map(x => x.concat(s" ${dfSchema.get(x).get.sql}"))

columnsMissingInTable
}

Then, the SQL ALTER TABLE command is executed. After this, the insertInto function works properly and the table schema is merged as you can see here:

val tableFlatDF = ss.sql(s"SELECT * FROM $targetTable limit 1")

val columnsMissingInTable = DataFrameSchemaUtils.getMissingTableColumnsAgainstDataFrameSchema(thirdCompaniesDF, tableFlatDF)

if (columnsMissingInTable.size > 0) {
ss.sql((s"ALTER TABLE $targetTable " +
s"ADD COLUMNS (${columnsMissingInTable.mkString(" , ")})"))
}

thirdCompaniesDF.write.mode(SaveMode.Overwrite).insertInto(targetTable)

val companiesHiveAfterInsertNewSchemaDF = ss.sql(s"SELECT * FROM $targetTable")

companiesHiveAfterInsertNewSchemaDF.printSchema()
companiesHiveAfterInsertNewSchemaDF.show(false)

VzyMBnI.png!web

New Batch With Fewer Columns

Let’s test now the case when fewer columns are received.

val fourthCompaniesDF = Seq("G", "H")
.toDF("id")

fourthCompaniesDF.write.mode(SaveMode.Overwrite).insertInto(targetTable)

The following error is shown:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK