Thanks Michael! That works!

On Tue, Mar 20, 2018 at 5:00 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Those options will not affect structured streaming.  You are looking for
>
> .option("maxOffsetsPerTrigger", "1000")
>
> We are working on improving this by building a generic mechanism into the
> Streaming DataSource V2 so that the engine can do admission control on the
> amount of data returned in a source independent way.
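
A minimal Scala sketch of where that option goes on a Kafka source; the broker,
topic, and app name below are placeholders, not code from this thread, and a
SparkSession is assumed:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("rate-limited-read").getOrCreate()

// Cap how many Kafka offsets each micro-batch reads, so the initial backlog
// is consumed in bounded chunks instead of one huge first batch.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "my_topic")                       // placeholder topic
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")                // rows-per-batch cap
  .load()
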
>
> On Tue, Mar 20, 2018 at 2:58 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> I am using Spark 2.3.0 and Kafka 0.10.2.0, so I assume Structured
>> Streaming uses the Direct APIs, although I am not sure. If it is the
>> Direct APIs, then according to this
>> <https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang>
>> article, the only relevant parameters are the ones below:
>>
>>    - spark.conf("spark.streaming.backpressure.enabled", "true")
>>    - spark.conf("spark.streaming.kafka.maxRatePerPartition", "10000")
>>
>> I set both of these and ran select count(*) over my 10M records, but I
>> still don't see any output until it finishes the initial batch of 10M,
>> which takes a while. So I am wondering: am I missing something here?
>>
>> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen <ge...@ibleducation.com
>> > wrote:
>>
>>> The following
>>> <http://spark.apache.org/docs/latest/configuration.html#spark-streaming> 
>>> settings
>>> may be what you’re looking for:
>>>
>>>    - spark.streaming.backpressure.enabled
>>>    - spark.streaming.backpressure.initialRate
>>>    - spark.streaming.receiver.maxRate
>>>    - spark.streaming.kafka.maxRatePerPartition
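
These are settings for the older DStream-based Spark Streaming API (as Michael
notes further up, they do not affect Structured Streaming). A hedged sketch of
how they are typically set there, with placeholder app name, rates, and batch
interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Legacy DStream configuration; values below are illustrative only.
// receiver.maxRate applies to receiver-based streams, while
// kafka.maxRatePerPartition applies to the direct Kafka stream.
val conf = new SparkConf()
  .setAppName("dstream-backpressure-sketch")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")
  .set("spark.streaming.receiver.maxRate", "10000")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")

val ssc = new StreamingContext(conf, Seconds(1))  // 1-second batch interval
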
>>>
>>>
>>>
>>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Yes, it indeed makes sense! Is there a way to get incremental counts
>>>> while I start from 0 and work through the 10M records? Perhaps a count
>>>> for every micro-batch or something?
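
One way to get a per-micro-batch count in Spark 2.3 is a StreamingQueryListener,
which reports the input rows of each batch; a minimal sketch, assuming a
SparkSession named spark (not code from this thread):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Print the number of input rows processed by every micro-batch.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"batch=${event.progress.batchId} " +
            s"inputRows=${event.progress.numInputRows}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
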
>>>>
>>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <
>>>> ge...@ibleducation.com> wrote:
>>>>
>>>>> A trigger does not mean "report the current result every trigger
>>>>> interval". It means the query will attempt to fetch new data and
>>>>> process it no more often than once per trigger interval.
>>>>>
>>>>> If you're reading from the beginning and you've got 10M entries in
>>>>> Kafka, it is likely pulling everything down, processing it completely,
>>>>> and then giving you an initial output. From there on it will check
>>>>> Kafka every second for new data, process it, and show you only the
>>>>> updated rows. So the initial read gives you the entire output, since
>>>>> there is nothing to be 'updating' from. If you add data to Kafka after
>>>>> the streaming job has completed its first batch (and leave it running),
>>>>> it will then show you the new/updated rows since the last batch every
>>>>> second (assuming it can fetch and process within that time span).
>>>>>
>>>>> If the combined fetch + processing time exceeds the trigger interval,
>>>>> you will notice warnings that it is 'falling behind' (I forget the exact
>>>>> verbiage, but something to the effect of "the calculation took XX time
>>>>> and is falling behind"). In that case, it will immediately check Kafka
>>>>> for new messages and begin processing the next batch (if any exist).
>>>>>
>>>>> Hope that makes sense -
>>>>>
>>>>>
>>>>> On Mon, Mar 19, 2018 at 13:36 kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have 10 million records in Kafka and I am just trying to run
>>>>>> spark.sql("select count(*) from kafka_view"). I am reading from Kafka
>>>>>> and writing to Kafka.
>>>>>>
>>>>>> My writeStream is set to "update" mode with a trigger interval of one
>>>>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>>>>> printed every second, but it looks like it only prints after going
>>>>>> through all 10M. Why?
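
A hedged Scala sketch of the setup being described, assuming a SparkSession
named spark; topic names and the checkpoint path are placeholders, and a
console sink is used for readability (the actual job writes back to Kafka):

import org.apache.spark.sql.streaming.Trigger

// Register the Kafka stream as a view, count it with SQL, and emit updated
// counts on a one-second trigger.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .option("startingOffsets", "earliest")
  .load()
  .createOrReplaceTempView("kafka_view")

val counts = spark.sql("select count(*) from kafka_view")

counts.writeStream
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(1000))                  // one-second trigger
  .format("console")                                       // console sink for illustration
  .option("checkpointLocation", "/tmp/count-checkpoint")   // placeholder path
  .start()
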
>>>>>>
>>>>>> Also, it seems to take forever, whereas a Linux wc over the same 10M
>>>>>> rows would take about 30 seconds.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>
>>>
>>
>
