
groupBy on multiple values

 2 years ago
source link: https://www.codesd.com/item/groupby-on-multiple-values.html


I have a list of calls, smsIn and smsOut records in a CSV file, and I want to count the number of smsIn and smsOut records for each phone number.

callType indicates the type of record (call, smsIn, smsOut).

An example of the data, as (phoneNumber, callType):

7035076600, 30
5081236732, 31
5024551234, 30
7035076600, 31
7035076600, 30

Ultimately, I want something like this: phoneNum, numSMSIn, numSMSOut.

I have implemented something like this:

val smsOutByPhoneNum = partitionedCalls.
                       filter{ arry => arry(2) == 30}.
                       groupBy { x => x(1) }.
                       map(f=> (f._1,f._2.iterator.length)).
                       collect()

The above gives the number of SMS out for each phone number. Similarly:

val smsInByPhoneNum = partitionedCalls.
                      filter{ arry => arry(2) == 31}.
                      groupBy { x => x(1) }.
                      map(f => (f._1, f._2.iterator.length)).
                      collect()

The above gives the number of SMS in for each phone number.

Is there a way to get both counts in one iteration instead of two?


Great answer @zero323

val partitionedCalls = sc.parallelize(Array(
  ("7035076600", "30"), ("5081236732", "31"), ("5024551234", "30"),
  ("7035076600", "31"), ("7035076600", "30")))

// count each (phoneNumber, callType) pair: ((phoneNumber, callType), count)
val keyPairCounts = partitionedCalls.map((_, 1))
// sum the counts with reduceByKey, then re-key by phone number
val aggregateCounts = keyPairCounts.reduceByKey(_ + _).map {
  case ((phNum, inOrOut), cnt) => (phNum, (inOrOut, cnt))
}
// group by phone number and keep a callType -> count map per number,
// so a missing callType cannot shift the other count into the wrong column
val result = aggregateCounts.groupByKey.mapValues(_.toMap)

// collect the result as (phoneNum, numSMSIn, numSMSOut); 31 = SMS in, 30 = SMS out
result.map { case (phNum, counts) =>
  (phNum, counts.getOrElse("31", 0), counts.getOrElse("30", 0))
}.collect().foreach(println)
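
The single-pass idea can also be sketched without the groupByKey step, by carrying an (in, out) pair of counters per phone number. The version below uses plain Scala collections so it runs without Spark. The names SmsCounts and countSms are hypothetical, and the codes 30 = SMS out and 31 = SMS in are taken from the filters in the question. On an RDD, the same shape would presumably be a map to (phoneNum, (in, out)) followed by reduceByKey that adds the pairs component-wise.

```scala
// Plain-Scala sketch (no Spark required); names here are illustrative only.
object SmsCounts {
  // Call type codes as used in the question's filters.
  val SmsOut = "30"
  val SmsIn = "31"

  // Returns phoneNumber -> (numSmsIn, numSmsOut), visiting each row once.
  def countSms(calls: Seq[(String, String)]): Map[String, (Int, Int)] =
    calls.foldLeft(Map.empty[String, (Int, Int)]) {
      case (acc, (phone, callType)) =>
        val (nIn, nOut) = acc.getOrElse(phone, (0, 0))
        callType match {
          case SmsIn  => acc.updated(phone, (nIn + 1, nOut))
          case SmsOut => acc.updated(phone, (nIn, nOut + 1))
          case _      => acc // ordinary calls are not counted
        }
    }
}
```

Calling SmsCounts.countSms on the five sample rows from the question yields one entry per phone number, with both counts present even when one of them is zero.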

Reference: a good explanation of the difference between groupByKey and reduceByKey: prefer_reducebykey_over_groupbykey

