[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-16 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159330#comment-17159330
 ] 

Ed Mitchell commented on SPARK-32151:
--------------------------------------

Yeah - I think you're right. It's going to be in the write-ahead logs or the 
checkpoint.

> Kafka does not allow Partition Rebalance Handling
> -------------------------------------------------
>
> Key: SPARK-32151
> URL: https://issues.apache.org/jira/browse/SPARK-32151
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.4.5
>Reporter: Ed Mitchell
>Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the 
> Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are 
> cleared when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers 
> joining the group (though it could), it does happen when the partition leader 
> is re-elected because a Kafka node is stopped or decommissioned.
> This seems to occur only when users specify their own offsets and do not use 
> Kafka as the persistent store of offsets (they use their own database, and 
> possibly when using checkpointing).
> This could probably affect Structured Streaming.
> This presents itself as a "NoOffsetForPartitionException":
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 158933382 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>   at scala.util.Try$.apply(Try.scala:192)
>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> {code}
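
To make the requested improvement concrete: below is a minimal sketch, using 
the plain Kafka consumer API rather than Spark, of the kind of rebalance hook 
the Subscribe strategy does not currently expose. The loadOffsetsFromStore 
helper is hypothetical; a real implementation would query the user's own 
offset database.

{code:scala}
import java.util.{Collection => JCollection, Properties}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object RebalanceSeekSketch {
  // Hypothetical stand-in for a lookup against the user's own offset store.
  def loadOffsetsFromStore(partitions: Iterable[TopicPartition]): Map[TopicPartition, Long] =
    partitions.map(tp => tp -> 0L).toMap

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "example-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("production-ad-metrics"),
      new ConsumerRebalanceListener {
        override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
          // A real listener would persist the current positions here.
        }
        override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = {
          // After a rebalance, seek back to the externally stored offsets
          // instead of falling through to auto.offset.reset.
          loadOffsetsFromStore(partitions.asScala).foreach { case (tp, offset) =>
            consumer.seek(tp, offset)
          }
        }
      })
  }
}
{code}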

[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-16 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159322#comment-17159322
 ] 

Gabor Somogyi commented on SPARK-32151:
----------------------------------------

I see, in that case Structured Streaming is not affected.

[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-16 Thread Ed Mitchell (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159294#comment-17159294
 ] 

Ed Mitchell commented on SPARK-32151:
--------------------------------------

I could do that if I wanted to restart from the beginning or the end of the 
topic, but in this case I would like it to restart from the offsets recorded 
in my datastore.
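
For reference, a minimal sketch (topic, group, and offset values illustrative) 
of supplying starting offsets from an external store through 
ConsumerStrategies.Subscribe; this is honored at startup, but per this issue 
the offsets are not re-applied after a rebalance:

{code:scala}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DatastoreOffsetsSketch {
  // Hypothetical stand-in for reading the last processed offsets from a database.
  def offsetsFromDatastore(): Map[TopicPartition, Long] =
    Map(new TopicPartition("production-ad-metrics", 0) -> 42L)

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("offsets-sketch"), Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "example-group",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // The offsets map is applied when the stream starts; after a rebalance the
    // driver consumer can be left with no position and no reset policy.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](
        Seq("production-ad-metrics"), kafkaParams, offsetsFromDatastore()))

    stream.foreachRDD(rdd => rdd.foreach(record => println(record.value())))
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}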

[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling

2020-07-16 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159278#comment-17159278
 ] 

Gabor Somogyi commented on SPARK-32151:
----------------------------------------

Have you set `auto.offset.reset`?
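
For reference, a minimal sketch of setting the reset policy in the Kafka 
params map passed to the consumer strategy (values illustrative):

{code:scala}
// Fall back to the earliest (or "latest") offset when no committed offset
// exists; this avoids NoOffsetForPartitionException, but as the reply above
// notes, it discards any position recorded in an external store.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "example-group",
  "auto.offset.reset" -> "earliest"
)
{code}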
