Hi Todd,

Thanks for the reply. I have maxRatePerPartition set as well. Below is the spark-submit config we used, and we still hit the issue. Also, the *batch interval is set to 10s* and the *number of partitions on the topic is 4*:
spark2-submit --name "${YARN_NAME}" \
  --master yarn \
  --deploy-mode ${DEPLOY_MODE} \
  --num-executors ${NUM_EXECUTORS} \
  --driver-cores ${NUM_DRIVER_CORES} \
  --executor-cores ${NUM_EXECUTOR_CORES} \
  --driver-memory ${DRIVER_MEMORY} \
  --executor-memory ${EXECUTOR_MEMORY} \
  --queue ${YARN_QUEUE} \
  --keytab ${KEYTAB}-yarn \
  --principal ${PRINCIPAL} \
  --conf "spark.yarn.preserve.staging.files=true" \
  --conf "spark.yarn.submit.waitAppCompletion=false" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.dynamicAllocation.minExecutors=1" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.streaming.receiver.maxRate=15000" \
  --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
  --conf "spark.streaming.backpressure.initialRate=2000" \
  --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
  --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
  --files "${JAAS_CONF},${KEYTAB}" \
  --class "${MAIN_CLASS}" \
  "${ARTIFACT_FILE}"

The first batch is huge; even if the settings had worked for just the first batch, I would have kept researching from there. The problem is that the first batch alone is more than 500k records.

Thanks & Regards
Biplob Biswas

On Thu, Jul 26, 2018 at 2:33 PM Todd Nist <tsind...@gmail.com> wrote:

> Hi Biplob,
>
> How many partitions are on the topic you are reading from, and have you
> set maxRatePerPartition?
> IIRC, Spark back pressure is calculated as follows:
>
> *Spark back pressure:*
>
> Back pressure is calculated off of the following:
>
> • maxRatePerPartition=200
> • batchInterval 30s
> • 3 partitions on Ingest topic
>
> This results in a maximum ingest rate of 18K:
>
> • 3 * 30 * 200 = 18,000 max
>
> The spark.streaming.backpressure.initialRate only applies to the first
> batch, per the docs:
>
>> This is the initial maximum receiving rate at which each receiver will
>> receive data for the *first batch* when the backpressure mechanism is
>> enabled.
>
> If you set maxRatePerPartition and apply the above formula, I believe
> you will be able to achieve the results you are looking for.
>
> HTH.
>
> -Todd
>
> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Did anyone face a similar issue? Is there any viable way to solve this?
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas <revolutioni...@gmail.com>
>> wrote:
>>
>>> I have enabled the spark.streaming.backpressure.enabled setting and
>>> also set spark.streaming.backpressure.initialRate to 15000, but my
>>> Spark job is not respecting these settings when reading from Kafka
>>> after a failure.
>>>
>>> In my Kafka topic, around 500k records are waiting to be processed,
>>> and they are all taken in one huge batch, which ultimately takes a long
>>> time and fails with an executor failure exception. We don't have more
>>> resources to give in our test cluster, and we expect the backpressure
>>> to kick in and take smaller batches.
>>>
>>> What could I be doing wrong?
>>>
>>> Thanks & Regards
>>> Biplob Biswas
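[Editor's note] Plugging both sets of numbers from the thread into the per-batch cap formula Todd describes (a quick sketch only; it assumes, per his reply, that the cap is partitions × batch interval × maxRatePerPartition):

```python
# Sanity check of the back-pressure cap formula from Todd's reply.
# Assumption (from the thread, not verified against Spark source):
#   max_records_per_batch = partitions * batch_interval_seconds * maxRatePerPartition

def max_records_per_batch(partitions: int,
                          batch_interval_s: int,
                          max_rate_per_partition: int) -> int:
    return partitions * batch_interval_s * max_rate_per_partition

# Todd's example: 3 partitions, 30s batch interval, maxRatePerPartition=200
print(max_records_per_batch(3, 30, 200))      # 18000

# Biplob's settings: 4 partitions, 10s batch interval, maxRatePerPartition=15000
print(max_records_per_batch(4, 10, 15000))    # 600000

# 600000 >= 500000, so under this formula the configured cap would still
# allow the entire ~500k-record backlog into a single batch.
```

If the formula holds, the 15000 maxRatePerPartition setting in the config above never binds for a 500k backlog; a much lower value would be needed to split it across batches.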