Just an update that Kinesis checkpointing works well with both orderly and kill -9 driver shutdowns when there are fewer than 4 shards. We use 20+.
I created a case with Amazon support, since it is the AWS Kinesis getRecords API that is hanging.

Regards,
Heji

On Thu, Nov 12, 2015 at 10:37 AM, Hster Geguri <hster.investiga...@gmail.com> wrote:
> Hello everyone,
>
> We are testing checkpointing against YARN 2.7.1 with Spark 1.5. We are
> trying to make sure checkpointing works with orderly shutdowns (i.e. yarn
> application --kill) and unexpected shutdowns, which we simulate with a
> kill -9. If there is anyone who has successfully tested failover recently
> with Kinesis+YARN, I would appreciate even a confirmation that this
> should work.
>
> We have a simple driver that does aggregate counts per minute against a
> Kinesis stream. We see initial hanging behavior (~2-10 minutes) in both
> relaunch scenarios. When we do an "unexpected" shutdown of the
> application master with "kill -9" of the JVM process, YARN successfully
> kills the orphaned executors and launches a new driver with new
> executors. The logs indicate that restoring the checkpoint was
> successful. However, the first two Spark Streaming batches, which contain
> 0 events, intermittently hang for anywhere between 2 and 10+ minutes with
> a SocketTimeoutException during a Kinesis getRecords call (stack trace at
> the end of this mail).
>
> Under normal circumstances, Spark skips the transform and mapToPair
> stages on 0 events. However, when the executors hang, we notice that the
> job goes through the transform stage and tasks hang making a getRecords
> call to Kinesis for 2 minutes before emitting a "SocketTimeoutException:
> Read timed out".
>
> Kinesis as a service should behave more gracefully even if it were fed
> bad parameters, but why does Spark call getRecords at all when the batch
> size is 0 on relaunch?
>
> Any input is greatly appreciated, as we are stuck on testing failover.
>
> Heji
>
> I've put the stack trace below:
>
> [2015-11-09 15:20:23,478] INFO Unable to execute HTTP request: Read timed out (com.amazonaws.http.AmazonHttpClient)
> java.net.SocketTimeoutException: Read timed out
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>     at java.net.SocketInputStream.read(SocketInputStream.java:170)
>     at java.net.SocketInputStream.read(SocketInputStream.java:141)
>     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>     at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
>     at sun.security.ssl.InputRecord.read(InputRecord.java:532)
>     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
>     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
>     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
>     at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
>     at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
>     at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
>     at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
>     at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
>     at com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
>     at com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
>     at com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
>     at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
>     at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
>     at com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
>     at com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
>     at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
>     at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
>     at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
>     at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
>     at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2490)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1142)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:258)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:213)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:199)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:155)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:127)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
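P.S. For anyone trying to reproduce this, the driver described above follows the standard checkpoint-recovery pattern for Spark Streaming 1.5 with Kinesis. A minimal sketch of that pattern is below; the app name, stream name, endpoint, region, and checkpoint path are placeholders, not our actual values:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object KinesisFailoverTest {
  // Placeholder path; must be durable storage (HDFS/S3) so a relaunched
  // driver on another node can read it.
  val checkpointDir = "hdfs:///tmp/kinesis-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kinesis-failover-test")
    val ssc = new StreamingContext(conf, Seconds(60)) // one-minute batches
    ssc.checkpoint(checkpointDir)

    // Spark 1.5 receiver-based Kinesis stream (stream/endpoint/region are placeholders)
    val stream = KinesisUtils.createStream(
      ssc,
      "kinesis-failover-test",                         // KCL application name
      "my-stream",                                     // Kinesis stream name
      "https://kinesis.us-west-2.amazonaws.com",       // endpoint URL
      "us-west-2",                                     // region
      InitialPositionInStream.TRIM_HORIZON,
      Seconds(60),                                     // Kinesis checkpoint interval
      StorageLevel.MEMORY_AND_DISK_2)

    // Aggregate count per one-minute batch
    stream.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a clean start this calls createContext(); after a relaunch
    // (including kill -9) it rebuilds the context from the checkpoint,
    // which is the point where we see the hanging getRecords calls.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The important piece for failover is StreamingContext.getOrCreate: all DStream setup has to live inside the creating function, since on recovery the graph comes from the checkpoint rather than from re-executing main.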