[jira] [Commented] (SPARK-19364) Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are enabled and an exception is thrown

2017-03-09 Thread Andrew Milkowski (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903778#comment-15903778
 ] 

Andrew Milkowski commented on SPARK-19364:
--

thanks @Takeshi Yamamuro , I will try to make this error reproducible (we see it 
in prod non-stop and it is consistent). I will see if I can throw the exception 
from inside the Kinesis receiver (Java lib) and watch the stream blocks grow in 
Spark, and I will provide the line change needed to reproduce the problem. It is 
tied to the Kinesis Java lib faulting on checkpoint (throwing an exception) and 
Spark persisting the stream blocks and never releasing them from memory until an 
eventual OOM.
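
For anyone trying to follow along, here is a minimal sketch of the kind of Kinesis 
DStream setup with KCL checkpointing enabled under which the blocks described above 
would pile up once checkpoint calls start failing. It is only an illustration: the 
reporter's actual application is in Java, and the app name, stream name, endpoint, 
region and intervals below are placeholders, not values from this ticket.

{code:scala}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical reproduction harness; all names and endpoints are placeholders.
object KinesisBlockLeakRepro {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("kinesis-block-leak-repro"),
      Seconds(10))

    // A checkpoint interval > 0 starts the KinesisCheckpointer timer that shows up
    // in the stack trace below; the KCL keeps its checkpoints in DynamoDB.
    val stream = KinesisUtils.createStream(
      ssc,
      "my-kcl-app",                               // hypothetical KCL application name
      "my-stream",                                // hypothetical Kinesis stream name
      "https://kinesis.us-east-1.amazonaws.com",  // placeholder endpoint
      "us-east-1",                                // placeholder region
      InitialPositionInStream.LATEST,
      Seconds(10),                                // KCL checkpoint interval
      StorageLevel.MEMORY_AND_DISK_2)

    // Any output operation will do; it just makes batches run so that
    // "input-X-XXX" stream blocks are produced by the receiver.
    stream.map(_.length).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}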

> Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are 
> enabled and an exception is thrown 
> --
>
> Key: SPARK-19364
> URL: https://issues.apache.org/jira/browse/SPARK-19364
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: ubuntu unix
> spark 2.0.2
> application is java
>Reporter: Andrew Milkowski
>Priority: Blocker
>
> -- update -- we found that the situation below occurs when we encounter
> "com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: 
> Can't update checkpoint - instance doesn't hold the lease for this shard"
> https://github.com/awslabs/amazon-kinesis-client/issues/108
> we use an S3 directory (and DynamoDB) to store checkpoints, but even when this 
> occurs, blocks should not get stuck; they should continue to be evicted 
> gracefully from memory. Obviously the Kinesis library race condition is a 
> problem unto itself...
> -- exception leading to a block not being freed up --
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException:  Caught 
> shutdown exception, skipping checkpoint.
> com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: 
> Can't update checkpoint - instance doesn't hold the lease for this shard
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>   at scala.util.Try$.apply(Try.scala:192)
>   at 
> org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
>   at 
> org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
>   at 
> org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
>   at 
> org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
> running standard kinesis stream ingestion with a java spark app and creating 
> dstream after running for some time some block
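
To summarize the failure mode in the quoted description and trace above: the KCL 
checkpointer throws ShutdownException once this worker no longer holds the shard 
lease, and the checkpoint attempt is skipped. The sketch below is illustrative 
only (it is not Spark's KinesisCheckpointer source); only the KCL checkpointer 
interface and exception type are real.

{code:scala}
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

import scala.util.{Failure, Success, Try}

object CheckpointSkipSketch {
  // Illustrative only -- a rough sketch of the call shape seen in the trace above.
  def tryCheckpoint(checkpointer: IRecordProcessorCheckpointer, seqNum: String): Unit =
    Try(checkpointer.checkpoint(seqNum)) match {
      case Success(_) =>
        () // checkpoint written to the KCL lease table in DynamoDB
      case Failure(_: ShutdownException) =>
        // The lease was lost (shard handed to another worker): the checkpoint is
        // skipped, which corresponds to the "Caught shutdown exception, skipping
        // checkpoint." log line above.
        println("skipping checkpoint: this worker no longer holds the shard lease")
      case Failure(other) =>
        throw other // anything else is left to the caller's retry logic
    }
}
{code}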

[jira] [Commented] (SPARK-19364) Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are enabled and an exception is thrown

2017-03-06 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897012#comment-15897012
 ] 

Takeshi Yamamuro commented on SPARK-19364:
--

Do you have any way to reproduce this issue? I couldn't follow your scenario.
BTW, the "input-X-XXX" entries are stream blocks, so you cannot remove them via 
ContextCleaner. These stream blocks are periodically removed by 
DStream#clearMetadata:  
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L463
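
To make that cleanup path concrete, here is a small sketch (not from this ticket, 
names are placeholders): each DStream remembers the RDDs it generated, and the 
stream blocks backing them, for its remember duration, and DStream#clearMetadata 
drops anything older as batches complete. StreamingContext.remember() can only 
widen that window; it is not a way to force blocks out.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object RememberDurationExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("remember-duration-example"),
      Seconds(10))

    // Each DStream remembers its generated RDDs (and their stream blocks) for its
    // remember duration; DStream#clearMetadata drops anything older as each batch
    // completes. remember() only extends that window -- ContextCleaner plays no
    // part in removing stream blocks.
    ssc.remember(Minutes(2))

    // ... define the Kinesis DStream and its output operations here, then call
    // ssc.start() and ssc.awaitTermination().
  }
}
{code}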
 

> Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are 
> enabled and an exception is thrown 
> --
>
> Key: SPARK-19364
> URL: https://issues.apache.org/jira/browse/SPARK-19364
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: ubuntu unix
> spark 2.0.2
> application is java
>Reporter: Andrew Milkowski
>Priority: Blocker
>
> -- update -- we found that the situation below occurs when we encounter
> "com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: 
> Can't update checkpoint - instance doesn't hold the lease for this shard"
> https://github.com/awslabs/amazon-kinesis-client/issues/108
> we use an S3 directory (and DynamoDB) to store checkpoints, but even when this 
> occurs, blocks should not get stuck; they should continue to be evicted 
> gracefully from memory. Obviously the Kinesis library race condition is a 
> problem unto itself...
> -- exception leading to a block not being freed up --
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException:  Caught 
> shutdown exception, skipping checkpoint.
> com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: 
> Can't update checkpoint - instance doesn't hold the lease for this shard
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
>   at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>   at scala.util.Try$.apply(Try.scala:192)
>   at 
> org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
>   at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
>   at 
> org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
>   at 
> org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
>   at 
> org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
> running standard kinesis stream ingestion with a java spark app and creating 
> dstream after running for some time some block