I believe that's a question for the NiFi list. As you can see, the code
base is quite old
https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark
and it doesn't make use of the
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
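For what it's worth, the rate limiting that spark.streaming.receiver.maxRate relies on is essentially a token bucket: Spark's BlockGenerator waits on its RateLimiter before pushing each record, so a receiver that bypasses that path is not throttled. A minimal sketch of the token-bucket idea (the class and method names here are illustrative, not Spark's actual API):

```scala
// Illustrative token-bucket rate limiter, mirroring the idea behind
// Spark's internal RateLimiter. Not Spark's API; names are made up.
class SimpleRateLimiter(maxRatePerSec: Long) {
  // Start with a full bucket of tokens.
  private var tokens: Double = maxRatePerSec.toDouble
  private var lastRefill: Long = System.nanoTime()

  // Returns true and consumes a token if the current rate allows
  // one more record; false otherwise.
  def tryAcquire(): Boolean = synchronized {
    val now = System.nanoTime()
    // Refill tokens proportionally to elapsed time, capped at maxRate.
    tokens = math.min(maxRatePerSec.toDouble,
      tokens + (now - lastRefill) / 1e9 * maxRatePerSec)
    lastRefill = now
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }
}
```

A workaround along these lines would be a custom receiver that wraps the NiFi site-to-site pull loop and delays its store() calls whenever tryAcquire() returns false, effectively reimplementing the limiting that the old NiFiReceiver skips.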


On Sat, Sep 16, 2017 at 1:59 AM, Margus Roo <mar...@roo.ee> wrote:

> Some more info
>
> val lines = ssc.socketStream() // works
> val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2)) // does not work
>
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> https://www.facebook.com/allan.tuuring
> +372 51 48 780
>
> On 15/09/2017 21:50, Margus Roo wrote:
>
> Hi
>
> I tested spark.streaming.receiver.maxRate and
> spark.streaming.backpressure.enabled settings using socketStream and it
> works.
>
> But if I am using nifi-spark-receiver
> (https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver)
> then it does not honour spark.streaming.receiver.maxRate.
>
> any workaround?
>
>
> On 14/09/2017 09:57, Margus Roo wrote:
>
> Hi
>
> Using Spark 2.1.1.2.6-1.0-129 (from Hortonworks distro) and Scala 2.11.8
> and Java 1.8.0_60
>
> I have a NiFi flow that produces more records than the Spark stream can
> process within the batch time. To avoid overflowing the Spark queue I
> wanted to try Spark Streaming backpressure (it did not work for me), so I
> went back to the simpler but static solution and tried
> spark.streaming.receiver.maxRate.
>
> I set spark.streaming.receiver.maxRate=1. As I understand it from the
> Spark manual: "If the batch processing time is more than batchinterval
> then obviously the receiver’s memory will start filling up and will end up
> in throwing exceptions (most probably BlockNotFoundException). Currently
> there is no way to pause the receiver. Using SparkConf configuration
> spark.streaming.receiver.maxRate, rate of receiver can be limited." - does
> that mean 1 record per second?
>
> I have very simple code:
>
> val conf = new SiteToSiteClient.Builder()
>   .url("http://192.168.80.120:9090/nifi")
>   .portName("testing")
>   .buildConfig()
> val ssc = new StreamingContext(sc, Seconds(1))
> val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
> lines.print()
>
> ssc.start()
>
>
> I have loads of records waiting in the NiFi testing port. After I call
> ssc.start() I get 7-9 records per batch (batch time is 1 s). Do I
> understand spark.streaming.receiver.maxRate wrong?
>
>
>
>
>


-- 
Cheers!
