I have seen multiple blog posts recommending reduceByKey over
groupByKey. Could someone please help me convert the code below to use
reduceByKey?


Code
....
some spark processing
...

Below is the type of the input RDD:

val viEventsWithListingsJoinSpsLevelMetric:
  org.apache.spark.rdd.RDD[(
    com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
    com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary,
    Long)]


  val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
      case (viDetail, vi, itemId) =>
        (viDetail.get(0), viDetail.get(1).asInstanceOf[Long],
          viDetail.get(2), viDetail.get(8).asInstanceOf[Int])
    }

We group by the above key so that we get an Iterable of records per key;
from that list we can compute the .max values for powerSellerLevel and
sellerStdLevel.

    val powerSellerLevel = sellerSegments.map {
      case (k, v) =>
        val viGrouped = v.toList
        val viPowerSellers = viGrouped.map { viTuple =>
          Option(viTuple._2.powerSellerLevel).getOrElse("")
        }
        val viSellerStandardLevels = viGrouped.map { viTuple =>
          Option(viTuple._2.sellerStdLevel).getOrElse("")
        }
        val powerSellerLevel = viPowerSellers.max
        val sellerStandardLevel = viSellerStandardLevels.max
        val viEventDetail = viGrouped.head._1
        val viSummary = viGrouped.head._2
        viSummary.powerSellerLevel = powerSellerLevel
        viSummary.sellerStdLevel = sellerStandardLevel
        viSummary.itemId = viGrouped.head._3
        (viEventDetail, viSummary)
    }


The above groupBy query ran for 6 hours and did not seem to finish, so I
started looking at reduceByKey. reduceByKey() needs key-value pairs, so I
changed the element type of viEventsWithListingsJoinSpsLevelMetric from a
triple (x, y, z) to a pair (A, B).

I moved the key that the groupBy query generated into the processing that
produces viEventsWithListingsJoinSpsLevelMetric, so that its elements are
of type (A, B). Each element now looks like:

((viEventDetail.get(0), viEventDetail.get(1).asInstanceOf[Long],
  viEventDetail.get(2), viEventDetail.get(8).asInstanceOf[Int]),
 (viEventDetail, viSummary, itemId))

Now I want to compute the max values, so I do the next step with
reduceByKey:

val powerSellerLevel = viEventsWithListingsJoinSpsLevelMetric.reduceByKey {
      case (k, v) =>
        val viGrouped = v.toList
        // Some code to compute max needs to go here.
}


But I get a compiler error saying that toList is not a member of v's type:

[ERROR]
/Users/dvasthimal/ebay/projects/ep-spark/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/detail/viewitem/provider/VISummaryDataProvider.scala:115:
error: value toList is not a member of
(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary,
Long)

[INFO]         val viGrouped = v.toList

[INFO]                           ^

[ERROR] one error found


groupBy was generating (k, Iterable), so the following map() could take
the list and run through it to compute the max. How can the max be
computed with reduceByKey, which never generates such a list?
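
For reference, a minimal sketch of how the max could be folded in pairwise.
reduceByKey takes a function of type (V, V) => V, so both arguments are
single values (here the (detail, summary, itemId) triple) and never an
Iterable, which is why v.toList does not compile. Detail, Summary, merge and
reduceByKeyLocal below are hypothetical stand-ins, not the real
DetailInputRecord/VISummary classes, and plain Scala collections are used in
place of an RDD so the snippet runs without Spark. It also assumes the
summary can be copied rather than mutated in place.

```scala
// Hypothetical stand-ins for DetailInputRecord and VISummary; only the
// fields used in this thread are modelled.
final case class Detail(id: String)
final case class Summary(powerSellerLevel: String, sellerStdLevel: String, itemId: Long)

object ReduceSketch {
  type Value = (Detail, Summary, Long)

  // Null-safe string max, mirroring Option(x).getOrElse("") in the groupBy version.
  private def maxStr(a: String, b: String): String =
    Ordering[String].max(Option(a).getOrElse(""), Option(b).getOrElse(""))

  // (V, V) => V: merge two values that share a key into one value.
  // Keeps the left operand's detail and itemId, much like viGrouped.head did.
  def merge(a: Value, b: Value): Value = {
    val (detail, sa, itemId) = a
    val sb = b._2
    val merged = sa.copy(
      powerSellerLevel = maxStr(sa.powerSellerLevel, sb.powerSellerLevel),
      sellerStdLevel = maxStr(sa.sellerStdLevel, sb.sellerStdLevel))
    (detail, merged, itemId)
  }

  // Local stand-in for pairs.reduceByKey(merge) on a pair RDD.
  // Note: a key with a single value never passes through merge, so any
  // null in that value survives; normalize in a later map if that matters.
  def reduceByKeyLocal[K](pairs: Seq[(K, Value)]): Map[K, Value] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(merge) }

  def main(args: Array[String]): Unit = {
    val pairs: Seq[(String, Value)] = Seq(
      ("seller1", (Detail("d1"), Summary("Gold", "A", 1L), 1L)),
      ("seller1", (Detail("d2"), Summary(null, "B", 2L), 2L)),
      ("seller2", (Detail("d3"), Summary("Silver", "C", 3L), 3L)))
    reduceByKeyLocal(pairs).foreach(println)
  }
}
```

With a pair RDD of the (key, (viEventDetail, viSummary, itemId)) shape
described above, the same function would be passed directly, as in
viEventsWithListingsJoinSpsLevelMetric.reduceByKey(ReduceSketch.merge).
Spark does not guarantee the order in which values are merged, so which
record supplies the detail and itemId is as arbitrary as .head was after
groupBy.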


Suggestions are appreciated.


-Deepak

On Thu, Apr 23, 2015 at 1:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]> wrote:

> I have a groupBy query after a map-side join and a leftOuterJoin, and
> this query has been running for more than 2 hours.
>
>
> Tasks
>
> Index: 0
> ID: 36
> Attempt: 0
> Status: RUNNING
> Locality Level: PROCESS_LOCAL
> Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com
> Launch Time: 2015/04/22 23:27:00
> Duration: 1.4 h
> GC Time: 29 s
> Shuffle Read Size / Records: 61.8 MB / 63144909
> Write Time: (empty)
> Shuffle Write Size / Records: 0.0 B / 0
> Errors: (empty)
>
>
>
> The input looks to be only 60 MB.
> *Command*
> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>  *--num-executors 36 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=8G" --executor-memory 12g* *--executor-cores 6* --queue
> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
> startDate=2015-04-6 endDate=2015-04-7
> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
> maxbuffersize=1068 maxResultSize=2G
>
> Queries
>
> 1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>
> 2. Broadcast Map - Join
>
> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
>   .collectAsMap
> val broadCastMap = sc.broadcast(lstgItemMap)
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] = viEvents.mapPartitions({
>
> // business logic })
>
> 3. Left Outer
>
> val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)
>
> val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong,
> sps) }
>
>  val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings
> .leftOuterJoin(spsLvlMetric).map {
>  // business logic
> }
>
> Any thoughts ?
>
> 4. Group By:
>
> val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
>       case (viDetail, vi, itemId) =>
>         (viDetail.get(0), viDetail.get(1).asInstanceOf[Long],
>           viDetail.get(2), viDetail.get(8).asInstanceOf[Int])
>     }
>
>
> #4 is very slow.
>
> --
>
>
>
> Deepak
>
>


-- 
Deepak
