Re: Backpressure initial rate not working
Hi Todd,

Thanks a lot, that works. Although I am curious whether you know why the initialRate setting is not kicking in? But for now the pipeline is usable again. Thanks a lot.

Thanks & Regards
Biplob Biswas
Re: Backpressure initial rate not working
Have you tried reducing the maxRatePerPartition to a lower value? Based on your settings, I believe you are going to be able to pull *600K* worth of messages from Kafka, basically:

• maxRatePerPartition=15000
• batchInterval 10s
• 4 partitions on the ingest topic

This results in a maximum ingest rate of 600K:

• 4 * 10 * 15000 = 600,000 max

Can you reduce the maxRatePerPartition to, say, 1500 for a test run? That should result in a more manageable batch, and you can adjust from there.

• 4 * 10 * 1500 = 60,000 max

I know we are not setting the maxRate or initialRate, only the maxRatePerPartition and backpressure.enabled. I thought that maxRate was not applicable when using back pressure, but I may be mistaken.

-Todd
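The per-batch ceiling Todd describes (partitions × batch interval × maxRatePerPartition) can be sanity-checked with a quick shell sketch; the variable names here are illustrative, not Spark settings:

```shell
# Upper bound on records per micro-batch for the direct Kafka stream:
#   partitions * batch_interval_seconds * maxRatePerPartition
partitions=4
batch_interval_s=10
max_rate_per_partition=15000

echo $(( partitions * batch_interval_s * max_rate_per_partition ))  # 600000

# Dropping maxRatePerPartition to 1500 caps each batch at a tenth of that:
max_rate_per_partition=1500
echo $(( partitions * batch_interval_s * max_rate_per_partition ))  # 60000
```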
Re: Backpressure initial rate not working
Hi Todd,

Thanks for the reply. I have the maxRatePerPartition set as well. Below is the spark-submit config we used and still got the issue. Also the *batch interval is set at 10s* and *number of partitions on the topic is set to 4*:

spark2-submit --name "${YARN_NAME}" \
  --master yarn \
  --deploy-mode ${DEPLOY_MODE} \
  --num-executors ${NUM_EXECUTORS} \
  --driver-cores ${NUM_DRIVER_CORES} \
  --executor-cores ${NUM_EXECUTOR_CORES} \
  --driver-memory ${DRIVER_MEMORY} \
  --executor-memory ${EXECUTOR_MEMORY} \
  --queue ${YARN_QUEUE} \
  --keytab ${KEYTAB}-yarn \
  --principal ${PRINCIPAL} \
  --conf "spark.yarn.preserve.staging.files=true" \
  --conf "spark.yarn.submit.waitAppCompletion=false" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.dynamicAllocation.minExecutors=1" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.streaming.receiver.maxRate=15000" \
  --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
  --conf "spark.streaming.backpressure.initialRate=2000" \
  --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
  --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
  --files "${JAAS_CONF},${KEYTAB}" \
  --class "${MAIN_CLASS}" \
  "${ARTIFACT_FILE}"

The first batch is huge; even if it had worked for the first batch I would've tried researching more. The problem is that the first batch is more than 500k records.

Thanks & Regards
Biplob Biswas
Re: Backpressure initial rate not working
Hi Biplob,

How many partitions are on the topic you are reading from, and have you set the maxRatePerPartition? IIRC, Spark back pressure is calculated as follows:

*Spark back pressure:*

Back pressure is calculated off of the following:

• maxRatePerPartition=200
• batchInterval 30s
• 3 partitions on the ingest topic

This results in a maximum ingest rate of 18K:

• 3 * 30 * 200 = 18,000 max

The spark.streaming.backpressure.initialRate only applies to the first batch, per the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the *first batch* when the backpressure mechanism is enabled.

If you set the maxRatePerPartition and apply the above formula, I believe you will be able to achieve the results you are looking for.

HTH.

-Todd
Re: Backpressure initial rate not working
Did anyone face a similar issue? And is there any viable way to solve this?

Thanks & Regards
Biplob Biswas
Backpressure initial rate not working
I have enabled the spark.streaming.backpressure.enabled setting and also set spark.streaming.backpressure.initialRate to 15000, but my Spark job is not respecting these settings when reading from Kafka after a failure.

In my Kafka topic around 500k records are waiting to be processed, and they are all taken in one huge batch which ultimately takes a long time and fails with an executor failure exception. We don't have more resources to give in our test cluster, and we expect the backpressure to kick in and take smaller batches.

What can I be doing wrong?

Thanks & Regards
Biplob Biswas
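As a rough sense of scale for the backlog described above: with the per-partition cap suggested later in the thread (1500 records/s, 4 partitions, 10s batches), the ~500k waiting records would drain over a handful of capped batches rather than one giant one. A quick shell sketch of that arithmetic (the variable names are illustrative):

```shell
# Roughly how many capped micro-batches it takes to drain a ~500k backlog
backlog=500000
partitions=4
batch_interval_s=10
max_rate_per_partition=1500

per_batch=$(( partitions * batch_interval_s * max_rate_per_partition ))  # 60000
# Ceiling division: number of batches needed to consume the backlog
batches=$(( (backlog + per_batch - 1) / per_batch ))                     # 9
echo "$per_batch records per batch, ~$batches batches to drain"
```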