Re: Spark + Kafka all messages being used in 1 batch
Try setting spark.streaming.kafka.maxRatePerPartition; this controls the maximum number of messages read from Kafka per partition per second by the Spark Streaming consumer.

-S

> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari wrote:
>
> Hello,
>
> I am trying to figure out why my Kafka + Spark job is running slow. I found
> that Spark is consuming all the messages out of Kafka into a single batch
> and not sending any messages to the other batches.
>
> 2016/03/05 21:57:05  0 events        -     -  queued
> 2016/03/05 21:57:00  0 events        -     -  queued
> 2016/03/05 21:56:55  0 events        -     -  queued
> 2016/03/05 21:56:50  0 events        -     -  queued
> 2016/03/05 21:56:45  0 events        -     -  queued
> 2016/03/05 21:56:40  4039573 events  6 ms  -  processing
>
> Does anyone know how this behavior can be changed so that the messages are
> load-balanced across all the batches?
>
> Thanks,
> Vinti
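For context, the setting acts as a per-partition, per-second rate limit for the direct stream, so the cap on records per batch is roughly maxRatePerPartition × number of partitions × batch interval. A small sketch with hypothetical numbers (the partition count and 5-second batch interval below are illustrative, not taken from the thread):

```python
def max_records_per_batch(max_rate_per_partition: int,
                          num_partitions: int,
                          batch_interval_secs: int) -> int:
    """Upper bound on records the Kafka direct stream pulls into one batch."""
    return max_rate_per_partition * num_partitions * batch_interval_secs

# e.g. spark.streaming.kafka.maxRatePerPartition=10000 with 4 partitions
# and a 5-second batch interval caps each batch at 200,000 records.
print(max_records_per_batch(10000, 4, 5))
```

The setting itself would go into the Spark configuration, e.g. `conf.set("spark.streaming.kafka.maxRatePerPartition", "10000")`, where 10000 is a value you would tune to your processing throughput.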
Re: Running kafka consumer in local mode - error - connection timed out
Hmm, thanks for the response. The current value I have for socket.timeout.ms is 12. I am not sure if this needs a higher value; there is not much in the logs. The retry aspect makes sense, I can work around that.

-S

On Mon, Jan 25, 2016 at 11:51 AM, Cody Koeninger wrote:
> Should be socket.timeout.ms on the map of Kafka config parameters. The
> lack of retry is probably due to the differences between running Spark in
> local mode vs standalone / mesos / yarn.
>
> On Mon, Jan 25, 2016 at 1:19 PM, Supreeth wrote:
>> We are running a Kafka consumer in local mode using Spark Streaming,
>> KafkaUtils.createDirectStream.
>>
>> The job runs as expected; however, once in a very long while, I see the
>> following exception. Wanted to check if others have faced a similar
>> issue, and what are the right timeout parameters to change to avoid it.
>>
>> Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
>> most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
>> java.net.ConnectException: Connection timed out
>>   at sun.nio.ch.Net.connect0(Native Method)
>>   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
>>   [full trace elided; see the original post below]
>>
>> When this failure happens, the executor is not retried either.
>> Any help appreciated.
>>
>> -S
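Cody's suggestion refers to the Kafka parameter map handed to createDirectStream. A minimal sketch of such a map (the broker address and the 60-second value are hypothetical; socket.timeout.ms is in milliseconds, and the Spark 1.x Kafka direct-stream API expects string values):

```python
# Kafka parameters as they would be passed to KafkaUtils.createDirectStream
# (spark-streaming-kafka, Spark 1.x API). All values are strings.
kafka_params = {
    "metadata.broker.list": "broker1:9092",  # hypothetical broker address
    "socket.timeout.ms": "60000",            # raise the consumer socket timeout
}

# In a real job this dict is passed as:
#   KafkaUtils.createDirectStream(ssc, ["my-topic"], kafka_params)
print(kafka_params["socket.timeout.ms"])
```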
Running kafka consumer in local mode - error - connection timed out
We are running a Kafka consumer in local mode using Spark Streaming, KafkaUtils.createDirectStream.

The job runs as expected; however, once in a very long while, I see the following exception. Wanted to check if others have faced a similar issue, and what are the right timeout parameters to change to avoid it.

Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times, most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
java.net.ConnectException: Connection timed out
  at sun.nio.ch.Net.connect0(Native Method)
  at sun.nio.ch.Net.connect(Net.java:457)
  at sun.nio.ch.Net.connect(Net.java:449)
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
  at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
  at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

When this failure happens, the executor is not retried either. Any help appreciated.

-S

ps: My last attempt to post did not succeed; apologies if this happens to be a re-post of my earlier post from about 40 mins ago.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-kafka-consumer-in-local-mode-error-connection-timed-out-tp26063.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
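On the missing retries: plain local mode allows only a single attempt per task, which is consistent with the job aborting on the first failure. The master URL's `local[threads, maxFailures]` form raises that limit; the thread and attempt counts below are illustrative:

```python
# Illustrative: local mode with retries enabled via the master URL.
# local[N] allows one task attempt; local[N, F] allows up to F attempts.
master = "local[4,3]"  # 4 worker threads, up to 3 attempts per task

# In a real job (assuming pyspark is available) this would be used as:
#   from pyspark import SparkConf, SparkContext
#   sc = SparkContext(conf=SparkConf()
#                          .setMaster(master)
#                          .setAppName("kafka-consumer"))
print(master)
```

In cluster modes the equivalent knob is spark.task.maxFailures (default 4); it is the local-mode default of a single attempt that makes this failure fatal here.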