Sorry for not being clear. Let me explain by example. Let's say I have two
sources S1 and S2. The application that I need to write will load the files
from these sources every 24 hours. The result will be a KTable K.

For day 1:
S1 = [A, B, C]  =>  the result K = [A, B, C]

S2 = [D, E, F]  =>  K = [A, B, C, D, E, F]

For day 2:

S1 = [A, B]  =>  because C is missing, I have to remove it from K;  K = [A, B, D, E, F]
On the other hand, I will process A and B again in case of updates.

In other words, I know how to process existing and new items; I'm just not
sure how to remove items that are missing from the latest CSV file.

If I can use Interactive Queries from inside the SourceTask to get a
snapshot of what is currently in K for a specific source S, then I can send
delete messages for the missing items by subtracting the latest items in
the CSV from that source's items in K.
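
Concretely, I imagine something like the sketch below. To be clear, the
store name, topic name, the Item value type, latestCsvKeys, and the
producer are all made-up placeholders, and this assumes I can get at the
running KafkaStreams instance ("streams") from the task:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // streams: handle to the running KafkaStreams instance (assumed available)
    // Snapshot of what is currently in K, filtered to source S1.
    ReadOnlyKeyValueStore<String, Item> store =
        streams.store("inventory-store", QueryableStoreTypes.keyValueStore());

    Set<String> keysInK = new HashSet<>();
    try (KeyValueIterator<String, Item> it = store.all()) {
        while (it.hasNext()) {
            KeyValue<String, Item> kv = it.next();
            if ("S1".equals(kv.value.getSource())) {  // hypothetical accessor
                keysInK.add(kv.key);
            }
        }
    }

    // Whatever is in K for S1 but not in the latest CSV has to be deleted.
    keysInK.removeAll(latestCsvKeys);  // latestCsvKeys: keys parsed from the CSV

    // A record with a null value is a tombstone, i.e. a delete for the KTable.
    // producer: a KafkaProducer<String, Item> (assumed)
    for (String key : keysInK) {
        producer.send(new ProducerRecord<>("inventory-topic", key, null));
    }

Does that match what you had in mind, or is there a problem with querying
the store from the SourceTask?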

Thanks,

On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> I am not sure if I understand the complete scenario yet.
>
> > I need to delete all items from that source that
> > don't exist in the latest CSV file.
>
> Cannot follow here. I thought your CSV files provide the data you want
> to process. But it seems you also have a second source?
>
> How does your Streams app compute the items you want to delete? If you
> have these items in a KTable, you can access them from outside your
> application using Interactive Queries.
>
> Thus, you can monitor the app's progress by observing its committed
> offsets, and when it has finished, you query your KTable to extract the
> items you want to delete and do the cleanup.
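>
> For example (store and type names made up; the exact API depends on your
> version), if the table is materialized with a store name:
>
>   KTable<String, Item> table =
>       builder.table("inventory-topic", "inventory-store");
>
> then, while the app is running, code in the same JVM can read it via
>
>   ReadOnlyKeyValueStore<String, Item> store =
>       streams.store("inventory-store", QueryableStoreTypes.keyValueStore());
>
> To access the state from a different process, you would need to put a
> small RPC layer (e.g., REST) in front of it, as the links below describe.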
>
> Does this make sense?
>
> For Interactive Queries see the docs and blog post:
>
> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
>
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
>
>
> -Matthias
>
>
> On 1/30/17 9:10 PM, Eric Dain wrote:
> > Thanks Matthias for your reply.
> >
> > I'm not trying to stop the application. I'm importing inventory from CSV
> > files coming from 3rd-party sources. The CSVs are snapshots of each
> > source's inventory. I need to delete all items from that source that
> > don't exist in the latest CSV file.
> >
> > I was thinking of using an "End of Batch" message to initiate that
> > process. I might need to do the clean-up as part of the Connect code
> > instead, or is there a better way of doing that?
> >
> > Thanks,
> > Eric
> >
> >
> >
> > On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> currently, a Kafka Streams application is designed to "run forever" and
> >> there is no notion of "End of Batch" -- we have plans to add this
> >> though... (cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
> >>
> >> Thus, right now you need to stop your application manually. You would
> >> need to observe the application's committed offsets (and lag) using
> >> bin/kafka-consumer-groups.sh (the application ID is used as the group
> >> ID) to monitor the app's progress and see when it is done.
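> >>
> >> Something like this (exact flags depend on your broker version):
> >>
> >>   bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
> >>       --describe --group <your-application-id>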
> >>
> >> Cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/28/17 1:07 PM, Eric Dain wrote:
> >>> Hi,
> >>>
> >>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest a
> >>> large CSV file. I need to run some clean-up code after all records in
> >>> the file are processed. Is there a way to send an "End of Batch" event
> >>> that is guaranteed to be processed after all records? If not, is there
> >>> an alternative solution?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>
> >>
> >
>
>
