Re: Spark + Kafka all messages being used in 1 batch

2016-03-05 Thread Supreeth
Try setting spark.streaming.kafka.maxRatePerPartition. It caps the number of
messages per second that the Spark Streaming direct consumer reads from each
Kafka partition, so a single batch cannot absorb the entire backlog.
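
For example, a minimal sketch (assuming the Spark 1.x direct stream API; the
broker address, topic name, batch interval, and rate value are placeholders,
not recommendations):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("RateLimitedKafkaStream")
      // With a 5 s batch interval, each partition contributes at most
      // 10000 msgs/s * 5 s = 50000 messages per batch.
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")

    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))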

-S


> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari  wrote:
> 
> Hello,
> 
> I am trying to figure out why my Kafka + Spark job is running slowly. I found
> that Spark is pulling all the messages from Kafka into a single batch and
> leaving the subsequent batches empty.
> 
> 2016/03/05 21:57:05        0 events  -     -  queued
> 2016/03/05 21:57:00        0 events  -     -  queued
> 2016/03/05 21:56:55        0 events  -     -  queued
> 2016/03/05 21:56:50        0 events  -     -  queued
> 2016/03/05 21:56:45        0 events  -     -  queued
> 2016/03/05 21:56:40  4039573 events  6 ms  -  processing
> 
> Does anyone know how this behavior can be changed so that the messages are
> load-balanced across all the batches?
> 
> Thanks,
> Vinti


Re: Running kafka consumer in local mode - error - connection timed out

2016-01-25 Thread Supreeth
Hmm, thanks for the response.

The current value I have for socket.timeout.ms is 12. I am not sure whether it
needs a higher value; the logs don't offer much to go on.
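
For reference, the setting rides along in the kafkaParams map passed to
createDirectStream. A sketch (Spark 1.x / spark-streaming-kafka 0.8 era; the
broker address, topic, and timeout value shown are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(
      new SparkConf().setAppName("LocalKafkaConsumer").setMaster("local[2]"),
      Seconds(5))

    // socket.timeout.ms is the socket timeout used by the underlying
    // SimpleConsumer; the value below is a placeholder.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092",
      "socket.timeout.ms"    -> "60000")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))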

The retry aspect makes sense; I can work around it.
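
For anyone who lands here later: the local master URL also accepts a
maxFailures parameter, local[K, F], which is one way to get retries in local
mode. A sketch (the thread and failure counts are arbitrary):

    import org.apache.spark.SparkConf

    // "local[4, 3]" runs locally with 4 worker threads and gives each task
    // up to 3 attempts before the job is aborted; plain "local[N]" allows
    // only a single attempt, which is why one lost task fails the stage.
    val conf = new SparkConf()
      .setMaster("local[4, 3]")
      .setAppName("KafkaConsumerLocal")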

-S

On Mon, Jan 25, 2016 at 11:51 AM, Cody Koeninger  wrote:

> It should be socket.timeout.ms in the map of Kafka config parameters. The
> lack of retry is probably due to the differences between running Spark in
> local mode vs. standalone / Mesos / YARN.
>
>
>
> On Mon, Jan 25, 2016 at 1:19 PM, Supreeth  wrote:
>
>> We are running a Kafka Consumer in local mode using Spark Streaming,
>> KafkaUtils.createDirectStream.
>>
>> The job runs as expected; however, once in a very long while, I see the
>> following exception.
>>
>> I wanted to check whether others have faced a similar issue, and which
>> timeout parameters to change to avoid it.
>>
>> Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
>> most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
>> java.net.ConnectException: Connection timed out
>>   at sun.nio.ch.Net.connect0(Native Method)
>>   at sun.nio.ch.Net.connect(Net.java:457)
>>   at sun.nio.ch.Net.connect(Net.java:449)
>>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
>>   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>   at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>>   at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
>>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
>>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>>
>> When this failure happens, the executor is not retried either.
>> Any help appreciated.
>>
>> -S
>>
>> P.S. My last attempt to post did not succeed; apologies if this happens to
>> be a re-post of my earlier message from about 40 minutes ago.
>>


Running kafka consumer in local mode - error - connection timed out

2016-01-25 Thread Supreeth
We are running a Kafka Consumer in local mode using Spark Streaming,
KafkaUtils.createDirectStream. 

The job runs as expected; however, once in a very long while, I see the
following exception.

I wanted to check whether others have faced a similar issue, and which
timeout parameters to change to avoid it.

Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
java.net.ConnectException: Connection timed out
  at sun.nio.ch.Net.connect0(Native Method)
  at sun.nio.ch.Net.connect(Net.java:457)
  at sun.nio.ch.Net.connect(Net.java:449)
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
  at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
  at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
  at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


When this failure happens, the executor is not retried either.
Any help appreciated.

-S

P.S. My last attempt to post did not succeed; apologies if this happens to be
a re-post of my earlier message from about 40 minutes ago.





