Hi Todd,

Thanks a lot, that works. Although I am curious whether you know why the
initialRate setting is not kicking in?
But for now the pipeline is usable again. Thanks a lot.

Thanks & Regards
Biplob Biswas


On Thu, Jul 26, 2018 at 3:03 PM Todd Nist <tsind...@gmail.com> wrote:

> Have you tried reducing the maxRatePerPartition to a lower value? Based
> on your settings, I believe you are going to be able to pull *600K* worth
> of messages from Kafka, basically:
>
> • maxRatePerPartition=15000
> • batchInterval 10s
> • 4 partitions on Ingest topic
>
> This results in a maximum ingest rate of 600K:
>
> • 4 * 10 * 15000 = 600,000 max
>
> Can you reduce the maxRatePerPartition to, say, 1500 for a test run? That
> should result in a more manageable batch, and you can adjust from there.
>
> • 4 * 10 * 1500 = 60,000 max
>
> I know we are not setting the maxRate or initialRate, only the
> maxRatePerPartition and backpressure.enabled. I thought that maxRate was
> not applicable when using back pressure, but I may be mistaken.
>
> -Todd
>
>
> On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Hi Todd,
>>
>> Thanks for the reply. I have the maxRatePerPartition set as well. Below
>> is the spark-submit config we used and still got the issue.
>> Also the *batch interval is set at 10s* and the *number of partitions on
>> the topic is set to 4*:
>>
>> spark2-submit --name "${YARN_NAME}" \
>>   --master yarn \
>>   --deploy-mode ${DEPLOY_MODE} \
>>   --num-executors ${NUM_EXECUTORS} \
>>   --driver-cores ${NUM_DRIVER_CORES} \
>>   --executor-cores ${NUM_EXECUTOR_CORES} \
>>   --driver-memory ${DRIVER_MEMORY} \
>>   --executor-memory ${EXECUTOR_MEMORY} \
>>   --queue ${YARN_QUEUE} \
>>   --keytab ${KEYTAB}-yarn \
>>   --principal ${PRINCIPAL} \
>>   --conf "spark.yarn.preserve.staging.files=true" \
>>   --conf "spark.yarn.submit.waitAppCompletion=false" \
>>   --conf "spark.shuffle.service.enabled=true" \
>>   --conf "spark.dynamicAllocation.enabled=true" \
>>   --conf "spark.dynamicAllocation.minExecutors=1" \
>>   --conf "spark.streaming.backpressure.enabled=true" \
>>   --conf "spark.streaming.receiver.maxRate=15000" \
>>   --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
>>   --conf "spark.streaming.backpressure.initialRate=2000" \
>>   --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>   --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>   --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>   --files "${JAAS_CONF},${KEYTAB}" \
>>   --class "${MAIN_CLASS}" \
>>   "${ARTIFACT_FILE}"
>>
>> The first batch is huge; if it had worked for at least the first batch, I
>> would have tried researching more on my own. The problem is that the
>> first batch contains more than 500k records.
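As a side note for anyone following the thread: the per-batch cap implied by these settings can be checked with quick shell arithmetic (cap = partitions * batch interval in seconds * spark.streaming.kafka.maxRatePerPartition). A minimal sketch, using the values from the config above and Todd's suggested lower rate:

```shell
# Sanity-check the maximum records per batch for a direct Kafka stream:
# partitions * batch_interval_s * maxRatePerPartition
partitions=4
batch_interval_s=10
max_rate_per_partition=15000

max_batch=$(( partitions * batch_interval_s * max_rate_per_partition ))
echo "max records per batch: ${max_batch}"          # 600000

# With the suggested lower rate of 1500 per partition:
max_batch_reduced=$(( partitions * batch_interval_s * 1500 ))
echo "reduced max per batch: ${max_batch_reduced}"  # 60000
```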
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist <tsind...@gmail.com> wrote:
>>
>>> Hi Biplob,
>>>
>>> How many partitions are on the topic you are reading from, and have you
>>> set the maxRatePerPartition? iirc, spark back pressure is calculated as
>>> follows:
>>>
>>> *Spark back pressure:*
>>>
>>> Back pressure is calculated off of the following:
>>>
>>> • maxRatePerPartition=200
>>> • batchInterval 30s
>>> • 3 partitions on Ingest topic
>>>
>>> This results in a maximum ingest rate of 18K:
>>>
>>> • 3 * 30 * 200 = 18,000 max
>>>
>>> The spark.streaming.backpressure.initialRate only applies to the first
>>> batch, per the docs:
>>>
>>>> This is the initial maximum receiving rate at which each receiver will
>>>> receive data for the *first batch* when the backpressure mechanism is
>>>> enabled.
>>>
>>> If you set the maxRatePerPartition and apply the above formula, I
>>> believe you will be able to achieve the results you are looking for.
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>>
>>> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas <revolutioni...@gmail.com>
>>> wrote:
>>>
>>>> Did anyone face a similar issue? And is there any viable way to solve
>>>> this?
>>>>
>>>> Thanks & Regards
>>>> Biplob Biswas
>>>>
>>>>
>>>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas <revolutioni...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have enabled the spark.streaming.backpressure.enabled setting and
>>>>> also set spark.streaming.backpressure.initialRate to 15000, but my
>>>>> spark job is not respecting these settings when reading from Kafka
>>>>> after a failure.
>>>>>
>>>>> In my Kafka topic around 500k records are waiting to be processed,
>>>>> and they are all taken in one huge batch which ultimately takes a
>>>>> long time and fails with an executor failure exception. We don't have
>>>>> more resources to give in our test cluster, and we expect the
>>>>> backpressure to kick in and take smaller batches.
>>>>>
>>>>> What could I be doing wrong?
>>>>>
>>>>> Thanks & Regards
>>>>> Biplob Biswas
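For later readers: the fix that worked in this thread amounted to lowering spark.streaming.kafka.maxRatePerPartition so the first post-recovery batch stays manageable. A minimal, illustrative fragment of the adjusted flags (only the rate-related options are shown; every other option stays as in the full submit script above, and 1500 is just the test value suggested in the thread, not a recommendation):

```shell
# Illustrative fragment only — rate-limiting flags from this thread,
# with maxRatePerPartition lowered from 15000 to 1500 as suggested.
spark2-submit \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.streaming.kafka.maxRatePerPartition=1500" \
  --conf "spark.streaming.backpressure.initialRate=2000" \
  ...
```

With 4 partitions and a 10s batch interval, this caps each batch at 4 * 10 * 1500 = 60,000 records instead of 600,000.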