Guys,

I am trying hard to make a DStream API Spark streaming job work on EMR. I’ve 
succeeded to the point of running it for a few hours with eventual failure 
which is when I start seeing some out of memory exception via “yarn logs” 
aggregate.

I am doing a JSON map and extraction of some fields via play-json in the map 
portion (mappedEvents)

val mappedEvents:DStream[(String, Iterable[Array[Byte]])] = {map json events 
keyed off of `user-id` }

val stateFulRDD = mappedEvents
.reduceByKeyAndWindow( (x: Iterable[Array[Byte]], y:Iterable[Array[Byte]]) => {
      val x1 = x.map(arrayToUserSession)
      val y1 = y.map(arrayToUserSession)
      val z = x1 ++ y1
      val now = System.currentTimeMillis()
      z.groupBy(_.psid).map(_._2.maxBy(_.lastTime))
        .filter(l =>  l.lastTime + eventValiditySeconds*1000 >= now)
          .map(userSessionToArray)
  }, windowSize, slidingInterval)
  .filter(_._2.size>1)
.mapWithState(stateSpec)

//doing sessionization where I keep last timestamp as the beginning of session 
via mapWithState for any session counts > 1 to make it use state API less 
frequently.

val stateSpec = StateSpec.function(updateUserEvents 
_).timeout(windowSize.times(1).plus(batchInterval))

def updateUserEvents(key: String,
                     newValue: 
Option[scala.collection.immutable.Iterable[Array[Byte]]],
                     state: State[UserSessions]): Option[UserSessions]


My window is 60 seconds and my slidingInterval is 10 seconds with batchInterval 
of 20 seconds

In my load test of 250K records per second (each record is around 1.6KB) in 
Kinesis Stream running on EMR 5.7.0 cluster on yarn with 25 core nodes of 
m4.2xlarge and a master of m4.4xlarge with plenty of EBS Storage sc1 attached 
(10TB), I cannot sustain load for longer than 2 hours. The cluster errors out.

This is my submit job parameters,

aws emr add-steps --cluster-id $CLUSTER_ID --steps 
Name=SessionCount,Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/usr/lib/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn,--conf,spark.streaming.stopGracefullyOnShutdown=true,--conf,spark.locality.wait=7500ms,--conf,spark.streaming.blockInterval=10000ms,--conf,spark.shuffle.consolidateFiles=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.closure.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.dynamicAllocation.enabled=true,--conf,spark.scheduler.mode=FIFO,--conf,spark.ui.retainedJobs=50,--conf,spark.ui.retainedStages=50,--conf,spark.ui.retainedTasks=500,--conf,spark.worker.ui.retainedExecutors=50,--conf,spark.worker.ui.retainedDrivers=50,--conf,spark.sql.ui.retainedExecutions=50,--conf,spark.streaming.ui.retainedBatches=50,--conf,spark.rdd.compress=false,--conf,spark.yarn.executor.memoryOverhead=5120,--executor-memory,15G,--class,SessionCountX
  - and job parameters follow

with env.json
{
{
  "Classification": "yarn-site",
  "Properties": {
    "yarn.log-aggregation-enable": "true",
    "yarn.log-aggregation.retain-seconds": "-1",
    "yarn.nodemanager.remote-app-log-dir": "s3:\/\/my-bucket-logs",
    "yarn.nodemanager.vmem.check.enabled": "false"
  }
},
{
  "Classification": "spark",
  "Properties": {
    "maximizeResourceAllocation": "true"
  }
}
}

Also, looking at the executors page of Spark UI, I see Input continuing to grow 
with time. I am not sure if the fact that user-id is UUID.random() in the load 
test is the cause of that and if I should load test with finite set of 
user-id’s for limited key-space in Spark but that is something I noticed. 
Shuffle read/write size normalizes eventually though and stays about the same.


The following exceptions are seen from a failed job:

17/07/26 07:13:51 WARN NettyRpcEndpointRef: Error sending message [message = 
Heartbeat(28,[Lscala.Tuple2;@2147b84f,BlockManagerId(28, 
ip-10-202-138-81.mlbam.qa.us-east-1.bamgrid.net, 38630, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. 
This timeout is controlled by spark.executor.heartbeatInterval


17/07/26 07:13:51 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[Thread-9,5,main]
java.lang.OutOfMemoryError: Java heap space
                at 
io.netty.buffer.UnpooledHeapByteBuf.<init>(UnpooledHeapByteBuf.java:45)
                at 
io.netty.buffer.UnpooledUnsafeHeapByteBuf.<init>(UnpooledUnsafeHeapByteBuf.java:29)
                at 
io.netty.buffer.UnpooledByteBufAllocator.newHeapBuffer(UnpooledByteBufAllocator.java:59)
                at 
io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:158)
                at 
io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:149)
                at io.netty.buffer.Unpooled.buffer(Unpooled.java:116)
                at 
org.apache.spark.network.shuffle.protocol.BlockTransferMessage.toByteBuffer(BlockTransferMessage.java:78)
                at 
org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock(NettyBlockTransferService.scala:137)
                at 
org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:121)
                at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$replicate(BlockManager.scala:1220)
                at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1067)
                at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
                at 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
                at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
                at 
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:732)
                at 
org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:80)
                at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
                at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
                at 
org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
                at 
org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:282)
                at 
org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:352)
                at 
org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
                at 
org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
                at 
org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
17/07/26 07:13:51 WARN KinesisRecordProcessor: No shardId for workerId 
ip-10-202-138-81.mlbam.qa.us-east-1.bamgrid.net:ce6e8bc3-d484-4d26-8bd1-168c933803e3?
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because 
it has not yet stared
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because 
it has not yet stared
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because 
it has not yet stared
17/07/26 07:13:51 WARN ReceiverSupervisorImpl: Skip stopping receiver because 
it has not yet stared


# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 27914"...
os::fork_and_exec failed: Cannot allocate memory (12)
End of LogType:stdout


What else can I do or look for? Any help would be greatly appreciated. Thank 
you.

Reply via email to