Hi,

Firstly, I want to say a big thanks to Cody for contributing the Kafka direct
stream.

I have been using the receiver-based approach for months, but the direct
stream is a much better solution for my use case.

The job in question has now been ported over to the direct stream; it does
idempotent writes to Cassandra and also outputs to Kafka.
I am saving the offsets to Cassandra as well.
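
For context, the job is structured roughly like this (a simplified sketch,
not the real code; the broker list, topic name, batch interval, and the
saveOffsetsToCassandra helper are all placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object DirectStreamJob {
  // Placeholder for the real helper, which upserts (topic, partition,
  // untilOffset) into a Cassandra table via the spark-cassandra-connector.
  def saveOffsetsToCassandra(ranges: Array[OffsetRange]): Unit = ()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-stream-job")
    val ssc  = new StreamingContext(conf, Seconds(30))

    // Direct stream: executors fetch from the brokers themselves, no receiver.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.foreachRDD { rdd =>
      // Capture the offset ranges before any transformation loses the KafkaRDD type.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... idempotent writes to Cassandra and output back to Kafka go here ...

      // Persisting each partition's untilOffset only after the writes succeed
      // is what makes a restart from stored offsets idempotent.
      saveOffsetsToCassandra(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}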

Unfortunately, I am sporadically getting the error below.
The job recovers and continues, but the exception causes a large spike in the
processing delay, and it can happen as often as every 3 or 4 batches.
I still have other receiver-based jobs running, and they never throw these
exceptions.

I would be very appreciative of any direction, and I can happily provide
more detail.

Thanks,
Conor

15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 407
