I believe that's a question for the NiFi list. As you can see, the code base is quite old (https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark) and it doesn't make use of Spark's RateLimiter (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala), which is what spark.streaming.receiver.maxRate relies on.
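To illustrate what the receiver is missing: Spark's RateLimiter (linked above) blocks the receiver thread before each record is stored, so that the configured maximum rate is never exceeded. Below is a minimal, self-contained sketch of that idea; `SimpleRateLimiter` and `waitToPush` are illustrative names modelled on Spark's class, not the actual Spark API.

```scala
// A minimal sketch (not the actual Spark API) of receiver-side rate
// limiting: each call to waitToPush() blocks until emitting one more
// record stays under maxRatePerSec.
class SimpleRateLimiter(maxRatePerSec: Int) {
  require(maxRatePerSec > 0, "rate must be positive")
  private val intervalNanos: Long = 1000000000L / maxRatePerSec
  // Earliest time the next record may be pushed.
  private var nextFreeNanos: Long = System.nanoTime()

  def waitToPush(): Unit = synchronized {
    val now = System.nanoTime()
    if (now < nextFreeNanos) {
      // Sleep until our slot opens up.
      Thread.sleep((nextFreeNanos - now) / 1000000L + 1)
    }
    nextFreeNanos = math.max(nextFreeNanos, System.nanoTime()) + intervalNanos
  }
}
```

A receiver that honored spark.streaming.receiver.maxRate would call something like `waitToPush()` before each `store(record)`; because the NiFi receiver never consults the limiter, the setting has no effect on it.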
On Sat, Sep 16, 2017 at 1:59 AM, Margus Roo <mar...@roo.ee> wrote:

> Some more info:
>
> val lines = ssc.socketStream() // works
> val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2)) // does not work
>
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> https://www.facebook.com/allan.tuuring
> +372 51 48 780
>
> On 15/09/2017 21:50, Margus Roo wrote:
>
> Hi
>
> I tested the spark.streaming.receiver.maxRate and
> spark.streaming.backpressure.enabled settings using socketStream, and they
> work.
>
> But if I use nifi-spark-receiver
> (https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver),
> then it does not respect spark.streaming.receiver.maxRate.
>
> Is there any workaround?
>
> On 14/09/2017 09:57, Margus Roo wrote:
>
> Hi
>
> Using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distro), Scala 2.11.8,
> and Java 1.8.0_60.
>
> I have a NiFi flow that produces more records than the Spark stream can
> process within the batch interval. To avoid overflowing the Spark queue I
> wanted to try Spark Streaming backpressure (which did not work for me), so
> I fell back to the simpler but static option and tried
> spark.streaming.receiver.maxRate.
>
> I set spark.streaming.receiver.maxRate=1. As I understand it from the
> Spark manual: "If the batch processing time is more than batchinterval
> then obviously the receiver's memory will start filling up and will end up
> in throwing exceptions (most probably BlockNotFoundException). Currently
> there is no way to pause the receiver. Using SparkConf configuration
> spark.streaming.receiver.maxRate, rate of receiver can be limited." - does
> that mean 1 record per second?
>
> I have very simple code:
>
> val conf = new SiteToSiteClient.Builder()
>   .url("http://192.168.80.120:9090/nifi")
>   .portName("testing")
>   .buildConfig()
> val ssc = new StreamingContext(sc, Seconds(1))
> val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
> lines.print()
>
> ssc.start()
>
> I have loads of records waiting in the NiFi "testing" port. After I call
> ssc.start() I get 7-9 records per batch (the batch interval is 1 s). Do I
> misunderstand spark.streaming.receiver.maxRate?
>
> --
> Margus (margusja) Roo

--
Cheers!
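For reference, the two settings discussed in the quoted mails are normally set on the SparkConf before the StreamingContext is created. This is a hypothetical setup sketch (app name and values are illustrative); as noted above, these settings only take effect for receivers that consult Spark's rate limiter, which the NiFi receiver does not.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative configuration: limit each receiver to 1 record/sec and
// enable dynamic backpressure. Honored only by rate-limiter-aware receivers.
val sparkConf = new SparkConf()
  .setAppName("nifi-stream")
  .set("spark.streaming.receiver.maxRate", "1")
  .set("spark.streaming.backpressure.enabled", "true")
val ssc = new StreamingContext(sparkConf, Seconds(1))
```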