Sorry for the confusion; I stopped the example before processing the file from S2.
So on day 2, if we get S2=[D,E,Z], we will have to remove F and add Z; K = [A,B,D,E,Z].

To elaborate: A, B, and C belong to S1 (items have a field stating their source). Processing files from S1 should never delete or modify items belonging to S2.

Thanks for the feedback that I should not use Interactive Queries in SourceTask. Currently, I'm representing all CSV records in one KStream (adding the source to each record), but I can represent them as separate KStreams if needed. Are you suggesting windowing these KStreams with a 24-hour window and then merging them?

On Tue, Jan 31, 2017 at 4:31 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the update.
>
> What is not clear to me: why do you only need to remove C, but not
> D, E, F, too, as source2 does not deliver any data on day 2?
>
> Furthermore, IQ is designed to be used outside of your Streams code,
> and thus, you should not use it in SourceTask (not sure if this would
> even be possible in the first place).
>
> However, you might be able to exploit joins... Your CSV input is a
> KStream, right?
>
>
> -Matthias
>
> On 1/31/17 3:10 PM, Eric Dain wrote:
> > Sorry for not being clear. Let me explain by example. Let's say I have
> > two sources, S1 and S2. The application that I need to write will load
> > the files from these sources every 24 hours. The result will be a
> > KTable K.
> >
> > For day 1:
> > S1=[A,B,C] => the result K = [A,B,C]
> >
> > S2=[D,E,F] => K will be [A,B,C,D,E,F]
> >
> > For day 2:
> >
> > S1=[A,B]; because C is missing, I have to remove it from K;
> > K=[A,B,D,E,F]. On the other hand, I will process A and B again in case
> > of updates.
> >
> > In other words, I know how to process existing and new items; I'm not
> > sure how to remove items missing from the latest CSV file.
> >
> > If I can use Interactive Queries from inside the SourceTask to get a
> > snapshot of what is currently in K for a specific source S, then I can
> > send a delete message for the missing items by subtracting the latest
> > items in the CSV from the items of that source in K.
> >
> > Thanks,
> >
> > On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I am not sure if I understand the complete scenario yet.
> >>
> >>> I need to delete all items from that source that
> >>> don't exist in the latest CSV file.
> >>
> >> Cannot follow here. I thought your CSV files provide the data you want
> >> to process. But it seems you also have a second source?
> >>
> >> How does your Streams app compute the items you want to delete? If you
> >> have these items in a KTable, you can access them from outside your
> >> application using Interactive Queries.
> >>
> >> Thus, you can monitor the app's progress by observing committed
> >> offsets, and once it is finished, you query your KTable to extract the
> >> items you want to delete and do the cleanup.
> >>
> >> Does this make sense?
> >>
> >> For Interactive Queries, see the docs and blog post:
> >>
> >> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
> >>
> >> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/30/17 9:10 PM, Eric Dain wrote:
> >>> Thanks Matthias for your reply.
> >>>
> >>> I'm not trying to stop the application. I'm importing inventory from
> >>> CSV files coming from 3rd-party sources. The CSVs are snapshots of
> >>> each source's inventory.
> >>> I need to delete all items from that source that don't exist in the
> >>> latest CSV file.
> >>>
> >>> I was thinking of using an "End of Batch" message to initiate that
> >>> process. I might need to do the clean-up as part of the Connect code
> >>> instead, or is there a better way of doing that?
> >>>
> >>> Thanks,
> >>> Eric
> >>>
> >>>
> >>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Currently, a Kafka Streams application is designed to "run forever"
> >>>> and there is no notion of "End of Batch" -- we have plans to add this
> >>>> though... (cf.
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
> >>>>
> >>>> Thus, right now you need to stop your application manually. You would
> >>>> need to observe the application's committed offsets (and lag) using
> >>>> bin/kafka-consumer-groups.sh (the application ID is used as the group
> >>>> ID) to monitor the app's progress and see when it is done.
> >>>>
> >>>> Cf.
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 1/28/17 1:07 PM, Eric Dain wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
> >>>>> a large CSV file. I need to run some clean-up code after all records
> >>>>> in the file are processed. Is there a way to send an "End of Batch"
> >>>>> event that is guaranteed to be processed after all records? If not,
> >>>>> is there an alternative solution?
> >>>>>
> >>>>> Thanks,
> >>>>> Eric
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
> >
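
A minimal sketch of how the cleanup step discussed in this thread might look: once the Streams application has finished processing the latest batch, the KTable is queried from outside the application (not from a SourceTask) via Interactive Queries, and tombstones (records with a null value) are sent for items that belong to a given source but are missing from the newest CSV snapshot. The store name "inventory-store", the topic "inventory", and the Item type are illustrative assumptions, not details confirmed in the thread.

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SnapshotCleanup {

    /** Placeholder for whatever value type the KTable holds; it must expose its source. */
    public interface Item {
        String source();
    }

    /**
     * Sends a tombstone for every item of the given source that is present in the
     * KTable's state store but absent from the latest CSV snapshot. Items belonging
     * to other sources are never touched.
     */
    public static void deleteMissing(KafkaStreams streams,
                                     String source,
                                     Set<String> latestCsvKeys,
                                     Properties producerConfig) {
        // Interactive Queries: a read-only view of the KTable's local state store.
        // "inventory-store" is an assumed store name.
        ReadOnlyKeyValueStore<String, Item> store =
                streams.store("inventory-store", QueryableStoreTypes.<String, Item>keyValueStore());

        try (KafkaProducer<String, Item> producer = new KafkaProducer<>(producerConfig);
             KeyValueIterator<String, Item> all = store.all()) {
            while (all.hasNext()) {
                KeyValue<String, Item> entry = all.next();
                // Only consider items owned by this source; skip keys still present
                // in the latest CSV snapshot.
                if (source.equals(entry.value.source()) && !latestCsvKeys.contains(entry.key)) {
                    // A null value is a tombstone: once the Streams app reads it from
                    // the (assumed) "inventory" topic, the KTable entry is removed.
                    producer.send(new ProducerRecord<>("inventory", entry.key, null));
                }
            }
            producer.flush();
        }
    }
}

The tombstones here target whatever topic the KTable is built from; if the KTable is materialized differently, the target topic would change accordingly. The join-based idea Matthias mentions would avoid the external producer entirely.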