I have a groupBy query that runs after a map-side join and a leftOuterJoin,
and it has been running for more than 2 hours.
From the Spark UI Tasks table:

Index: 0 | ID: 36 | Attempt: 0 | Status: RUNNING | Locality Level: PROCESS_LOCAL
Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com
Launch Time: 2015/04/22 23:27:00 | Duration: 1.4 h | GC Time: 29 s
Shuffle Read Size / Records: 61.8 MB / 63144909
Shuffle Write Size / Records: 0.0 B / 0
The shuffle read looks to be only ~62 MB, but it covers 63,144,909 records.
*Command*
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
--num-executors 36 --driver-memory 12g --driver-java-options
"-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 6 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G
Queries
1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
2. Broadcast Map - Join
val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
  .collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({ iter =>
    // business logic
  })
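For context, the partition body above is elided; a minimal sketch of the
broadcast map-side join pattern it describes (the flatMap lookup and the
buildSummary helper are my assumptions, not the actual business logic):

```scala
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.mapPartitions({ iter =>
    // Read the broadcast value once per partition, not once per record.
    val itemMap = broadCastMap.value
    iter.flatMap { case (itemId, viDetail) =>
      // Inner-join semantics: events whose item is missing from the map
      // are dropped. buildSummary is a hypothetical placeholder.
      itemMap.get(itemId).map { lstg =>
        (itemId, (viDetail, buildSummary(viDetail, lstg), itemId))
      }
    }
  }, preservesPartitioning = true)
```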
3. Left Outer Join
val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)
val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong,
sps) }
val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings
.leftOuterJoin(spsLvlMetric).map {
// business logic
}
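Since leftOuterJoin wraps the right side in an Option, the map has to handle
the missing case; a sketch under that assumption (the key name and defaultSps
are hypothetical, following the snippet above):

```scala
val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings
  .leftOuterJoin(spsLvlMetric)
  .map { case (key, ((viDetail, viSummary, itemId), spsOpt)) =>
    // spsOpt is None when no sps-level metric exists for this key.
    val sps = spsOpt.getOrElse(defaultSps)
    (viDetail, viSummary, itemId) // business logic elided
  }
```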
Any thoughts ?
4. Group By:
val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
case (viDetail, vi, itemId) =>
(viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2),
viDetail.get(8).asInstanceOf[Int])
}
#4 is very slow.
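My understanding is that groupBy shuffles every record's full value across the
network, which with 63 M records could explain the runtime. If the per-group
work is an aggregation, the same key could feed reduceByKey so values combine
map-side before the shuffle; a sketch with a hypothetical count as the metric:

```scala
// Same key fields as the groupBy above; the 1L count is a hypothetical
// stand-in for whatever is actually computed per group.
val keyed = viEventsWithListingsJoinSpsLevelMetric.map {
  case (viDetail, vi, itemId) =>
    ((viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2),
      viDetail.get(8).asInstanceOf[Int]), 1L)
}
// reduceByKey combines partially on each map task before shuffling.
val sellerSegmentCounts = keyed.reduceByKey(_ + _)
```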
--
Deepak