GitHub user jerryshao opened a pull request: https://github.com/apache/spark/pull/21038
[SPARK-22968][DStream] Fix Kafka connector partition revoked issue ## What changes were proposed in this pull request? Kafka partitions can be revoked when new consumers joined in the consumer group to rebalance the partitions. But current Spark Kafka connector code doesn't consider partition revoking scenarios, trying to get latest offset from revoked partitions, which will throw exceptions as JIRA mentioned. To reproduce this issue, user could start two `DirectKafkaWordCount` example subsequently. When the later one is started, it will trigger rebalance and reallocate the partitions across all the consumers, then the former one will throw an exception and stop the app. To fix it, Spark Kafka connector should consider partition revoking scenarios and don't maintain offsets for revoked partitions. Besides, this PR also fixes bugs in `DirectKafkaWordCount`, this example simply cannot be worked without the fix. ``` 8/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, kssh-1] for group use_a_separate_group_id_for_each_stream 18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group use_a_separate_group_id_for_each_stream 18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group use_a_separate_group_id_for_each_stream with generation 4 18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group use_a_separate_group_id_for_each_stream ``` ## How was this patch tested? This is manually verified in local cluster, unfortunately I'm not sure how to simulate it in UT, so propose the PR without UT added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerryshao/apache-spark SPARK-22968 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21038.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21038 ---- commit f317dec0d863a717dc424707571453b11c43e700 Author: jerryshao <sshao@...> Date: 2018-04-11T06:59:15Z Fix Kafka connector partition revoked issue ---- --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org