Hi Todd,

Thanks a lot, that works. I am still curious, though: do you know why the
initialRate setting is not kicking in?

But for now the pipeline is usable again. Thanks a lot.

Thanks & Regards
Biplob Biswas


On Thu, Jul 26, 2018 at 3:03 PM Todd Nist <tsind...@gmail.com> wrote:

> Have you tried reducing the maxRatePerPartition to a lower value?  Based
> on your settings, I believe you are going to be able to pull *600K* worth
> of messages from Kafka, basically:
>
>   • maxRatePerPartition = 15000
>   • batchInterval = 10s
>   • 4 partitions on the Ingest topic
>
>
> This results in a maximum ingest rate of 600K:
>
>
> • 4 * 10 * 15000 = 600,000 max
>
> Can you reduce the maxRatePerPartition to, say, 1500 for a test run?  That
> should result in a more manageable batch, and you can adjust from there.
>
>
> • 4 * 10 * 1500 = 60,000 max
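>
> For illustration, here is a minimal sketch of that cap in Scala (the helper
> name is made up for the example; it just multiplies partitions, batch
> interval in seconds, and maxRatePerPartition):
>
>   def maxRecordsPerBatch(partitions: Int, batchIntervalSec: Int, ratePerPartition: Int): Long =
>     partitions.toLong * batchIntervalSec * ratePerPartition
>
>   maxRecordsPerBatch(4, 10, 15000)  // 600,000 with the current settings
>   maxRecordsPerBatch(4, 10, 1500)   // 60,000 with the suggested test value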
>
> I know we are not setting the maxRate or initialRate, only the
> maxRatePerPartition and backpressure.enabled. I thought that maxRate was
> not applicable when using back pressure, but I may be mistaken.
>
>
> -Todd
>
> On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Hi Todd,
>>
>> Thanks for the reply. I have the maxRatePerPartition set as well. Below
>> is the spark-submit config we used, and we still got the issue. Also, the *batch
>> interval is set at 10s* and the *number of partitions on the topic is set to
>> 4*:
>>
>> spark2-submit --name "${YARN_NAME}" \
>>    --master yarn \
>>    --deploy-mode ${DEPLOY_MODE} \
>>    --num-executors ${NUM_EXECUTORS} \
>>    --driver-cores ${NUM_DRIVER_CORES} \
>>    --executor-cores ${NUM_EXECUTOR_CORES} \
>>    --driver-memory ${DRIVER_MEMORY} \
>>    --executor-memory ${EXECUTOR_MEMORY} \
>>    --queue ${YARN_QUEUE} \
>>    --keytab ${KEYTAB}-yarn \
>>    --principal ${PRINCIPAL} \
>>    --conf "spark.yarn.preserve.staging.files=true" \
>>    --conf "spark.yarn.submit.waitAppCompletion=false" \
>>    --conf "spark.shuffle.service.enabled=true" \
>>    --conf "spark.dynamicAllocation.enabled=true" \
>>    --conf "spark.dynamicAllocation.minExecutors=1" \
>>    --conf "spark.streaming.backpressure.enabled=true" \
>>    --conf "spark.streaming.receiver.maxRate=15000" \
>>    --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
>>    --conf "spark.streaming.backpressure.initialRate=2000" \
>>    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>    --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>    --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>    --files "${JAAS_CONF},${KEYTAB}" \
>>    --class "${MAIN_CLASS}" \
>>    "${ARTIFACT_FILE}"
>>
>>
>> The first batch is huge. If at least the first batch had worked, I would
>> have tried researching more on my own, but the problem is that the first
>> batch alone is more than 500k records.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist <tsind...@gmail.com> wrote:
>>
>>> Hi Biplob,
>>>
>>> How many partitions are on the topic you are reading from, and have you
>>> set the maxRatePerPartition? IIRC, Spark back pressure is calculated as
>>> follows:
>>>
>>> *Spark back pressure:*
>>>
>>> Back pressure is calculated from the following:
>>>
>>>   • maxRatePerPartition = 200
>>>   • batchInterval = 30s
>>>   • 3 partitions on the Ingest topic
>>>
>>>
>>> This results in a maximum ingest rate of 18K:
>>>
>>>
>>>   • 3 * 30 * 200 = 18,000 max
>>>
>>> The spark.streaming.backpressure.initialRate only applies to the first
>>> batch, per docs:
>>>
>>>
>>> This is the initial maximum receiving rate at which each receiver will
>>>> receive data for the *first batch* when the backpressure mechanism is
>>>> enabled.
>>>
>>>
>>> If you set the maxRatePerPartition and apply the formula above, I
>>> believe you will be able to achieve the results you are looking for.
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>>
>>> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas <revolutioni...@gmail.com>
>>> wrote:
>>>
>>>> Did anyone face a similar issue? And is there any viable way to solve this?
>>>> Thanks & Regards
>>>> Biplob Biswas
>>>>
>>>>
>>>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas <revolutioni...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have enabled the spark.streaming.backpressure.enabled setting and
>>>>> also set spark.streaming.backpressure.initialRate to 15000, but my
>>>>> Spark job is not respecting these settings when reading from Kafka after
>>>>> a failure.
>>>>>
>>>>> In my Kafka topic, around 500k records are waiting to be processed, and
>>>>> they are all taken in one huge batch, which ultimately takes a long time
>>>>> and fails with an executor failure exception. We don't have more
>>>>> resources to give in our test cluster, and we expect the backpressure to
>>>>> kick in and take smaller batches.
>>>>>
>>>>> What could I be doing wrong?
>>>>>
>>>>>
>>>>> Thanks & Regards
>>>>> Biplob Biswas
>>>>>
>>>>
