Hi Todd,

Thanks for the reply. I have maxRatePerPartition set as well. Below is the
spark-submit config we used, and we still hit the issue. Also, the *batch
interval is set to 10s* and the *number of partitions on the topic is set
to 4*:

spark2-submit --name "${YARN_NAME}" \
   --master yarn \
   --deploy-mode ${DEPLOY_MODE} \
   --num-executors ${NUM_EXECUTORS} \
   --driver-cores ${NUM_DRIVER_CORES} \
   --executor-cores ${NUM_EXECUTOR_CORES} \
   --driver-memory ${DRIVER_MEMORY} \
   --executor-memory ${EXECUTOR_MEMORY} \
   --queue ${YARN_QUEUE} \
   --keytab ${KEYTAB}-yarn \
   --principal ${PRINCIPAL} \
   --conf "spark.yarn.preserve.staging.files=true" \
   --conf "spark.yarn.submit.waitAppCompletion=false" \
   --conf "spark.shuffle.service.enabled=true" \
   --conf "spark.dynamicAllocation.enabled=true" \
   --conf "spark.dynamicAllocation.minExecutors=1" \
   --conf "spark.streaming.backpressure.enabled=true" \
   --conf "spark.streaming.receiver.maxRate=15000" \
   --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
   --conf "spark.streaming.backpressure.initialRate=2000" \
   --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
   --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
   --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
   --files "${JAAS_CONF},${KEYTAB}" \
   --class "${MAIN_CLASS}" \
   "${ARTIFACT_FILE}"


The first batch is huge. If it had at least worked for the first batch, I
would have kept researching on my own, but the problem is that the first
batch alone is more than 500k records.
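
For what it's worth, if I apply the formula you describe below to our
settings (assuming the direct Kafka stream actually honours
maxRatePerPartition), the per-batch cap works out to:

• 4 partitions * 10s batch interval * 15000 records/partition/s = 600,000 max per batch

so the ~500k backlog still fits inside a single batch even with that cap in
place.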

Thanks & Regards
Biplob Biswas


On Thu, Jul 26, 2018 at 2:33 PM Todd Nist <tsind...@gmail.com> wrote:

> Hi Biplob,
>
> How many partitions are on the topic you are reading from and have you set
> the maxRatePerPartition? IIRC, Spark back pressure is calculated as
> follows:
>
> *Spark back pressure:*
>
> Back pressure is calculated off of the following:
>
>
> • maxRatePerPartition=200
>
> • batchInterval 30s
>
> • 3 partitions on Ingest topic
>
>
> This results in a maximum ingest of 18K records per batch:
>
>
> • 3 * 30 * 200 = 18,000 max
>
> The spark.streaming.backpressure.initialRate only applies to the first
> batch, per docs:
>
>
> This is the initial maximum receiving rate at which each receiver will
>> receive data for the *first batch* when the backpressure mechanism is
>> enabled.
>
>
> If you  set the maxRatePerPartition and apply the above formula, I believe
> you will be able to achieve the results you are looking for.
>
> HTH.
>
> -Todd
>
>
> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Did anyone face a similar issue? And is there any viable way to solve it?
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas <revolutioni...@gmail.com>
>> wrote:
>>
>>> I have enabled the spark.streaming.backpressure.enabled setting and also
>>> set spark.streaming.backpressure.initialRate to 15000, but my Spark
>>> job is not respecting these settings when reading from Kafka after a
>>> failure.
>>>
>>> In my Kafka topic, around 500k records are waiting to be processed, and
>>> they are all taken in one huge batch, which ultimately takes a long time
>>> and fails with an executor failure exception. We don't have more
>>> resources to give in our test cluster, so we expect backpressure to kick
>>> in and take smaller batches.
>>>
>>> What can I be doing wrong?
>>>
>>>
>>> Thanks & Regards
>>> Biplob Biswas
>>>
>>
