
RDD using Spark: The Building Block of Apache Spark


Spark: the word itself is enough to generate a spark in every Hadoop engineer’s mind. It is an in-memory processing tool that is lightning-fast in cluster computing. Compared to MapReduce, in-memory data sharing makes RDDs 10-100x faster than network and disk sharing, and all of this is possible because of RDDs (Resilient Distributed Datasets). The key points we focus on today in this RDD using Spark article are:

  • Creation of RDDs using Spark
  • Operations performed on RDDs
  • RDDs using Spark: Pokemon Use Case

The Need for RDDs


The world is evolving with Artificial Intelligence and Data Science because of the advancement in Machine Learning. Algorithms based on Regression, Clustering, and Classification run in a distributed, iterative computation fashion that involves reusing and sharing data among multiple computing units.

Traditional MapReduce techniques needed stable intermediate distributed storage such as HDFS, with repetitive computations, data replication, and data serialization, all of which made the process much slower. Finding a solution was never easy.


This is where RDDs (Resilient Distributed Datasets) come into the picture.

RDDs are easy to use and effortless to create: data is imported from a data source and dropped into an RDD, and operations are then applied to process it. They are a distributed, read-only collection of in-memory data and, most importantly, they are fault-tolerant.


If any data partition of an RDD is lost, it can be regenerated by replaying the transformation operations recorded in its lineage on that lost partition alone, rather than reprocessing all the data from scratch. In real-time scenarios, this approach can work miracles in situations of data loss or when a system is down.
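You can actually inspect this lineage from the Spark shell with the toDebugString method on any RDD. A minimal sketch, assuming a running spark-shell with sc available (variable names are illustrative):

val base = sc.parallelize(1 to 100, 4)     // a source RDD with 4 partitions
val doubled = base.map(_ * 2)              // a transformation, recorded in the lineage
val evens = doubled.filter(_ % 4 == 0)     // another recorded step
println(evens.toDebugString)               // prints the lineage graph Spark would replay on failure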

What are RDDs?

An RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark. The term Resilient refers to the ability to regenerate data automatically, rolling it back to its original state when an unexpected calamity threatens data loss.


The data written into an RDD is partitioned and stored across multiple executor nodes. If an executor node fails at run time, the lost data is instantly rebuilt on another executor node. This is why RDDs are considered an advanced type of data structure compared to other traditional data structures. RDDs can store structured, unstructured and semi-structured data.


Let’s move ahead with our RDD using Spark blog and learn about the unique features of RDDs that give them an edge over other types of data structures.

Features of RDD


  • In-Memory (RAM) Computations: In-memory computation takes data processing to a faster and more efficient stage, upgrading the overall performance of the system.
  • Lazy Evaluation: Transformations applied to the data in an RDD do not generate output right away; instead, the applied transformations are logged and executed only when an action is triggered.
  • Persistence: The resultant RDDs are always reusable and can be kept in memory for repeated access.
  • Coarse-Grained Operations: The user applies transformations to all elements in the data set at once, through map, filter or groupBy operations.
  • Fault Tolerance: If there is a loss of data, the system can roll back to its original state by replaying the logged transformations.
  • Immutability: Data defined, retrieved or created cannot be changed once it is logged into the system. If you need to modify an existing RDD, you must create a new RDD by applying a set of transformation functions to the current or preceding RDD.
  • Partitioning: Partitions are the crucial unit of parallelism in a Spark RDD. By default, the number of partitions created is based on your data source, but you can also decide the number of partitions you wish to make using custom partitioning. A short sketch illustrating several of these features follows this list.
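Below is a minimal shell sketch, assuming a running spark-shell with sc available, that illustrates immutability, lazy evaluation and persistence together (the variable names are illustrative):

val nums = sc.parallelize(1 to 10)     // the original RDD; it can never be changed in place
val squares = nums.map(n => n * n)     // immutability: map yields a brand-new RDD, nums is untouched
squares.cache()                        // persistence: keep the computed result in memory for reuse
squares.count()                        // the first action actually computes and caches the data
squares.collect()                      // later actions reuse the cached data instead of recomputing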

Creation of RDD using Spark

RDDs can be created in three ways:


  1. Reading data from parallelized collections
val PCRDD = spark.sparkContext.parallelize(Array("Mon","Tue","Wed","Thu","Fri","Sat"),2)
val resultRDD = PCRDD.collect()
resultRDD.foreach(println)
  2. Applying a transformation on previous RDDs
val words = spark.sparkContext.parallelize(Seq("Spark","is","a","very","powerful","language"))
val wordpair = words.map(w => (w.charAt(0),w))
wordpair.collect().foreach(println)
  3. Reading data from external storage or file paths like HDFS or HBase
val Sparkfile = spark.read.textFile("/user/edureka_566977/spark/spark.txt")
Sparkfile.collect()
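Whichever way an RDD is created, you can check how many partitions Spark gave it. A small sketch using the RDDs created above (getNumPartitions is available on RDDs in Spark 1.6 and later):

PCRDD.getNumPartitions     // returns 2, the partition count we requested in parallelize
words.getNumPartitions     // defaults to a value based on the cluster configuration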


Operations performed on RDDs:

There are mainly two types of operations which are performed on RDDs, namely:

  • Transformations 
  • Actions


Transformations: The operations we apply on an RDD to filter, access and modify the data in a parent RDD, generating a successive RDD, are called transformations. The new RDD holds a pointer to the previous RDD, ensuring the dependency between them.

Transformations are lazy evaluations; in other words, the operations applied on the RDD you are working with are logged but not executed. The system produces a result or an exception only after an action is triggered.
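A minimal sketch of this behaviour in the shell, assuming sc is available (the file path is hypothetical):

val lines = sc.textFile("/some/path/data.txt")   // hypothetical path; nothing is read yet
val upper = lines.map(_.toUpperCase)             // the transformation is only logged, not executed
upper.count()                                    // the action triggers the whole pipeline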

We can divide transformations into two types as below:

  • Narrow Transformations
  • Wide Transformations

Narrow Transformations: We apply a narrow transformation to a single partition of the parent RDD to generate a new RDD, as the data required to compute each output partition is available in a single partition of the parent RDD. The examples for narrow transformations are:

  • map()
  • filter()
  • flatMap()
  • partition()
  • mapPartitions()

Wide Transformations: We apply a wide transformation across multiple partitions to generate a new RDD, as the data required to compute the result is spread over multiple partitions of the parent RDD. The examples for wide transformations (a comparative sketch follows this list) are:

  • reduceByKey()
  • union()
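A minimal sketch contrasting the two kinds, with illustrative data: map stays within each partition, while reduceByKey must shuffle values for the same key across partitions:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 2)
val narrowed = pairs.map { case (k, v) => (k, v * 10) }   // narrow: computed partition by partition, no shuffle
val widened = pairs.reduceByKey(_ + _)                    // wide: values for a key may live on several partitions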

Actions: Actions instruct Apache Spark to apply the computation and pass the result, or an exception, back to the driver program. A few of the actions include:

  • collect()
  • count()
  • take()
  • first()

Let us practically apply the operations on RDDs:


The IPL (Indian Premier League) is a cricket tournament whose hype is at a peak level. So, let us today get our hands on the IPL data set and execute our RDD using Spark.

  • Firstly, let’s download CSV match data for the IPL. Once downloaded, it opens like a spreadsheet with rows and columns.


In the next step, we fire up Spark and load the matches.csv file from its location; in my case, my csv file location is “/user/edureka_566977/test/matches.csv”.


Now let us start with the transformations first:


  • map():

We use the map transformation to apply a specific operation to every element of an RDD. Here we create an RDD named CKfile to hold our csv file, and another RDD called states to store the city details.

spark2-shell
val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.collect.foreach(println)
val states = CKfile.map(_.split(",")(2))   // column index 2 holds the city
states.collect().foreach(println)


  • filter(): 

The filter transformation, as the name itself describes, filters selective data out of a given collection. Here we apply the filter operation to get the records of the IPL matches of the year 2017 and store them in the fil RDD.

val fil = CKfile.filter(line => line.contains("2017"))
fil.collect().foreach(println)


  • flatMap():

flatMap is a transformation operation we apply to each element of an RDD to create a new RDD; it is similar to the map transformation, except each input element can produce zero or more output elements. Here we apply flatMap to split each record on the string “Hyderabad” and store the data in the filRDD RDD.

val filRDD = fil.flatMap(line => line.split("Hyderabad")).collect()


  • partition(): 

Every piece of data we write into an RDD is split into a certain number of partitions. We use this operation to find the number of partitions the data is actually split into.

val fil = CKfile.filter(line => line.contains("2017"))
fil.partitions.size


  • mapPartitions(): 

We consider mapPartitions an alternative to using map() and foreach() together; it invokes the supplied function once per partition rather than once per element. We use mapPartitions here to find the number of rows in each partition of our fil RDD.

val fil = CKfile.filter(line => line.contains("2016"))
fil.mapPartitions(iter => Array(iter.size).iterator).collect   // yields one row count per partition


  • reduceByKey():

We use reduceByKey() on key-value pairs. We use this transformation on our csv file to find the player with the highest number of Man of the Match awards.

val ManOfTheMatch = CKfile.map(_.split(",")(13))   // column index 13 holds the Man of the Match
val MOTMcount = ManOfTheMatch.map(WINcount => (WINcount,1))
val ManOTH = MOTMcount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
ManOTH.take(10).foreach(println)
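As a side note, the same ranking can be written more directly with sortBy, avoiding the manual tuple swap; a sketch over the same MOTMcount RDD (the variable name ManOTH2 is illustrative):

val ManOTH2 = MOTMcount.reduceByKey(_ + _).sortBy(_._2, ascending = false)
ManOTH2.take(10).foreach(println)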


  • union():

The name explains it all: we use the union transformation to club two RDDs together. Here we create two RDDs, namely fil and fil2. The fil RDD contains the records of the 2017 IPL matches and the fil2 RDD contains the 2016 IPL match records.

val fil = CKfile.filter(line => line.contains("2017"))
val fil2 = CKfile.filter(line => line.contains("2016"))
val unionRDD = fil.union(fil2)
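Note that union keeps duplicates; if the two inputs could overlap, a distinct() after the union removes them (a one-line sketch, with an illustrative variable name):

val dedupedRDD = fil.union(fil2).distinct()   // only needed when the inputs may share records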


Let us now move to the action part, where we show actual output:


  • collect():

collect is the action we use to display the entire contents of an RDD.

val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.collect.foreach(println)


  • count():

count is an action that we use to count the number of records present in an RDD. Here we use this operation to count the total number of records in our matches.csv file.

val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.count()


  • take():

take is an action operation similar to collect, the only difference being that it can return any selective number of rows per the user’s request. Here we apply the following code to print the top ten leading records.

val states = CKfile.map(_.split(",")(2))                 // city column; feeds the Scount pair RDD
val Scount = states.map(city => (city,1))
val statecountm = Scount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
statecountm.collect().foreach(println)                   // prints every record
statecountm.take(10).foreach(println)                    // prints only the top ten


  • first(): 

first() is an action operation similar to collect() and take(); it returns the topmost record as the output. Here we use the first() operation to find the city with the maximum number of matches played, and we get Mumbai as the output.

val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
val states = CKfile.map(_.split(",")(2))
val Scount = states.map(Scount => (Scount,1))
Scount.reduceByKey((x,y) => x+y).collect.foreach(println)
val statecountm = Scount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
statecountm.first()


To make our learning of RDD using Spark even more interesting, I have come up with an interesting use case.

RDD using Spark: Pokemon Use Case


  • Firstly, let us download a Pokemon.csv file and load it into the spark-shell as we did with the Matches.csv file.
val PokemonDataRDD1 = sc.textFile("/user/edureka_566977/PokemonFile/PokemonData.csv")
PokemonDataRDD1.collect().foreach(println)


Pokemon actually come in a large variety; let us find a few of them.

  • Removing the schema from the Pokemon.csv file

We do not need the schema (the header row) of the Pokemon.csv file for our computations, hence we remove it.

val Head = PokemonDataRDD1.first()   // the header row
val NoHeader = PokemonDataRDD1.filter(line => !line.equals(Head))
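An alternative sketch for dropping the header without comparing every line: mapPartitionsWithIndex can skip the first line of the first partition only, assuming the header sits in partition 0 (which holds for a single-file text source; NoHeaderAlt is an illustrative name):

val NoHeaderAlt = PokemonDataRDD1.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter    // drop only the first line of the first partition
}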


  • Finding the number of partitions our pokemon.csv is distributed into.
println("No.ofpartitions="+NoHeader.partitions.size)


  • Water Pokemon

Finding the Water Pokemon

val WaterRDD = PokemonDataRDD1.filter(line => line.contains("Water"))
WaterRDD.collect().foreach(println)


  • Fire Pokemon

Finding the Fire Pokemon

val FireRDD = PokemonDataRDD1.filter(line => line.contains("Fire"))
FireRDD.collect().foreach(println)


  • We can also detect the population of a particular type of Pokemon using the count function
WaterRDD.count()


  • Since I like the game of defensive strategy, let us find the Pokemon with the maximum defence.
val defenceList = NoHeader.map{x => x.split(',')}.map{x => x(6).toDouble}   // column index 6 holds the defence value
println("Highest_Defence : " + defenceList.max())


  • We know the maximum defence strength value, but we don’t know which Pokemon it is. So, let us find that Pokemon.
val defWithPokemonName = NoHeader.map{x => x.split(',')}.map{x => (x(6).toDouble,x(1))}
val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered(1)(Ordering[Double].reverse.on(_._1))
MaxDefencePokemon.foreach(println)
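Incidentally, the groupByKey step is not strictly required here; takeOrdered can rank the (defence, name) pairs directly, which avoids a shuffle (maxDef is an illustrative name):

val maxDef = defWithPokemonName.takeOrdered(1)(Ordering[Double].reverse.on(_._1))
maxDef.foreach(println)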


  • Now let us sort out the Pokemon with the least defence.
val minDefencePokemon = defenceList.distinct.sortBy(x => x.toDouble,true,1)
minDefencePokemon.take(5).foreach(println)


  • Now let us see the Pokemon with the least defensive strength, along with its name.
val PokemonDataRDD2 = sc.textFile("/user/edureka_566977/PokemonFile/PokemonData.csv")
val Head2 = PokemonDataRDD2.first()
val NoHeader2 = PokemonDataRDD2.filter(line => !line.equals(Head2))
val defWithPokemonName2 = NoHeader2.map{x => x.split(',')}.map{x => (x(6).toDouble,x(1))}
val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered(1)(Ordering[Double].on(_._1))
MinDefencePokemon2.foreach(println)



So, with this, we come to the end of this RDD using Spark article. I hope we sparked a little light upon your knowledge of RDDs, their features and the various types of operations that can be performed on them.

This article is based on the Apache Spark and Scala Certification Training, which is designed to prepare you for the Cloudera Hadoop and Spark Developer Certification Exam (CCA175). You will get in-depth knowledge of Apache Spark and the Spark Ecosystem, which includes Spark RDD, Spark SQL, Spark MLlib and Spark Streaming. You will also get comprehensive knowledge of the Scala programming language, HDFS, Sqoop, Flume, Spark GraphX and messaging systems such as Kafka.

