You could try setting spark.streaming.kafka.consumer.cache.initialCapacity
and spark.streaming.kafka.consumer.cache.maxCapacity to 1.
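For example, something along these lines (an untested sketch; the app name
is just a placeholder, and these properties go on the SparkConf you build
the streaming context from):

    import org.apache.spark.SparkConf

    // Keep the executor-side Kafka consumer cache at a single entry,
    // so a problematic cached consumer doesn't keep getting reused.
    val conf = new SparkConf()
      .setAppName("StreamingPixelCount1") // placeholder
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")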
On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> I had a look at the executor logs and noticed that this exception happens
> only when using the cached consumer.
> Every retry is successful. This is consistent.
> One possibility is that the cached consumer is causing the failure, since
> the retry clears it.
> Is there a way to disable the cache and test this?
> Again, kafkacat is running fine on the same node.
>
> 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
>
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> offsets 57079162 -> 57090330
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> offsets 57098866 -> 57109957
> 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
> 7851). 1030 bytes result sent to driver
> 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID
> 7849)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for
> 2048
>     at scala.Predef$.assert(Predef.scala:170)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>
> 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
> offsets 57098866 -> 57109957
> 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-StreamingPixelCount1 mt_event 0 57098866
>
> 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
> 7854). 1103 bytes result sent to driver
>
>
> On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
>>
>> Thanks Cody. Setting the poll timeout helped.
>> Our network is fine, but the brokers are not fully provisioned in the
>> test cluster. Even so, there isn't enough load to max out broker capacity.
>> Curious that kafkacat running on the same node doesn't have any issues.
>>
>> Srikanth
>>
>> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> You can set that poll timeout higher with
>>>
>>> spark.streaming.kafka.consumer.poll.ms
>>>
>>> but half a second is fairly generous. I'd try to take a look at
>>> what's going on with your network or Kafka broker during that time.
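For reference, that timeout is also just a SparkConf property. An untested
sketch; the 10 second value is arbitrary, versus the 512ms seen in the log
below:

    import org.apache.spark.SparkConf

    // Allow up to 10s per poll before the fetch is treated as failed.
    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")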
>>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
>>> > Hello,
>>> >
>>> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>>> >
>>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>>> >> spark-executor-example mt_event 0 15782114
>>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
>>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>>> >> spark-executor-example.
>>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0
>>> >> (TID 6)
>>> >> java.lang.AssertionError: assertion failed: Failed to get records for
>>> >> spark-executor-example mt_event 0 15782114 after polling for 512
>>> >>     at scala.Predef$.assert(Predef.scala:170)
>>> >>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>>> >>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> >>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> >>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> >
>>> > I get this error intermittently. Sometimes a few batches are scheduled
>>> > and run fine. Then I get this error.
>>> > kafkacat is able to fetch from this topic continuously.
>>> >
>>> > The full exception is here:
>>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>>> >
>>> > Srikanth