I have a groupBy that runs after a map-side join and a leftOuterJoin, and
it has been running for more than 2 hours.


Task from the Spark UI:

Index: 0, ID: 36, Attempt: 0, Status: RUNNING, Locality Level: PROCESS_LOCAL
Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com
Launch Time: 2015/04/22 23:27:00, Duration: 1.4 h, GC Time: 29 s
Shuffle Read Size / Records: 61.8 MB / 63144909
Shuffle Write Size / Records: 0.0 B / 0



The input looks to be only 60 MB.
*Command*
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
--num-executors 36 --driver-memory 12g --driver-java-options
"-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 6 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G

Queries

1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
2. Broadcast map-side join

val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
  .collectAsMap()
val broadCastMap = sc.broadcast(lstgItemMap)

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({ iter =>
    // business logic
  })
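For reference, here is a minimal, Spark-free sketch of the broadcast map-side
join pattern above: the small side is materialized as a plain Map (standing in
for the broadcast variable) and each event is looked up against it. All names
and data here are illustrative, not taken from the original job.

```scala
object BroadcastJoinSketch {
  // (itemId, eventPayload) pairs standing in for viEvents
  val viEvents = Seq((1L, "view-a"), (2L, "view-b"), (3L, "view-c"))

  // Small listings side, collected to a map as with collectAsMap()
  val lstgItemMap: Map[Long, String] = Map(1L -> "listing-1", 2L -> "listing-2")

  // Map-side join: keep only events whose itemId has a listing
  def join(): Seq[(Long, (String, String))] =
    viEvents.flatMap { case (itemId, event) =>
      lstgItemMap.get(itemId).map(listing => (itemId, (event, listing)))
    }
}
```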

3. Left outer join

val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)

val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong,
sps) }

val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings
  .leftOuterJoin(spsLvlMetric).map {
    // business logic
  }
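A Spark-free sketch of the leftOuterJoin semantics in step 3: every left-side
record is kept, paired with Some(right) when the key matches and None
otherwise. Names and data are illustrative only.

```scala
object LeftOuterJoinSketch {
  val left  = Seq((10L, "event-x"), (20L, "event-y"))
  val right = Map(10L -> 5.0) // sps metric keyed by user id

  def leftOuterJoin(): Seq[(Long, (String, Option[Double]))] =
    left.map { case (userId, event) => (userId, (event, right.get(userId))) }
}
```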

Any thoughts?

4. Group by:

val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
  case (viDetail, vi, itemId) =>
    (viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2),
     viDetail.get(8).asInstanceOf[Int])
}


#4 is very slow.
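For context, the grouping in #4 collects every value for a composite key into
one collection, which on an RDD means shuffling every record across the
network. A Spark-free sketch of the same operation on plain collections, with
a two-field key standing in for the four-field key above (names and data are
illustrative only):

```scala
object GroupBySketch {
  // (site, userId, metric) records; (site, userId) is the composite key
  val records = Seq(("US", 1L, 10), ("US", 1L, 20), ("DE", 2L, 30))

  // groupBy gathers all records per key before anything is reduced
  def grouped(): Map[(String, Long), Seq[Int]] =
    records.groupBy { case (site, user, _) => (site, user) }
      .map { case (k, vs) => (k, vs.map(_._3)) }
}
```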

-- 



Deepak
