
source link: https://dzone.com/articles/pyspark-data-pipeline-to-cleanse-transform-partiti

PySpark Data Pipeline To Cleanse, Transform, Partition, and Load Data Into Redshift Database Table

In this article, we will discuss how to create an optimized data pipeline using PySpark and load the data into a Redshift database table.

Mar. 16, 23 · Tutorial

Data is the driving force behind many of today's businesses. With the ever-growing amounts of data available, businesses need to create optimized data pipelines that can handle large volumes of data in a reliable and efficient manner. In this article, we will discuss how to create an optimized data pipeline using PySpark and load the data into a Redshift database table. We will also cover data cleansing, transformation, partitioning, and data quality validation.

Before diving into the code, let's take a quick look at the tools we will be using:

  • PySpark: PySpark is a Python API for Apache Spark, an open-source distributed computing system. PySpark provides an interface for programming Spark with Python.
  • Redshift: Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing Business Intelligence (BI) tools.

With these tools in mind, let's start by defining the problem we want to solve.

Problem Definition

Suppose we have a large dataset containing information about customers and their orders. We want to load this dataset into a Redshift table and perform the following tasks:

  1. Data Cleansing: Remove any records with missing or invalid data.
  2. Data Transformation: Transform the data into a format suitable for Redshift.
  3. Partitioning: Partition the data into smaller subsets to improve query performance.
  4. Data Quality Validation: Validate the data quality of the dataset before loading it into Redshift.
  5. Loading Data into Redshift: Load the cleaned and transformed dataset into a Redshift table.

Now that we have defined the problem, let's build the pipeline. Here is the complete code to cleanse, transform, partition, and validate the data with PySpark and load it into a Redshift table:

Python
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# create a spark session
spark = SparkSession.builder.appName("DataPipeline").getOrCreate() 

# load the data
data = spark.read.csv("customer_orders.csv", header=True, inferSchema=True) 

# drop records with missing (null) values
data = data.dropna()

# rename columns
data = data.withColumnRenamed("Customer ID", "customer_id") \
    .withColumnRenamed("Order ID", "order_id") \
    .withColumnRenamed("Order Date", "order_date") \
    .withColumnRenamed("Order Amount", "order_amount") 

# convert data types
data = data.withColumn("customer_id", col("customer_id").cast("int")) \
    .withColumn("order_id", col("order_id").cast("int")) \
    .withColumn("order_amount", col("order_amount").cast("double")) 

# partition data
data = data.repartition("customer_id") 

# data quality validation
if data.count() == 0:
    print("Error: No data to process.")
    spark.stop()
    sys.exit(1)

# load data into Redshift
# note: the aws_iam_role and tempdir options belong to the spark-redshift
# connector (they are not understood by the plain "jdbc" source), so that
# connector must be available on the Spark classpath
data.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift://redshift-cluster-1.cvboublcdews.us-west-2.redshift.amazonaws.com:5439/dev") \
    .option("dbtable", "customer_orders") \
    .option("user", "username") \
    .option("password", "password") \
    .option("aws_iam_role", "arn:aws:iam::0123456789012:role/myRedshiftRole") \
    .option("tempdir", "s3a://my-s3-bucket/temp/") \
    .mode("overwrite") \
    .save()

# stop the spark session
spark.stop()

Now, let's explain each step of the code.

Step 1: Set Up PySpark and Redshift

We start by importing the necessary libraries and setting up PySpark.

We also import the col function from the pyspark.sql.functions module; it is used later to cast columns to the data types expected by the Redshift table.
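The write step at the end of the pipeline needs the spark-redshift connector (and an S3 filesystem implementation for the s3a:// temporary directory) on Spark's classpath. One way to supply them is the spark.jars.packages setting when building the session; the Maven coordinates and versions below are placeholders, so check them against your environment before use:

Python
from pyspark.sql import SparkSession

# assumed coordinates/versions -- verify against Maven Central for your setup
packages = ",".join([
    "io.github.spark-redshift-community:spark-redshift_2.12:6.2.0",  # Redshift connector (assumed version)
    "com.amazon.redshift:redshift-jdbc42:2.1.0.29",                  # Redshift JDBC driver (assumed version)
    "org.apache.hadoop:hadoop-aws:3.3.4",                            # s3a:// support for the tempdir
])

spark = SparkSession.builder \
    .appName("DataPipeline") \
    .config("spark.jars.packages", packages) \
    .getOrCreate()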

Step 2: Load the Data

The next step is to load the data into PySpark.

We load the data from a CSV file using the read.csv() method. We also specify that the file has a header row and infer the schema from the data.
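Inferring the schema requires an extra pass over the file. For a production pipeline, an explicit schema avoids that cost and surfaces malformed input earlier; the sketch below assumes the column names and types shown, so adjust them to the real file:

Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# assumed column layout of customer_orders.csv
schema = StructType([
    StructField("Customer ID", IntegerType(), True),
    StructField("Order ID", IntegerType(), True),
    StructField("Order Date", StringType(), True),   # kept as string; the date format is not known here
    StructField("Order Amount", DoubleType(), True),
])

data = spark.read.csv("customer_orders.csv", header=True, schema=schema)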

Step 3: Data Cleansing

The next step is to cleanse the data by removing any records with missing or invalid data.

We use the dropna() method to drop any record that contains a null value. Rows that are present but invalid (for example, a non-positive order amount) are not caught by dropna() and need an explicit filter, as shown in the sketch below.
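A minimal sketch of such a filter, assuming order_amount and customer_id are the fields worth validating, might look like this (it should run after the renaming and casting steps so the snake_case names and numeric types exist):

Python
from pyspark.sql.functions import col

# assumed validity rules -- adjust to the real business constraints
data = data.filter((col("order_amount") > 0) & (col("customer_id") > 0))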

Step 4: Data Transformation and Data Quality Validation

The next step is to transform the data into a format suitable for Redshift. We rename the columns to follow Redshift naming conventions and cast the data types to match the Redshift table schema. The data is then repartitioned by customer_id so that rows for the same customer end up in the same partition, keeping the partitions aligned with a common query pattern. Finally, a simple data quality check calls count() on the DataFrame to make sure it is not empty before the write to Redshift is initiated.
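The empty-DataFrame check is the bare minimum. A slightly richer sketch, with the list of key columns chosen here purely for illustration, could also verify that those columns contain no nulls after cleansing:

Python
from pyspark.sql.functions import col, count, when

# count nulls per key column in a single pass over the data
null_counts = data.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in ["customer_id", "order_id", "order_amount"]
]).first().asDict()

bad_columns = [name for name, n in null_counts.items() if n > 0]
if bad_columns:
    raise ValueError(f"Null values found after cleansing in: {bad_columns}")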

Step 5: Write to Redshift Database

Finally, we use data.write to write the data from the PySpark DataFrame to Redshift through the spark-redshift connector. We specify the connection properties such as the JDBC URL, user, password, IAM role, and the temporary S3 directory where the data is staged before being copied into Redshift, along with the target table name. The write uses overwrite mode, which replaces the contents of the table if it already exists. Lastly, we call spark.stop() to stop the SparkSession.
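As a quick sanity check after the load, the same connection options (the cluster details below are the same placeholders used above) can be reused to read the table back and compare row counts. This is a sketch and should run before spark.stop():

Python
loaded = spark.read \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", "jdbc:redshift://redshift-cluster-1.cvboublcdews.us-west-2.redshift.amazonaws.com:5439/dev") \
    .option("dbtable", "customer_orders") \
    .option("user", "username") \
    .option("password", "password") \
    .option("aws_iam_role", "arn:aws:iam::0123456789012:role/myRedshiftRole") \
    .option("tempdir", "s3a://my-s3-bucket/temp/") \
    .load()

assert loaded.count() == data.count(), "Row count mismatch after loading into Redshift"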
