Re: Backpressure initial rate not working

2018-07-26 Thread Biplob Biswas
Hi Todd,

Thanks a lot, that works. Although I am curious whether you know why the
initialRate setting is not kicking in?

But for now the pipeline is usable again. Thanks a lot.

Thanks & Regards
Biplob Biswas


On Thu, Jul 26, 2018 at 3:03 PM Todd Nist  wrote:

> Have you tried reducing the maxRatePerPartition to a lower value?  Based
> on your settings, I believe you are going to be able to pull *600K* worth
> of messages from Kafka, basically:
>
>   • maxRatePerPartition=15000
>
> • batchInterval 10s
>
> • 4 partitions on Ingest topic
>
>
> This results in a maximum ingest rate of 600K:
>
>
> • 4 * 10 * 15000 = 600,000 max
>
> Can you reduce the maxRatePerPartition to, say, 1500 for a test run?  That
> should result in a more manageable batch, and you can adjust from there.
>
>
> • 4 * 10 * 1500 = 60,000 max
>
> I know we are not setting the maxRate or initialRate, only the
> maxRatePerPartition and backpressure.enabled.  I thought that maxRate was
> not applicable when using back pressure, but I may be mistaken.
>
>
> -Todd
>
>
>
>
>
>
> On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas 
> wrote:
>
>> Hi Todd,
>>
>> Thanks for the reply. I have maxRatePerPartition set as well. Below is
>> the spark-submit config we used, and we still hit the issue. Also, the *batch
>> interval is set at 10s* and the *number of partitions on the topic is set to
>> 4*:
>>
>> spark2-submit --name "${YARN_NAME}" \
>>--master yarn \
>>--deploy-mode ${DEPLOY_MODE} \
>>--num-executors ${NUM_EXECUTORS} \
>>--driver-cores ${NUM_DRIVER_CORES} \
>>--executor-cores ${NUM_EXECUTOR_CORES} \
>>--driver-memory ${DRIVER_MEMORY} \
>>--executor-memory ${EXECUTOR_MEMORY} \
>>--queue ${YARN_QUEUE} \
>>--keytab ${KEYTAB}-yarn \
>>--principal ${PRINCIPAL} \
>>--conf "spark.yarn.preserve.staging.files=true" \
>>--conf "spark.yarn.submit.waitAppCompletion=false" \
>>--conf "spark.shuffle.service.enabled=true" \
>>--conf "spark.dynamicAllocation.enabled=true" \
>>--conf "spark.dynamicAllocation.minExecutors=1" \
>>--conf "spark.streaming.backpressure.enabled=true" \
>>--conf "spark.streaming.receiver.maxRate=15000" \
>>--conf "spark.streaming.kafka.maxRatePerPartition=15000" \
>>--conf "spark.streaming.backpressure.initialRate=2000" \
>>--conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>--driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>--files "${JAAS_CONF},${KEYTAB}" \
>>--class "${MAIN_CLASS}" \
>>"${ARTIFACT_FILE}"
>>
>>
>> The first batch is huge; if at least the first batch had gone through, I
>> would've kept researching on my own. The problem is that the first batch is
>> more than 500k records.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist  wrote:
>>
>>> Hi Biplob,
>>>
>>> How many partitions are on the topic you are reading from and have you
>>> set the maxRatePerPartition?  iirc, spark back pressure is calculated as
>>> follows:
>>>
>>> *Spark back pressure:*
>>>
>>> Back pressure is calculated off of the following:
>>>
>>>
>>> • maxRatePerPartition=200
>>>
>>> • batchInterval 30s
>>>
>>> • 3 partitions on Ingest topic
>>>
>>>
>>> This results in a maximum ingest rate of 18K:
>>>
>>>
>>> • 3 * 30 * 200 = 18,000 max
>>>
>>> The spark.streaming.backpressure.initialRate only applies to the first
>>> batch, per docs:
>>>
>>>
>>>> This is the initial maximum receiving rate at which each receiver will
>>>> receive data for the *first batch* when the backpressure mechanism is
>>>> enabled.
>>>
>>>
>>> If you  set the maxRatePerPartition and apply the above formula, I
>>> believe you will be able to achieve the results you are looking for.
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>>
>>> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas 
>>> wrote:
>>>
 Did anyone face a similar issue? Is there any viable way to solve this?
 Thanks & Regards
 Biplob Biswas


 On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas 
 wrote:

> I have enabled the spark.streaming.backpressure.enabled setting and
> also set spark.streaming.backpressure.initialRate  to 15000, but my
> spark job is not respecting these settings when reading from Kafka after a
> failure.
>
> In my Kafka topic, around 500k records are waiting to be processed, and
> they are all taken in one huge batch, which ultimately takes a long time
> and fails with an executor failure exception. We don't have more resources
> to give in our test cluster, and we expect the backpressure to kick in and
> take smaller batches.

Re: Backpressure initial rate not working

2018-07-26 Thread Todd Nist
Have you tried reducing the maxRatePerPartition to a lower value?  Based on
your settings, I believe you are going to be able to pull *600K* worth of
messages from Kafka, basically:

  • maxRatePerPartition=15000

• batchInterval 10s

• 4 partitions on Ingest topic


This results in a maximum ingest rate of 600K:


• 4 * 10 * 15000 = 600,000 max

Can you reduce the maxRatePerPartition to, say, 1500 for a test run?  That
should result in a more manageable batch, and you can adjust from there.


• 4 * 10 * 1500 = 60,000 max
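
For example, a minimal sketch of just the rate-related confs for such a test
run (the 1500 is illustrative; everything else in your spark2-submit command
stays exactly as you already have it):

spark2-submit \
   --conf "spark.streaming.backpressure.enabled=true" \
   --conf "spark.streaming.kafka.maxRatePerPartition=1500" \
   --class "${MAIN_CLASS}" \
   "${ARTIFACT_FILE}"

With your 4 partitions and 10s batches, that caps a batch at roughly 60,000
records, matching the calculation above.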

I know we are not setting the maxRate or initialRate, only the
maxRatePerPartition and backpressure.enabled.  I thought that maxRate was
not applicable when using back pressure, but I may be mistaken.


-Todd






On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas 
wrote:

> Hi Todd,
>
> Thanks for the reply. I have maxRatePerPartition set as well. Below is
> the spark-submit config we used, and we still hit the issue. Also, the *batch
> interval is set at 10s* and the *number of partitions on the topic is set to
> 4*:
>
> spark2-submit --name "${YARN_NAME}" \
>--master yarn \
>--deploy-mode ${DEPLOY_MODE} \
>--num-executors ${NUM_EXECUTORS} \
>--driver-cores ${NUM_DRIVER_CORES} \
>--executor-cores ${NUM_EXECUTOR_CORES} \
>--driver-memory ${DRIVER_MEMORY} \
>--executor-memory ${EXECUTOR_MEMORY} \
>--queue ${YARN_QUEUE} \
>--keytab ${KEYTAB}-yarn \
>--principal ${PRINCIPAL} \
>--conf "spark.yarn.preserve.staging.files=true" \
>--conf "spark.yarn.submit.waitAppCompletion=false" \
>--conf "spark.shuffle.service.enabled=true" \
>--conf "spark.dynamicAllocation.enabled=true" \
>--conf "spark.dynamicAllocation.minExecutors=1" \
>--conf "spark.streaming.backpressure.enabled=true" \
>--conf "spark.streaming.receiver.maxRate=15000" \
>--conf "spark.streaming.kafka.maxRatePerPartition=15000" \
>--conf "spark.streaming.backpressure.initialRate=2000" \
>--conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>--driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>--driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>--files "${JAAS_CONF},${KEYTAB}" \
>--class "${MAIN_CLASS}" \
>"${ARTIFACT_FILE}"
>
>
> The first batch is huge; if at least the first batch had gone through, I
> would've kept researching on my own. The problem is that the first batch is
> more than 500k records.
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist  wrote:
>
>> Hi Biplob,
>>
>> How many partitions are on the topic you are reading from and have you
>> set the maxRatePerPartition?  iirc, spark back pressure is calculated as
>> follows:
>>
>> *Spark back pressure:*
>>
>> Back pressure is calculated off of the following:
>>
>>
>> • maxRatePerPartition=200
>>
>> • batchInterval 30s
>>
>> • 3 partitions on Ingest topic
>>
>>
>> This results in a maximum ingest rate of 18K:
>>
>>
>> • 3 * 30 * 200 = 18,000 max
>>
>> The spark.streaming.backpressure.initialRate only applies to the first
>> batch, per docs:
>>
>>
>>> This is the initial maximum receiving rate at which each receiver will
>>> receive data for the *first batch* when the backpressure mechanism is
>>> enabled.
>>
>>
>> If you  set the maxRatePerPartition and apply the above formula, I
>> believe you will be able to achieve the results you are looking for.
>>
>> HTH.
>>
>> -Todd
>>
>>
>> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas 
>> wrote:
>>
>>> Did anyone face a similar issue? Is there any viable way to solve this?
>>> Thanks & Regards
>>> Biplob Biswas
>>>
>>>
>>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas 
>>> wrote:
>>>
 I have enabled the spark.streaming.backpressure.enabled setting and
 also set spark.streaming.backpressure.initialRate  to 15000, but my
 spark job is not respecting these settings when reading from Kafka after a
 failure.

 In my Kafka topic, around 500k records are waiting to be processed, and
 they are all taken in one huge batch, which ultimately takes a long time
 and fails with an executor failure exception. We don't have more resources
 to give in our test cluster, and we expect the backpressure to kick in and
 take smaller batches.

 What can I be doing wrong?


 Thanks & Regards
 Biplob Biswas

>>>


Re: Backpressure initial rate not working

2018-07-26 Thread Biplob Biswas
Hi Todd,

Thanks for the reply. I have maxRatePerPartition set as well. Below is
the spark-submit config we used, and we still hit the issue. Also, the *batch
interval is set at 10s* and the *number of partitions on the topic is set to
4*:

spark2-submit --name "${YARN_NAME}" \
   --master yarn \
   --deploy-mode ${DEPLOY_MODE} \
   --num-executors ${NUM_EXECUTORS} \
   --driver-cores ${NUM_DRIVER_CORES} \
   --executor-cores ${NUM_EXECUTOR_CORES} \
   --driver-memory ${DRIVER_MEMORY} \
   --executor-memory ${EXECUTOR_MEMORY} \
   --queue ${YARN_QUEUE} \
   --keytab ${KEYTAB}-yarn \
   --principal ${PRINCIPAL} \
   --conf "spark.yarn.preserve.staging.files=true" \
   --conf "spark.yarn.submit.waitAppCompletion=false" \
   --conf "spark.shuffle.service.enabled=true" \
   --conf "spark.dynamicAllocation.enabled=true" \
   --conf "spark.dynamicAllocation.minExecutors=1" \
   --conf "spark.streaming.backpressure.enabled=true" \
   --conf "spark.streaming.receiver.maxRate=15000" \
   --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
   --conf "spark.streaming.backpressure.initialRate=2000" \
   --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
   --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
   --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
   --files "${JAAS_CONF},${KEYTAB}" \
   --class "${MAIN_CLASS}" \
   "${ARTIFACT_FILE}"


The first batch is huge; if at least the first batch had gone through, I
would've kept researching on my own. The problem is that the first batch is
more than 500k records.

Thanks & Regards
Biplob Biswas


On Thu, Jul 26, 2018 at 2:33 PM Todd Nist  wrote:

> Hi Biplob,
>
> How many partitions are on the topic you are reading from and have you set
> the maxRatePerPartition?  iirc, spark back pressure is calculated as
> follows:
>
> *Spark back pressure:*
>
> Back pressure is calculated off of the following:
>
>
> • maxRatePerPartition=200
>
> • batchInterval 30s
>
> • 3 partitions on Ingest topic
>
>
> This results in a maximum ingest rate of 18K:
>
>
> • 3 * 30 * 200 = 18,000 max
>
> The spark.streaming.backpressure.initialRate only applies to the first
> batch, per docs:
>
>
>> This is the initial maximum receiving rate at which each receiver will
>> receive data for the *first batch* when the backpressure mechanism is
>> enabled.
>
>
> If you  set the maxRatePerPartition and apply the above formula, I believe
> you will be able to achieve the results you are looking for.
>
> HTH.
>
> -Todd
>
>
> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas 
> wrote:
>
>> Did anyone face a similar issue? Is there any viable way to solve this?
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas 
>> wrote:
>>
>>> I have enabled the spark.streaming.backpressure.enabled setting and also
>>>  set spark.streaming.backpressure.initialRate  to 15000, but my spark
>>> job is not respecting these settings when reading from Kafka after a
>>> failure.
>>>
>>> In my Kafka topic, around 500k records are waiting to be processed, and
>>> they are all taken in one huge batch, which ultimately takes a long time
>>> and fails with an executor failure exception. We don't have more resources
>>> to give in our test cluster, and we expect the backpressure to kick in and
>>> take smaller batches.
>>>
>>> What can I be doing wrong?
>>>
>>>
>>> Thanks & Regards
>>> Biplob Biswas
>>>
>>


Re: Backpressure initial rate not working

2018-07-26 Thread Todd Nist
Hi Biplob,

How many partitions are on the topic you are reading from and have you set
the maxRatePerPartition?  iirc, spark back pressure is calculated as
follows:

*Spark back pressure:*

Back pressure is calculated off of the following:


• maxRatePerPartition=200

• batchInterval 30s

• 3 partitions on Ingest topic


This results in a maximum ingest rate of 18K:


• 3 * 30 * 200 = 18,000 max
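
As a quick sanity check, the same cap worked out in the shell (variable names
and numbers are purely illustrative):

# records per batch <= partitions * batch interval (s) * maxRatePerPartition
PARTITIONS=3
BATCH_INTERVAL_SEC=30
MAX_RATE_PER_PARTITION=200
echo $((PARTITIONS * BATCH_INTERVAL_SEC * MAX_RATE_PER_PARTITION))   # prints 18000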

The spark.streaming.backpressure.initialRate only applies to the first
batch, per docs:


> This is the initial maximum receiving rate at which each receiver will
> receive data for the *first batch* when the backpressure mechanism is
> enabled.


If you  set the maxRatePerPartition and apply the above formula, I believe
you will be able to achieve the results you are looking for.

HTH.

-Todd


On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas 
wrote:

> Did anyone face a similar issue? Is there any viable way to solve this?
> Thanks & Regards
> Biplob Biswas
>
>
> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas 
> wrote:
>
>> I have enabled the spark.streaming.backpressure.enabled setting and also
>>  set spark.streaming.backpressure.initialRate  to 15000, but my spark
>> job is not respecting these settings when reading from Kafka after a
>> failure.
>>
>> In my Kafka topic, around 500k records are waiting to be processed, and
>> they are all taken in one huge batch, which ultimately takes a long time
>> and fails with an executor failure exception. We don't have more resources
>> to give in our test cluster, and we expect the backpressure to kick in and
>> take smaller batches.
>>
>> What can I be doing wrong?
>>
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>


Re: Backpressure initial rate not working

2018-07-26 Thread Biplob Biswas
Did anyone face a similar issue? Is there any viable way to solve this?
Thanks & Regards
Biplob Biswas


On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas 
wrote:

> I have enabled the spark.streaming.backpressure.enabled setting and also
>  set spark.streaming.backpressure.initialRate  to 15000, but my spark job
> is not respecting these settings when reading from Kafka after a failure.
>
> In my Kafka topic, around 500k records are waiting to be processed, and
> they are all taken in one huge batch, which ultimately takes a long time
> and fails with an executor failure exception. We don't have more resources
> to give in our test cluster, and we expect the backpressure to kick in and
> take smaller batches.
>
> What can I be doing wrong?
>
>
> Thanks & Regards
> Biplob Biswas
>