[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359786#comment-16359786 ]

Apache Spark commented on SPARK-17147:
--------------------------------------

User 'koeninger' has created a pull request for this issue:
https://github.com/apache/spark/pull/20572

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17147
>                 URL: https://issues.apache.org/jira/browse/SPARK-17147
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Robert Conrad
>            Priority: Major
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the
> next requested offset will frequently not be offset+1. The logic in
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset
> will always be exactly 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the
> streaming kafka consumer, and I'm happy to close this out if that's the case.
> If, however, it is supposed to support non-consecutive offsets (e.g. due to
> log compaction) I am also happy to contribute a PR.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
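The workaround in the quoted description amounts to advancing from the offset of the record that was actually returned, rather than from the offset that was requested. Below is a minimal sketch of that idea against a simulated in-memory log. It is not the code from the pull request: `Record`, `fetch`, and `readRange` are hypothetical names introduced here, and `fetch` only imitates the behavior seen in this thread, where asking for a removed offset returns the next surviving record.

```scala
// Hypothetical stand-in for a Kafka record; NOT the real ConsumerRecord class.
final case class Record(offset: Long, value: String)

object GapTolerantRead {
  // Simulated fetch (assumption): returns the first record at or after
  // `requested`. The log is assumed to be sorted by offset.
  def fetch(log: Seq[Record], requested: Long): Option[Record] =
    log.find(_.offset >= requested)

  // Reads every surviving record in [from, until), tolerating gaps.
  def readRange(log: Seq[Record], from: Long, until: Long): List[Record] = {
    val out = List.newBuilder[Record]
    var requestOffset = from
    var done = false
    while (!done && requestOffset < until) {
      fetch(log, requestOffset) match {
        case Some(r) if r.offset < until =>
          // Relaxed check: a gap (r.offset > requestOffset) is acceptable,
          // but a record before the requested offset would be a real error.
          assert(r.offset >= requestOffset,
            s"Got record ${r.offset} before requested offset $requestOffset")
          out += r
          // Advance from the returned record, not from the requested offset.
          requestOffset = r.offset + 1
        case _ =>
          // Nothing left inside the requested range.
          done = true
      }
    }
    out.result()
  }
}
```

With a log whose surviving offsets are 0, 1, 4, 5, and 9, calling `GapTolerantRead.readRange(log, 0L, 6L)` yields the records at offsets 0, 1, 4, and 5. A loop that insisted on seeing exactly `requestOffset` would instead fail at offset 2.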
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338032#comment-16338032 ]

Justin Miller commented on SPARK-17147:
---------------------------------------

I'm also seeing this behavior on a topic that has cleanup.policy=delete. The volume on this topic is very large, > 10 billion messages per day, and it seems to happen about once per day. On another topic with lower volume but larger messages, it happens every few days.

18/01/23 18:30:10 WARN TaskSetManager: Lost task 28.0 in stage 26.0 (TID 861, ,executor 15): java.lang.AssertionError: assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662
18/01/23 18:30:12 INFO TaskSetManager: Lost task 28.1 in stage 26.0 (TID 865) on ,executor 24: java.lang.AssertionError (assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662) [duplicate 1]
18/01/23 18:30:14 INFO TaskSetManager: Lost task 28.2 in stage 26.0 (TID 866) on ,executor 15: java.lang.AssertionError (assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662) [duplicate 2]
18/01/23 18:30:15 INFO TaskSetManager: Lost task 28.3 in stage 26.0 (TID 867) on ,executor 15: java.lang.AssertionError (assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662) [duplicate 3]
18/01/23 18:30:18 WARN TaskSetManager: Lost task 28.0 in stage 27.0 (TID 898, ,executor 6): java.lang.AssertionError: assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662
18/01/23 18:30:19 INFO TaskSetManager: Lost task 28.1 in stage 27.0 (TID 900) on ,executor 15: java.lang.AssertionError (assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662) [duplicate 1]
18/01/23 18:30:20 INFO TaskSetManager: Lost task 28.2 in stage 27.0 (TID 901) on ,executor 15: java.lang.AssertionError (assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662) [duplicate 2]
18/01/23 18:30:21 INFO TaskSetManager: Lost task 28.3 in stage 27.0 (TID 902) on ,executor 15: java.lang.AssertionError (assertion failed: Got wrong record for -8002 124 even after seeking to offset 1769485661 got back record.offset 1769485662) [duplicate 3]

When checked with kafka-simple-consumer-shell, the offset is in fact missing:

next offset = 1769485661
next offset = 1769485663
next offset = 1769485664
next offset = 1769485665

I'm currently testing out this branch in the persister and will post if it crashes again over the next few days (I currently have the kafka-10 source from the branch, with a few extra log lines, deployed). We're currently on log format 0.10.2 (upgraded yesterday) but saw the same issue on 0.9.0.0.

chao.wu - Is this behavior similar to what you're seeing?
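The check done by eye on the kafka-simple-consumer-shell output above can also be done mechanically. The following is a small sketch, not part of Spark or Kafka; `OffsetGaps` and `findGaps` are hypothetical names. It reports every adjacent pair in an offset listing where the second value is not exactly one above the first.

```scala
object OffsetGaps {
  // Returns (previous, next) pairs wherever `next` is not `previous + 1`.
  // The input is assumed to be in ascending order, as a consumer would see it.
  def findGaps(offsets: Seq[Long]): List[(Long, Long)] =
    offsets.zip(offsets.drop(1)).collect {
      case (prev, next) if next != prev + 1 => (prev, next)
    }.toList
}
```

Applied to the four "next offset" values listed in the comment, `OffsetGaps.findGaps(Seq(1769485661L, 1769485663L, 1769485664L, 1769485665L))` returns `List((1769485661L, 1769485663L))`, which is the single jump visible in that output.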
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337027#comment-16337027 ]

chao.wu commented on SPARK-17147:
---------------------------------

Even when the topic's cleanup policy is delete ("cleanup.policy=delete"), the offsets can be non-consecutive, and this causes the Spark Streaming job to fail.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157002#comment-16157002 ]

Cody Koeninger commented on SPARK-17147:
----------------------------------------

Patch is there, if anyone wants to test it and provide feedback I'm happy to help. We don't use compacted topics, so I can't really test it against production workloads.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156937#comment-16156937 ]

Michal W commented on SPARK-17147:
----------------------------------

Hey, just wanted to check if there's any progress on this issue?
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887342#comment-15887342 ]

Dean Wampler commented on SPARK-17147:
--------------------------------------

Cody, thanks for the suggestion. We'll try to test it and also suggest that a customer who wants this functionality do the same.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887091#comment-15887091 ]

Cody Koeninger commented on SPARK-17147:
----------------------------------------

Dean, if you guys have any bandwidth to help test out https://github.com/koeninger/spark-1/tree/SPARK-17147 I'm happy to iterate on whatever you find.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882778#comment-15882778 ]

Dean Wampler commented on SPARK-17147:
--------------------------------------

We're interested in this enhancement. Does anyone know if and when it will be implemented in Spark?
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15744380#comment-15744380 ]

caolan commented on SPARK-17147:
--------------------------------

Sure, I will give your branch a try and let you know the result.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15743024#comment-15743024 ]

Sean McKibben commented on SPARK-17147:
---------------------------------------

I apologize for the extended radio silence on this. I've been trying to track down problems I'm seeing after dynamically scaling down Spark Streaming jobs in Mesos. Haven't been able to attribute them to Spark Streaming Kafka (or a version thereof) yet but it's been difficult to pin down. I am hoping to return to testing the compacted consumer modifications soon.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742285#comment-15742285 ]

Cody Koeninger commented on SPARK-17147:
----------------------------------------

If compacted topics are important to you, then you should help test the branch listed above.

Yes, you can try increasing poll.ms.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15740906#comment-15740906 ]

caolan commented on SPARK-17147:
--------------------------------

I am using Spark 2.0.0 + Kafka 0.10 + compacted topics, even in some production environments. This fix is really important, so the question is how it gets decided whether it is important or not. Compacted Kafka topics should be widely used by now, and Spark 2.0 should support them well.

As for the other issue, it does not happen all the time and has no regular pattern: sometimes several times in one day, sometimes not at all for several days. So I should increase spark.streaming.kafka.consumer.poll.ms, right?
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15729055#comment-15729055 ]

Cody Koeninger commented on SPARK-17147:
----------------------------------------

This ticket is about createDirectStream. The question of whether it will be fixed is largely down to whether it's important enough to Sean or someone else to help test it thoroughly.

The stack trace you posted more than likely has nothing to do with this ticket, especially if you aren't using log compaction. It's probably a network issue. Adjust spark.streaming.kafka.consumer.poll.ms, or do more investigation into what's going on with your network / Kafka.
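For the poll timeout case discussed above, the setting named in the comment can be supplied through the Spark configuration. The fragment below is only a sketch: the 10000 ms value and the application name are illustrative assumptions, not recommendations made anywhere in this thread, and a suitable value depends on your own network and broker latency.

```scala
import org.apache.spark.SparkConf

object PollTimeoutConf {
  // Builds a SparkConf with a longer Kafka consumer poll timeout.
  // Both the app name and the timeout value are hypothetical examples.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("compacted-topic-reader")
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")
}
```

The same key can be passed on the command line with `--conf spark.streaming.kafka.consumer.poll.ms=10000`. Raising it only addresses slow polls; it does not change how gaps in offsets are handled.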
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15728326#comment-15728326 ]

caolan commented on SPARK-17147:
--------------------------------

Will createDirectStream also be fixed? I hit the same issue, but I use KafkaUtils.createDirectStream() for compact mode Kafka topics.

BTW, I hit another issue recently. This is not a delete mode Kafka topic, but it produced a similar error log:

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 501.0 failed 4 times, most recent failure: Lost task 0.3 in stage 501.0 (TID 505, siwu24yarn21x.com): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-xContext..dXa3AOoH 1029.job.dXa3AOoH.data 0 128825069 after polling for 512
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586422#comment-15586422 ]

Justin Miller commented on SPARK-17147:
---------------------------------------

K, I'll try to assemble everything I've seen so far around it and post it to the list. Thanks!
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586417#comment-15586417 ] Cody Koeninger commented on SPARK-17147: If that's something you're seeing regularly, probably worth bringing it up on the mailing list, with a full stacktrace and whatever background you have.
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586403#comment-15586403 ] Justin Miller commented on SPARK-17147: OK thank you. Might be related to the thousands of partitions we have spread across hundreds of topics. "Oops".
[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)
[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586397#comment-15586397 ] Cody Koeninger commented on SPARK-17147: Then no, this issue is unlikely to affect you unless there's something wrong with your topic.