source link: https://sqlwithmanoj.com/2020/10/23/apache-spark-rdd-dataframes-transformations-narrow-wide-actions-lazy-evaluation-part-3/
Apache Spark – RDD, DataFrames, Transformations (Narrow & Wide), Actions, Lazy Evaluation (Part 3)
image credits: Databricks
RDD (Resilient Distributed Dataset)
Spark works on the concept of RDDs, i.e. "Resilient Distributed Datasets". An RDD is an immutable, fault-tolerant collection of objects partitioned across several nodes. With the concept of lineage, an RDD can rebuild a lost partition in case of a node failure.
– In Spark's initial versions, RDDs were the only way for users to interact with Spark, through a low-level API that provides various Transformations and Actions.
– With Spark 2.x, new DataFrames and Datasets were introduced. These are also built on top of RDDs, but provide higher-level structured APIs and more benefits over RDDs.
– But at the Spark core, all computation operations and high-level DataFrame APIs are ultimately converted into low-level RDD-based bytecode, which is executed in the Spark Executors.
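The lineage mentioned above can be inspected with the RDD method toDebugString, which prints the chain of transformations an RDD was built from. A minimal sketch, assuming a running SparkSession named spark (as in the examples below):

```scala
// Build a small RDD through two chained transformations
val words = spark.sparkContext
  .parallelize(Seq("My", "Name", "is", "Manoj", "Pandey"), 2)
  .map(_.toLowerCase)
  .filter(_.contains("a"))

// toDebugString prints the lineage graph; Spark uses this
// information to recompute a lost partition after a node failure
println(words.toDebugString)
```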
–> RDDs can be created in various ways, like:
1. Reading a file from the local file system:

val FileRDD = spark.read.textFile("/mnt/test/hello.txt")
FileRDD.collect()
2. From a local collection, by using the parallelize method:

val myCol = "My Name is Manoj Pandey".split(" ")
val myRDD = spark.sparkContext.parallelize(myCol, 2)
myRDD.collect().foreach(println)
3. Transforming an existing RDD:

val myCol = "My Name is Manoj Pandey. I stay in Hyderabad".split(" ")
val myRDD = spark.sparkContext.parallelize(myCol, 2)
val myRDD2 = myRDD.filter(s => s.contains("a"))
myRDD2.collect().foreach(println)
–> One should use RDDs only when working with raw and unstructured data, or when the schema and the optimization & performance benefits available with DataFrames are not a concern.
DataFrames
Just like RDDs, DataFrames are immutable collections of objects distributed/partitioned across several nodes. But unlike an RDD, a DataFrame is like a table in an RDBMS, organized into rows and columns, with each column having a specific schema and datatype, like integer, date, string, timestamp, etc.
– DataFrames also provide optimization & performance benefits with the help of the Catalyst Optimizer.
– As mentioned above, Spark's Catalyst Optimizer ultimately converts every DataFrame operation into low-level RDD transformations.
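This conversion can be observed with the DataFrame explain() method, which prints the logical plans Catalyst produces and the physical plan that is finally executed. A minimal sketch, assuming a SparkSession named spark:

```scala
import spark.implicits._   // enables the Seq(...).toDF(...) conversion

val df = Seq(("Male", 30), ("Female", 25)).toDF("gender", "age")

// explain(true) prints the parsed, analyzed and optimized logical plans,
// plus the physical plan that is ultimately run as RDD operations
df.groupBy("gender").count().explain(true)
```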
1. A simple example to create a DataFrame by reading a CSV file:

val myDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("/dbfs/csv/hello.csv")

display(myDF)
2. Creating a DataFrame by using a Seq collection and the toDF() method:

val myDF = Seq(
  ("Male", "Brock", 30),
  ("Male", "John", 31),
  ("Male", "Andy", 35),
  ("Female", "Jane", 25),
  ("Female", "Maria", 30)).toDF("gender", "name", "age")

display(myDF)
3. Creating a new DataFrame from an existing DataFrame by using the groupBy() method over it:

import org.apache.spark.sql.functions._

val myDF = Seq(
  ("Male", "Brock", 30),
  ("Male", "John", 31),
  ("Male", "Andy", 35),
  ("Female", "Jane", 25),
  ("Female", "Maria", 30)).toDF("gender", "name", "age")

val DFavg = myDF.groupBy("gender").agg(avg("age"))

display(DFavg)
4. Creating a DataFrame from an existing RDD:

import spark.implicits._   // required for the rdd.toDF() conversion

val myCol = "My Name is Manoj Pandey".split(" ")
val myRDD = spark.sparkContext.parallelize(myCol, 2)
val myDF = myRDD.toDF()

display(myDF)
5. DataFrames can also be converted to Tables or Views (temp tables), so that you can use Spark SQL queries instead of applying Scala transformations:

myDF.createOrReplaceTempView("tblPerson")

display(spark.sql("""
select gender, AVG(age) as AVGAge
from tblPerson
group by gender
"""))
Transformations
In Spark, RDDs and DataFrames are immutable, so to perform operations on the data present in a DataFrame, it is transformed into a new DataFrame, leaving the existing DataFrame unmodified.
–> There are two types of Transformations:
1. Narrow Transformations: apply on a single partition; for example, filter() and map() can operate within a single partition, and no data exchange happens between partitions.
2. Wide Transformations: apply across multiple partitions; for example, groupBy(), reduceByKey() and orderBy() require reading other partitions and exchanging data between them, which is called a shuffle, and for which Spark may have to write data to disk.
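The distinction can be seen in a short sketch (assuming a SparkSession named spark): the filter below works within each partition independently, while the groupBy forces rows to move between partitions:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val people = Seq(
  ("Male", "Brock", 30),
  ("Female", "Jane", 25),
  ("Female", "Maria", 30)).toDF("gender", "name", "age")

// Narrow: each partition is filtered independently, no data exchange
val adults = people.filter(col("age") >= 30)

// Wide: rows with the same gender must be brought together,
// so data is shuffled between partitions before aggregating
val byGender = people.groupBy("gender").agg(avg("age"))
```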
Lazy Evaluation
Both of the above Transformation types, Narrow & Wide, are lazy in nature, meaning that until an Action is performed over them, the execution of all these transformations is delayed and lazily evaluated. Due to this delay, the Spark execution engine gets a whole view of all the chained transformations, and ample time to optimize your query.
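A minimal sketch of this behavior (assuming a SparkSession named spark): the transformation lines below return immediately without touching the data; Spark only plans and launches a job at the final count():

```scala
val numbers = spark.sparkContext.parallelize(1 to 1000000, 4)

// These two lines only record the lineage; no computation happens yet
val squared = numbers.map(n => n.toLong * n)
val evens   = squared.filter(_ % 2 == 0)

// count() is an Action: only now does Spark optimize and run the job
println(evens.count())
```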
Actions
As Transformations don't execute anything on their own, Spark needs Actions to trigger the execution of the chained Transformations.
Some examples of Actions are count(), collect(), show(), save(), etc., which perform different operations, like collecting data of objects to the driver, showing computed data in the console, or writing data to a file or target data source.