Thanks Anil, I think that’s the approach I will take.

Hi Burak,

That was a possibility I considered, but my team has custom dataframe
writer functions we would like to reuse, and unfortunately they were written
with static dataframes in mind. I do see there is a foreachBatch write mode,
but my thinking was that at that point it would be easier to just read from
Kafka in batch mode.
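
For reference, this is roughly what I pictured if I did go the foreachBatch
route - a rough, untested sketch, where write_static_df() stands in for one
of our existing static-dataframe writers and the broker, topic, and paths are
made up:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-once").getOrCreate()

def write_static_df(batch_df):
    # hypothetical: one of our existing writers built for static dataframes
    batch_df.write.mode("append").parquet("/data/out")  # placeholder destination

# foreachBatch hands each micro-batch over as a plain (static) dataframe,
# and Trigger.Once makes the stream behave like a one-shot batch job.
stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("subscribe", "my-topic")                   # placeholder
    .load())

(stream.writeStream
    .trigger(once=True)
    .option("checkpointLocation", "/checkpoints/my-topic")  # offsets tracked here
    .foreachBatch(lambda batch_df, batch_id: write_static_df(batch_df))
    .start()
    .awaitTermination())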

Thanks,
RJ

On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <brk...@gmail.com> wrote:

> Hi Ruijing,
>
> Why do you not want to use structured streaming here? This is exactly what
> structured streaming + Trigger.Once was built for, so that you don't have
> to build that solution yourself.
> You also get exactly-once semantics if you use the built-in sinks.
>
> Best,
> Burak
>
> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <anil...@gmail.com> wrote:
>
>> Hi Ruijing,
>>
>> We did the following to read Kafka in batch from Spark:
>>
>> 1) Maintain the start offset externally (could be a db, a file, etc.)
>> 2) Get the end offset dynamically when the job executes.
>> 3) Pass the start and end offsets to the batch read.
>> 4) Overwrite the start offset with the end offset. (This should be done
>> after processing the data.)
>>
>> Currently, to make this work in batch mode, you need to maintain the
>> offset state externally.
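>>
>> Roughly, something like this (just a sketch; read_start_offsets() and
>> save_start_offsets() stand in for whatever db/file-backed store you use,
>> the broker and topic are made up, and an existing SparkSession `spark` is
>> assumed):
>>
>> start_offsets = read_start_offsets()  # e.g. '{"my-topic":{"0":42,"1":17}}'
>>
>> df = (spark.read
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
>>     .option("subscribe", "my-topic")                   # placeholder
>>     .option("startingOffsets", start_offsets)
>>     .option("endingOffsets", "latest")  # end offsets resolved at run time
>>     .load())
>>
>> # ... process and write df with your existing writer ...
>>
>> # Step 4: only after the write succeeds, overwrite the stored start offsets
>> # with where this run stopped, so the next run picks up from there.
>> save_start_offsets(df)  # placeholder: derive new start offsets from df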
>>
>>
>> Thanks
>> Anil
>>
>> -Sent from my mobile
>> http://anilkulkarni.com/
>>
>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li <liruijin...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> My use case is to read from a single Kafka topic using a batch Spark SQL
>>> job (ideally not structured streaming). Every time the batch job starts, I
>>> want it to pick up the last offset it stopped at, read from there until it
>>> has caught up to the latest offset, store the result, and stop. Given that
>>> the dataframe has partition and offset columns, my first thought for
>>> offset management is to groupBy partition and agg the max offset, then
>>> store it in HDFS. The next time the job runs, it will read this value and
>>> start from that max offset using startingOffsets.
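>>>
>>> Something like this is what I have in mind (rough sketch only; the paths
>>> and topic name are made up, and df is the dataframe read from Kafka in
>>> this run):
>>>
>>> from pyspark.sql import functions as F
>>>
>>> # After reading the batch: persist the max offset per partition to HDFS.
>>> (df.groupBy("partition")
>>>     .agg(F.max("offset").alias("max_offset"))
>>>     .write.mode("overwrite")
>>>     .parquet("/checkpoints/my-topic-offsets"))
>>>
>>> # Next run: rebuild the startingOffsets JSON from the stored values
>>> # (starting from max_offset + 1 so the last-read records are not repeated).
>>> rows = spark.read.parquet("/checkpoints/my-topic-offsets").collect()
>>> parts = ",".join('"%d":%d' % (r["partition"], r["max_offset"] + 1) for r in rows)
>>> starting_offsets = '{"my-topic":{%s}}' % parts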
>>>
>>> However, I was wondering if this will work. If the Kafka producer failed
>>> to send an offset and later decides to resend it, I will have skipped it,
>>> since I'm starting from the max offset sent. How does Spark Structured
>>> Streaming know to continue onwards - does it keep a state of all offsets
>>> seen? If so, how can I replicate this for batch without missing data? Any
>>> help would be appreciated.
>>>
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
--
Cheers,
Ruijing Li
