Thanks Michael! That works!

On Tue, Mar 20, 2018 at 5:00 PM, Michael Armbrust <mich...@databricks.com> wrote:
> Those options will not affect structured streaming. You are looking for
>
>     .option("maxOffsetsPerTrigger", "1000")
>
> We are working on improving this by building a generic mechanism into the
> Streaming DataSource V2 so that the engine can do admission control on the
> amount of data returned in a source-independent way.
>
> On Tue, Mar 20, 2018 at 2:58 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> I am using Spark 2.3.0 and Kafka 0.10.2.0, so I assume structured
>> streaming uses the Direct API, although I am not sure. If it is the
>> Direct API, the only relevant parameters, according to this
>> <https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang>
>> article, are:
>>
>> - spark.conf.set("spark.streaming.backpressure.enabled", "true")
>> - spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
>>
>> I set both of these and run select count(*) on my 10M records, and I
>> still don't see any output until it finishes the initial batch of 10M,
>> which takes a while. So I am wondering if I missed something here?
>>
>> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen <ge...@ibleducation.com> wrote:
>>
>>> The following settings
>>> <http://spark.apache.org/docs/latest/configuration.html#spark-streaming>
>>> may be what you're looking for:
>>>
>>> - spark.streaming.backpressure.enabled
>>> - spark.streaming.backpressure.initialRate
>>> - spark.streaming.receiver.maxRate
>>> - spark.streaming.kafka.maxRatePerPartition
>>>
>>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Yes, it indeed makes sense! Is there a way to get incremental counts
>>>> as I start from 0 and go through the 10M records? Perhaps a count for
>>>> every micro-batch or something?
>>>>
>>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <ge...@ibleducation.com> wrote:
>>>>
>>>>> Trigger does not mean "report the current result every trigger
>>>>> seconds". It means it will attempt to fetch new data and process it
>>>>> no faster than the trigger interval.
>>>>>
>>>>> If you're reading from the beginning and you've got 10M entries in
>>>>> Kafka, it's likely pulling everything down, processing it completely,
>>>>> and then giving you an initial output. From there on, it will check
>>>>> Kafka every 1 second for new data and process it, showing you only
>>>>> the updated rows. So the initial read gives you the entire output,
>>>>> since there is nothing to be 'updating' from. If you add data to
>>>>> Kafka after the streaming job has completed its first batch (and
>>>>> leave it running), it will then show you the new/updated rows since
>>>>> the last batch every 1 second (assuming it can fetch + process in
>>>>> that time span).
>>>>>
>>>>> If the combined fetch + processing time is greater than the trigger
>>>>> time, you will notice warnings that it is 'falling behind' (I forget
>>>>> the exact verbiage, but something to the effect of "the calculation
>>>>> took XX time and is falling behind"). In that case, it will
>>>>> immediately check Kafka for new messages and begin processing the
>>>>> next batch (if new messages exist).
>>>>>
>>>>> Hope that makes sense -
>>>>>
>>>>> On Mon, Mar 19, 2018 at 13:36, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have 10 million records in my Kafka topic and I am just trying to
>>>>>> run spark.sql("select count(*) from kafka_view"). I am reading from
>>>>>> Kafka and writing to Kafka.
>>>>>>
>>>>>> My writeStream is set to "update" mode with a trigger interval of
>>>>>> one second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>>>>> printed every second, but it looks like it only prints after going
>>>>>> through all 10M. Why?
>>>>>>
>>>>>> Also, it seems to take forever, whereas a Linux wc of 10M rows would
>>>>>> take about 30 seconds.
>>>>>>
>>>>>> Thanks!
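Putting Michael's maxOffsetsPerTrigger suggestion together with the original
count query, a minimal sketch of the whole setup (Spark 2.3 Scala API) might
look like the following. The broker address, topic name, and checkpoint path
are placeholders rather than values from this thread, and the console sink is
only there to make the per-batch counts visible:

    // Minimal sketch (Spark 2.3 Scala API). Broker address, topic name, and
    // checkpoint path are placeholders, not values from this thread.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder.appName("kafka-count").getOrCreate()

    // maxOffsetsPerTrigger caps how many offsets each micro-batch reads, so a
    // large backlog (e.g. 10M records) is consumed in increments instead of
    // one huge first batch.
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "events")                        // placeholder topic
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    kafkaDf.createOrReplaceTempView("kafka_view")
    val counts = spark.sql("select count(*) from kafka_view")

    // Update output mode with a 1-second trigger: each micro-batch emits the
    // updated running count, so progress is visible while the backlog drains.
    val query = counts.writeStream
      .outputMode("update")
      .format("console")
      .trigger(Trigger.ProcessingTime(1000))
      .option("checkpointLocation", "/tmp/kafka-count-chkpt") // placeholder
      .start()

    query.awaitTermination()

With the offset cap in place, each micro-batch sees at most 1000 new offsets,
so the count updates roughly every trigger interval instead of only after all
10M records have been read. The spark.streaming.* backpressure settings
mentioned upthread belong to the old DStream API and, as Michael notes, do not
affect structured streaming.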
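For the question about seeing an incremental count per micro-batch, the
per-batch input volume is also exposed through the query's progress events. A
small sketch, assuming the spark session and running query from the previous
snippet:

    // Sketch: log how many rows each micro-batch consumed, assuming the
    // `spark` session and running `query` from the previous sketch.
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // numInputRows is the number of records read in that micro-batch;
        // with maxOffsetsPerTrigger set it should stay near the configured cap.
        println(s"batch=${event.progress.batchId} rows=${event.progress.numInputRows}")
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })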