Re: Spark 2.0 with Kafka 0.10 exception
That's a good point... the dstreams package is still on 0.10.0.1 though. I'll make a ticket to update it.

On Fri, Oct 21, 2016 at 1:02 PM, Srikanth wrote:
> Kafka 0.10.1 release separates poll() from heartbeat. So session.timeout.ms
> & max.poll.interval.ms can be set differently.
> I'll leave it to you on how to add this to docs!
Re: Spark 2.0 with Kafka 0.10 exception
The Kafka 0.10.1 release separates poll() from heartbeat, so session.timeout.ms & max.poll.interval.ms can be set differently.
I'll leave it to you on how to add this to docs!

On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger wrote:
> Right on, I put in a PR to make a note of that in the docs.
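To make the poll/heartbeat distinction concrete, here is a rough sketch of how the consumer timeouts might be chosen depending on whether the client heartbeats from a background thread (0.10.1 and later) or only inside poll() (0.10.0). The class name, the boolean flag and the 50% headroom are my own assumptions for illustration; the two default values are what I recall for the 0.10.1 client, so check them against the Kafka docs for your version.

```java
import java.util.HashMap;
import java.util.Map;

public class PollVsHeartbeat {
    // Client defaults as I recall them for Kafka 0.10.1; verify for your version.
    static final int DEFAULT_SESSION_TIMEOUT_MS = 10_000;
    static final int DEFAULT_MAX_POLL_INTERVAL_MS = 300_000;

    // backgroundHeartbeat: true for clients that heartbeat outside poll() (0.10.1+).
    static Map<String, Object> timeouts(boolean backgroundHeartbeat, int batchIntervalMs) {
        // Assumed margin: allow 50% more than one batch interval between polls.
        int withHeadroom = batchIntervalMs + batchIntervalMs / 2;
        Map<String, Object> params = new HashMap<>();
        if (backgroundHeartbeat) {
            // Failure detection stays short; the poll gap is bounded separately.
            params.put("session.timeout.ms", DEFAULT_SESSION_TIMEOUT_MS);
            params.put("max.poll.interval.ms",
                       Math.max(DEFAULT_MAX_POLL_INTERVAL_MS, withHeadroom));
        } else {
            // Heartbeats only happen inside poll(), so the session timeout
            // has to cover the whole gap between polls.
            params.put("session.timeout.ms", withHeadroom);
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(timeouts(true, 60_000));
        System.out.println(timeouts(false, 60_000));
    }
}
```

With a 60 second batch, the 0.10.1-style branch leaves both settings at the recalled defaults, while the 0.10.0-style branch has to raise session.timeout.ms above the batch interval.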
Re: Spark 2.0 with Kafka 0.10 exception
Right on, I put in a PR to make a note of that in the docs.

On Thu, Oct 20, 2016 at 12:13 PM, Srikanth wrote:
> Yeah, setting those params helped.
Re: Spark 2.0 with Kafka 0.10 exception
Yeah, setting those params helped.

On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger wrote:
> 60 seconds for a batch is above the default settings in kafka related
> to heartbeat timeouts, so that might be related. Have you tried
> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> configs?
Re: Spark 2.0 with Kafka 0.10 exception
60 seconds for a batch is above Kafka's default heartbeat-related timeouts, so that might be related. Have you tried tweaking session.timeout.ms, heartbeat.interval.ms, or related configs?

On Wed, Oct 19, 2016 at 12:22 PM, Srikanth wrote:
> Bringing this thread back as I'm seeing this exception on a production kafka
> cluster.
>
> I have two Spark streaming apps reading the same topic. App1 has batch
> interval 2secs and app2 has 60secs.
> Both apps are running on the same cluster on similar hardware. I see this
> exception only in app2 and fairly consistently.
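For anyone wondering what tweaking those configs could look like, below is a minimal sketch that derives consumer settings from the batch interval and collects them in a map of the kind passed as Kafka params. The class and method names, the 50% headroom and the 10 second request margin are assumptions of mine, not recommended values. Two constraints I am fairly sure of: the Kafka docs suggest keeping heartbeat.interval.ms at no more than a third of session.timeout.ms, and the 0.10 consumer rejects a request.timeout.ms that is not larger than session.timeout.ms. The session timeout also has to fall inside the range the broker allows (group.min.session.timeout.ms to group.max.session.timeout.ms), so a broker-side change may be needed.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerTimeouts {
    // Assumption: 50% headroom over the batch interval; tune for your workload.
    static int sessionTimeoutMs(int batchIntervalMs) {
        return batchIntervalMs + batchIntervalMs / 2;
    }

    // Keep the heartbeat interval at one third of the session timeout.
    static int heartbeatIntervalMs(int sessionTimeoutMs) {
        return sessionTimeoutMs / 3;
    }

    // request.timeout.ms must exceed session.timeout.ms; the margin is arbitrary.
    static int requestTimeoutMs(int sessionTimeoutMs) {
        return sessionTimeoutMs + 10_000;
    }

    // Collect the derived settings under their Kafka config keys.
    static Map<String, Object> kafkaParams(int batchIntervalMs) {
        int session = sessionTimeoutMs(batchIntervalMs);
        Map<String, Object> params = new HashMap<>();
        params.put("session.timeout.ms", session);
        params.put("heartbeat.interval.ms", heartbeatIntervalMs(session));
        params.put("request.timeout.ms", requestTimeoutMs(session));
        return params;
    }

    public static void main(String[] args) {
        // 60 second batch, as in the app that shows the exception.
        System.out.println(kafkaParams(60_000));
    }
}
```

The resulting entries would be merged into whatever Kafka params the streaming job already uses (bootstrap servers, deserializers, group id and so on).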
Re: Spark 2.0 with Kafka 0.10 exception
Bringing this thread back as I'm seeing this exception on a production Kafka cluster.

I have two Spark streaming apps reading the same topic. App1 has a batch interval of 2 secs and App2 has 60 secs.
Both apps are running on the same cluster on similar hardware. I see this exception only in App2, and fairly consistently.

The differences I see between the apps are:

App1
  spark.streaming.kafka.maxRatePerPartition, 6000
  batch interval 2 secs
App2
  spark.streaming.kafka.maxRatePerPartition, 1
  batch interval 60 secs

All other Kafka/Spark related configs are the same for both apps:

  spark.streaming.kafka.consumer.poll.ms = 4096
  spark.streaming.backpressure.enabled = true

Not sure if pre-fetching or caching is messing things up.

16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingEventSplitProd mt_event 6 49091480 after polling for 4096
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)

On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger wrote:
> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently? From your description, it sounds
> like retry is working correctly.
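For a sense of scale, spark.streaming.kafka.maxRatePerPartition is a per-second, per-partition cap, so the two apps above have very different upper bounds on records per partition per batch. A small worked sketch follows; the class and method names are made up for illustration, and with backpressure enabled the actual batch sizes may be lower than these caps.

```java
public class BatchSizeEstimate {
    // Upper bound on records read from one partition in one batch:
    // (records per second per partition) * (batch length in seconds).
    static long maxRecordsPerPartitionPerBatch(long maxRatePerPartition, long batchIntervalMs) {
        return maxRatePerPartition * batchIntervalMs / 1000;
    }

    public static void main(String[] args) {
        // App1: 6000 records/sec/partition, 2 second batches.
        System.out.println("App1: " + maxRecordsPerPartitionPerBatch(6000, 2_000));
        // App2: 1 record/sec/partition, 60 second batches.
        System.out.println("App2: " + maxRecordsPerPartitionPerBatch(1, 60_000));
    }
}
```

That works out to at most 12000 records per partition per batch for App1 and 60 for App2, so App2 reads very little data but holds its consumers idle for far longer between batches.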
Re: Spark 2.0 with Kafka 0.10 exception
It's a really noticeable overhead; without the cache you're basically pulling every message twice due to prefetching.

On Wed, Sep 7, 2016 at 3:23 PM, Srikanth wrote:
> Yea, disabling cache was not going to be my permanent solution either.
> I was going to ask how big an overhead is that?
>
> It happens intermittently and each time it happens retry is successful.
>
> Srikanth
>
> On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger wrote:
>>
>> That's not what I would have expected to happen with a lower cache
>> setting, but in general disabling the cache isn't something you want
>> to do with the new kafka consumer.
>>
>> As far as the original issue, are you seeing those polling errors
>> intermittently, or consistently? From your description, it sounds
>> like retry is working correctly.
>>
>> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth wrote:
>> > Setting those two results in below exception.
>> > No.of executors < no.of partitions. Could that be triggering this?
>> >
>> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> >     at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> >     at java.util.HashMap.putVal(Unknown Source)
>> >     at java.util.HashMap.put(Unknown Source)
>> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>> >     at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> >     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> >     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >     at java.lang.Thread.run(Unknown Source)
>> >
>> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger wrote:
>> >>
>> >> you could try setting
>> >>
>> >> spark.streaming.kafka.consumer.cache.initialCapacity
>> >>
>> >> spark.streaming.kafka.consumer.cache.maxCapacity
>> >>
>> >> to 1
>> >>
>> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth wrote:
>> >> > I had a look at the executor logs and noticed that this exception
>> >> > happens only when using the cached consumer.
>> >> > Every retry is successful. This is consistent.
>> >> > One possibility is that the cached consumer is causing the failure
>> >> > as retry clears it.
>> >> > Is there a way to disable cache and test this?
>> >> > Again, kafkacat is running fine on the same node.
>> >> >
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851
>> >> >
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
>> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
>> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
>> >> >     at
Re: Spark 2.0 with Kafka 0.10 exception
Yea, disabling cache was not going to be my permanent solution either.
I was going to ask how big an overhead is that?

It happens intermittently and each time it happens retry is successful.

Srikanth

On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger wrote:
> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently? From your description, it sounds
> like retry is working correctly.
Re: Spark 2.0 with Kafka 0.10 exception
That's not what I would have expected to happen with a lower cache setting, but in general disabling the cache isn't something you want to do with the new kafka consumer.

As far as the original issue, are you seeing those polling errors intermittently, or consistently? From your description, it sounds like retry is working correctly.

On Wed, Sep 7, 2016 at 2:37 PM, Srikanth wrote:
> Setting those two results in below exception.
> No.of executors < no.of partitions. Could that be triggering this?
>
> 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>     at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>     at java.util.HashMap.putVal(Unknown Source)
>     at java.util.HashMap.put(Unknown Source)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>     at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.lang.Thread.run(Unknown Source)
Re: Spark 2.0 with Kafka 0.10 exception
Setting those two results in the exception below.
The number of executors is less than the number of partitions. Could that be triggering this?

16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
    at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
    at java.util.HashMap.putVal(Unknown Source)
    at java.util.HashMap.put(Unknown Source)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
    at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger wrote:
> you could try setting
>
> spark.streaming.kafka.consumer.cache.initialCapacity
>
> spark.streaming.kafka.consumer.cache.maxCapacity
>
> to 1
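One plausible reading of the stack trace in this message is that a size-bounded LinkedHashMap evicted its eldest consumer during put(), and closed it on the inserting task's thread while another task may still have been using it. The sketch below only illustrates that eviction mechanism with plain JDK classes. FakeConsumer, newCache, and LruEvictionSketch are hypothetical names; this is not Spark's or Kafka's actual code.

```scala
import java.{util => ju}

// Hypothetical stand-in for a cached consumer; not Spark's or Kafka's class.
final class FakeConsumer(val name: String) {
  @volatile var closed = false
  def close(): Unit = { closed = true }
}

object LruEvictionSketch {
  // Access-ordered LinkedHashMap that closes and evicts its eldest entry
  // once it holds more than maxCapacity entries.
  def newCache(maxCapacity: Int): ju.LinkedHashMap[String, FakeConsumer] =
    new ju.LinkedHashMap[String, FakeConsumer](maxCapacity, 0.75f, true) {
      override def removeEldestEntry(
          eldest: ju.Map.Entry[String, FakeConsumer]): Boolean = {
        if (this.size > maxCapacity) {
          // Runs on whichever thread called put(), not on the thread
          // that may currently be using the evicted value.
          eldest.getValue.close()
          true
        } else {
          false
        }
      }
    }

  def main(args: Array[String]): Unit = {
    val cache = newCache(1)
    val first = new FakeConsumer("mt_event-0")
    val second = new FakeConsumer("mt_event-2")
    cache.put(first.name, first)
    cache.put(second.name, second) // evicts and closes `first`
    println(s"first closed: ${first.closed}, cache size: ${cache.size}")
  }
}
```

With a capacity of 1 and several partitions handled by the same executor, every insert evicts some other entry, which would be consistent with the close() call seen under removeEldestEntry in the trace.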
Re: Spark 2.0 with Kafka 0.10 exception
You could try setting

spark.streaming.kafka.consumer.cache.initialCapacity

spark.streaming.kafka.consumer.cache.maxCapacity

to 1.

On Wed, Sep 7, 2016 at 2:02 PM, Srikanth wrote:
> I had a look at the executor logs and noticed that this exception happens
> only when using the cached consumer.
> Every retry is successful. This is consistent.
> One possibility is that the cached consumer is causing the failure as retry
> clears it.
> Is there a way to disable cache and test this?
> Again, kafkacat is running fine on the same node.
>
> 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
>
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
> 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
>     at scala.Predef$.assert(Predef.scala:170)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>
> 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866
>
> 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
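The cache settings named at the top of this message could be applied roughly as follows. This is a minimal sketch, assuming the properties are set on the SparkConf before the streaming context is created; the object name and application name are hypothetical. Elsewhere in this thread, setting both to 1 is reported to trigger a ConcurrentModificationException, and disabling the cache is described as something you generally don't want, so treat this as a diagnostic experiment rather than a recommended configuration.

```scala
import org.apache.spark.SparkConf

object ConsumerCacheSketch {
  def main(args: Array[String]): Unit = {
    // Property names are the ones quoted in the thread; the value "1"
    // is the experiment suggested there, not a recommended default.
    val conf = new SparkConf()
      .setAppName("consumer-cache-sketch") // hypothetical application name
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")

    // Print the values back to confirm what the job would see.
    println(conf.get("spark.streaming.kafka.consumer.cache.initialCapacity"))
    println(conf.get("spark.streaming.kafka.consumer.cache.maxCapacity"))
  }
}
```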
Re: Spark 2.0 with Kafka 0.10 exception
I had a look at the executor logs and noticed that this exception happens only when using the cached consumer.
Every retry is successful. This is consistent.
One possibility is that the cached consumer is causing the failure as retry clears it.
Is there a way to disable cache and test this?
Again, kafkacat is running fine on the same node.

16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)

16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)

16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866

16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver

On Wed, Aug 24, 2016 at 2:13 PM, Srikanth wrote:
> Thanks Cody. Setting poll timeout helped.
> Our network is fine but brokers are not fully provisioned in test cluster.
> But there isn't enough load to max out on broker capacity.
> Curious that kafkacat running on the same node doesn't have any issues.
>
> Srikanth
Re: Spark 2.0 with Kafka 0.10 exception
Thanks Cody. Setting the poll timeout helped.
Our network is fine, but the brokers are not fully provisioned in the test cluster.
Still, there isn't enough load to max out broker capacity.
Curious that kafkacat running on the same node doesn't have any issues.

Srikanth

On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger wrote:
> You can set that poll timeout higher with
>
> spark.streaming.kafka.consumer.poll.ms
>
> but half a second is fairly generous. I'd try to take a look at
> what's going on with your network or kafka broker during that time.
Re: Spark 2.0 with Kafka 0.10 exception
You can set that poll timeout higher with

spark.streaming.kafka.consumer.poll.ms

but half a second is fairly generous. I'd try to take a look at what's going on with your network or kafka broker during that time.

On Tue, Aug 23, 2016 at 4:44 PM, Srikanth wrote:
> Hello,
>
> I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>
>> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for spark-executor-example mt_event 0 15782114
>> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
>> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
>> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example mt_event 0 15782114 after polling for 512
>>     at scala.Predef$.assert(Predef.scala:170)
>>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>
> I get this error intermittently. Sometimes a few batches are scheduled and
> run fine. Then I get this error.
> kafkacat is able to fetch from this topic continuously.
>
> Full exception is here --
> https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>
> Srikanth
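For reference, raising the poll timeout mentioned in this reply might look like the sketch below. It assumes the property is set on the SparkConf before the streaming context is built; the object name and application name are hypothetical, and 2048 is only an illustrative value (it matches a timeout that appears in logs elsewhere in this thread, against the 512 shown in the original error).

```scala
import org.apache.spark.SparkConf

object PollTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // The value is in milliseconds and is passed as a string.
    val conf = new SparkConf()
      .setAppName("poll-timeout-sketch") // hypothetical application name
      .set("spark.streaming.kafka.consumer.poll.ms", "2048")

    // Print the value back to confirm what the executors would be given.
    println(conf.get("spark.streaming.kafka.consumer.poll.ms"))
  }
}
```

The same property can also be passed at submit time with `--conf spark.streaming.kafka.consumer.poll.ms=2048`. A larger timeout only hides slow fetches, so the advice above to look at the network or broker still applies.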