You could try setting

spark.streaming.kafka.consumer.cache.initialCapacity

spark.streaming.kafka.consumer.cache.maxCapacity

to 1.
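
Something like this, for example (an untested sketch; the app name, batch
interval, and SparkConf-based setup are placeholders, not taken from your job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With maxCapacity at 1, the executor-side consumer cache should hold at most
// one consumer, so cached consumers get evicted and recreated whenever an
// executor handles more than one topic-partition, which comes close to
// disabling the cache.
val sparkConf = new SparkConf()
  .setAppName("StreamingPixelCountTest")  // placeholder name
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")
val ssc = new StreamingContext(sparkConf, Seconds(30))  // placeholder interval

That should let you see whether the problem goes away once the cache is
mostly out of the picture.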

On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> I had a look at the executor logs and noticed that this exception happens
> only when using the cached consumer.
> Every retry is successful. This is consistent.
> One possibility is that the cached consumer is causing the failure, since the
> retry clears it and then succeeds.
> Is there a way to disable the cache and test this?
> Again, kafkacat is running fine on the same node.
>
> 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
>
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> offsets 57079162 -> 57090330
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> offsets 57098866 -> 57109957
> 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
> 7851). 1030 bytes result sent to driver
> 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID
> 7849)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for
> 2048
>       at scala.Predef$.assert(Predef.scala:170)
>       at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>       at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>       at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>
> 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
> offsets 57098866 -> 57109957
> 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-StreamingPixelCount1 mt_event 0 57098866
>
> 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
> 7854). 1103 bytes result sent to driver
>
>
>
> On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
>>
>> Thanks Cody. Setting the poll timeout helped.
>> Our network is fine, but the brokers are not fully provisioned in the test
>> cluster. There isn't enough load to max out broker capacity, though.
>> It is curious that kafkacat running on the same node doesn't have any issues.
>>
>> Srikanth
>>
>> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> You can set that poll timeout higher with
>>>
>>> spark.streaming.kafka.consumer.poll.ms
>>>
>>> but half a second is fairly generous.  I'd take a look at what's going on
>>> with your network or Kafka broker during that time.
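>>>
>>> For example (an untested sketch; the 10-second value is just an
>>> illustration, and it assumes the setting goes onto the SparkConf used by
>>> the streaming job):
>>>
>>> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000")
>>>
>>> or the equivalent --conf spark.streaming.kafka.consumer.poll.ms=10000 on
>>> spark-submit.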
>>>
>>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote:
>>> > Hello,
>>> >
>>> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>>> >
>>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId :
>>> >> b8642491e78c5a13
>>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>>> >> spark-executor-example mt_event 0 15782114
>>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
>>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>>> >> spark-executor-example.
>>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0
>>> >> (TID
>>> >> 6)
>>> >> java.lang.AssertionError: assertion failed: Failed to get records for
>>> >> spark-executor-example mt_event 0 15782114 after polling for 512
>>> >> at scala.Predef$.assert(Predef.scala:170)
>>> >> at
>>> >>
>>> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>>> >> at
>>> >>
>>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> >> at
>>> >>
>>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> >
>>> >
>>> > I get this error intermittently. Sometimes a few batches are scheduled
>>> > and run fine, and then the error shows up again.
>>> > kafkacat is able to fetch from this topic continuously.
>>> >
>>> > Full exception is here --
>>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>>> >
>>> > Srikanth
>>
>>
>
