Re: Spark Kafka API tries connecting to dead node for every batch, which increases the processing time

2017-10-16 Thread Cody Koeninger
Have you tried the 0.10 integration?

I'm not sure how you would know whether a broker is up or down without
attempting to connect to it.  Do you have an alternative suggestion?
Not sure how much interest there is in patches to the 0.8 integration
at this point.
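[Editor's note: for reference, a minimal sketch of the consumer configuration the 0.10 integration expects. The group id and the serializer class names below are illustrative placeholders, and the createDirectStream call is shown only as a comment because it needs the spark-streaming-kafka-0-10 artifact on the classpath.]

```scala
// Consumer configuration for the 0.10 direct stream. The new consumer
// discovers the live broker set from any reachable entry in
// bootstrap.servers rather than probing every listed host per batch.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "189.132.190.106:21005,189.132.190.145:21005",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> "test-consumer",  // placeholder group id
  "auto.offset.reset"  -> "latest"
)

// With spark-streaming-kafka-0-10 on the classpath this map is passed as:
// KafkaUtils.createDirectStream[String, String](
//   ssc, PreferConsistent, Subscribe[String, String](Seq("test"), kafkaParams))
println(kafkaParams("bootstrap.servers"))
```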



On Mon, Oct 16, 2017 at 9:23 AM, Suprith T Jain  wrote:
> Yes, I tried that, but it's not very effective.
>
> In fact, Kafka's SimpleConsumer retries the connection on a socket error
> (in its sendRequest method), so it will always be twice the timeout for every
> window and for every node that is down.
>
>
> On 16-Oct-2017 7:34 PM, "Cody Koeninger"  wrote:
>>
>> Have you tried adjusting the timeout?
>>
>> On Mon, Oct 16, 2017 at 8:08 AM, Suprith T Jain 
>> wrote:
>> > Hi guys,
>> >
>> > I have a 3-node cluster and I am running a Spark Streaming job. Consider
>> > the example below:
>> >
>> > spark-submit --master yarn-cluster --class
>> > com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint --jars
>> >
>> > /opt/client/Spark/spark/lib/streamingClient/kafka-clients-0.8.2.1.jar,/opt/client/Spark/spark/lib/streamingClient/kafka_2.10-0.8.2.1.jar,/opt/client/Spark/spark/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar
>> > /opt/SparkStreamingExample-1.0.jar /tmp/test 10 test
>> > 189.132.190.106:21005,189.132.190.145:21005,10.1.1.1:21005
>> >
>> > In this case, suppose node 10.1.1.1 is down. Then for every window
>> > batch, Spark still tries to send a request to all the nodes.
>> > This code is in the class org.apache.spark.streaming.kafka.KafkaCluster
>> >
>> > Function : getPartitionMetadata()
>> > Line : val resp: TopicMetadataResponse = consumer.send(req)
>> >
>> > The function getPartitionMetadata() is called from getPartitions() and
>> > findLeaders(), which are invoked for every batch.
>> >
>> > Hence, if a node is down, the connection fails and Spark waits for the
>> > timeout to elapse before continuing, which adds to the processing time.
>> >
>> > Question:
>> > Is there any way to avoid this?
>> > In simple words, I do not want Spark to send a request to a node that is
>> > down on every batch. How can I achieve this?
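[Editor's note: one workaround for anyone staying on the 0.8 integration is to probe each listed broker with a short-lived TCP connection and drop unreachable hosts before building the broker list. This is a sketch using only the JDK; `isBrokerReachable` and `liveBrokers` are hypothetical helpers, not Spark or Kafka APIs.]

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper (not a Spark or Kafka API): returns true if a TCP
// connection to host:port succeeds within timeoutMs milliseconds.
def isBrokerReachable(host: String, port: Int, timeoutMs: Int = 500): Boolean =
  Try {
    val socket = new Socket()
    try socket.connect(new InetSocketAddress(host, port), timeoutMs)
    finally socket.close()
  }.isSuccess

// Drop unreachable hosts from a "host1:port1,host2:port2" broker list
// before it is handed to KafkaUtils as metadata.broker.list.
def liveBrokers(brokerList: String): String =
  brokerList.split(",").filter { entry =>
    val Array(host, port) = entry.trim.split(":")
    isBrokerReachable(host, port.toInt)
  }.mkString(",")
```

Note this only prunes the seed list once, before the stream is created; the per-batch leader lookup in KafkaCluster still iterates over whatever list it was given, so it does not help if a broker dies after the stream starts.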

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


