Sorry for the confusion; I stopped the example before processing the file
from S2.

So on day 2, if we get
S2 = [D, E, Z], we will have to remove F and add Z; K = [A, B, D, E, Z].

To elaborate: A, B, and C belong to S1 (each item has a field stating its
source). Processing files from S1 should never delete or modify items
belonging to S2.
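
To make that rule concrete, here is a minimal Java sketch (the topic name
"K-topic", the Item type, and the plain-producer approach are placeholders
of mine, not something we settled on): diff one source's latest snapshot
against the keys K currently holds for that same source, send tombstones
only for that source's missing keys, and upsert the rest.

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: apply one source's latest snapshot to the topic backing K.
    // Items from other sources are never touched, because we only diff
    // keys previously seen for this source.
    void applySnapshot(Set<String> previousKeys,     // keys K holds for this source
                       Map<String, Item> snapshot,   // latest CSV from this source
                       KafkaProducer<String, Item> producer) {
        for (String key : previousKeys) {
            if (!snapshot.containsKey(key)) {
                // Tombstone: a null value deletes the entry from the KTable K.
                producer.send(new ProducerRecord<>("K-topic", key, null));
            }
        }
        for (Map.Entry<String, Item> e : snapshot.entrySet()) {
            // Upsert: re-process existing items (A, B) and add new ones (Z).
            producer.send(new ProducerRecord<>("K-topic", e.getKey(), e.getValue()));
        }
    }

On day 2 for S2, previousKeys = {D, E, F} and the snapshot holds {D, E, Z},
so only F gets a tombstone; A and B from S1 are untouched.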

Thanks for the feedback that I should not use Interactive Queries in a
SourceTask.

Currently, I'm representing all CSV records in one KStream (adding the
source to each record), roughly as sketched below. But I can represent them
as separate KStreams if needed. Are you suggesting windowing these KStreams
with a 24-hour window and then merging them?
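
For reference, this is roughly what the current single-stream setup looks
like (a rough sketch; the topic names and the Item.withSource helper are
placeholders, and I'm using the newer StreamsBuilder API for brevity):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();

    // Tag each record with its source so downstream processing can tell
    // which snapshot it belongs to.
    KStream<String, Item> s1 = builder.<String, Item>stream("s1-csv-topic")
            .mapValues(item -> item.withSource("S1"));
    KStream<String, Item> s2 = builder.<String, Item>stream("s2-csv-topic")
            .mapValues(item -> item.withSource("S2"));

    // One logical stream; the source field is preserved on every record.
    KStream<String, Item> all = s1.merge(s2);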

On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the update.
>
> What is not clear to me: why do you only need to remove C, but not
> D, E, F, too, as source2 does not deliver any data on day 2?
>
> Furthermore, IQ is designed to be used outside of your Streams code, and
> thus, you should not use it in a SourceTask (not sure if this would even
> be possible in the first place).
>
> However, you might be able to exploit joins... Your CSV input is a
> KStream, right?
>
>
> -Matthias
>
> On 1/31/17 3:10 PM, Eric Dain wrote:
> > Sorry for not being clear. Let me explain by example. Let's say I have
> > two sources S1 and S2. The application that I need to write will load
> > the files from these sources every 24 hours. The result will be a KTable K.
> >
> > For day 1:
> > S1 = [A, B, C]  =>  the result K = [A, B, C]
> >
> > S2 = [D, E, F]  =>  K will be [A, B, C, D, E, F]
> >
> > For day 2:
> >
> > S1 = [A, B]; because C is missing, I have to remove it from K, so
> > K = [A, B, D, E, F]. On the other hand, I will process A and B again in
> > case of updates.
> >
> > In other words, I know how to process existing and new items; I'm not
> > sure how to remove items missing from the latest CSV file.
> >
> > If I can use Interactive Queries from inside the SourceTask to get a
> > snapshot of what is currently in K for a specific source S, then I can
> > send delete messages for the missing items by subtracting the latest
> > items in the CSV from the items of that source in K.
> >
> > Thanks,
> >
> > On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I am not sure if I understand the complete scenario yet.
> >>
> >>> I need to delete all items from that source that
> >>> don't exist in the latest CSV file.
> >>
> >> Cannot follow here. I thought your CSV files provide the data you want
> >> to process. But it seems you also have a second source?
> >>
> >> How does your Streams app compute the items you want to delete? If you
> >> have these items in a KTable, you can access them from outside your
> >> application using Interactive Queries.
> >>
> >> Thus, you can monitor the app's progress by observing its committed
> >> offsets, and once it is finished, query your KTable to extract the items
> >> you want to delete and do the cleanup.
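
[Side note for my own reference: a minimal sketch of such an outside query;
the store name "k-store" and the Item type are placeholders.]

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // Given a running KafkaStreams instance "streams", query the state
    // store that materializes K from outside the topology.
    ReadOnlyKeyValueStore<String, Item> store =
            streams.store("k-store", QueryableStoreTypes.<String, Item>keyValueStore());
    try (KeyValueIterator<String, Item> it = store.all()) {
        while (it.hasNext()) {
            KeyValue<String, Item> entry = it.next();
            // Collect entries whose source matches, then diff them against
            // the latest CSV to find the items to delete.
        }
    }
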
> >>
> >> Does this make sense?
> >>
> >> For Interactive Queries see the docs and blog post:
> >>
> >> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
> >>
> >> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/30/17 9:10 PM, Eric Dain wrote:
> >>> Thanks Matthias for your reply.
> >>>
> >>> I'm not trying to stop the application. I'm importing inventory from
> >>> CSV files coming from 3rd-party sources. The CSVs are snapshots of each
> >>> source's inventory. I need to delete all items from that source that
> >>> don't exist in the latest CSV file.
> >>>
> >>> I was thinking of using an "End of Batch" message to initiate that
> >>> process. I might need to do the clean-up as part of the Connect code
> >>> instead, or is there a better way of doing that?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>>
> >>>
> >>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> currently, a Kafka Streams application is designed to "run forever"
> >>>> and there is no notion of "End of Batch" -- we have plans to add this
> >>>> though... (cf.
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
> >>>>
> >>>> Thus, right now you need to stop your application manually. You would
> >>>> need to observe the application's committed offsets (and lag) using
> >>>> bin/kafka-consumer-groups.sh (the application ID is used as the group
> >>>> ID) to monitor the app's progress and see when it is done.
> >>>>
> >>>> Cf.
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 1/28/17 1:07 PM, Eric Dain wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest a
> >>>>> large CSV file. I need to run some clean-up code after all records in
> >>>>> the file are processed. Is there a way to send an "End of Batch" event
> >>>>> that is guaranteed to be processed after all records? If not, is there
> >>>>> an alternative solution?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
