Sorry for not being clear. Let me explain by example. Let's say I have two sources, S1 and S2. The application that I need to write will load the files from these sources every 24 hours. The result will be a KTable, K.
For day 1:

  S1=[A,B,C] => the result K = [A,B,C]
  S2=[D,E,F] => K will be [A,B,C,D,E,F]

For day 2:

  S1=[A,B] => because C is missing, I have to remove it from K; K = [A,B,D,E,F]

On the other hand, I will process A and B again in case of updates. In
other words, I know how to process existing and new items; I'm just not
sure how to remove items that are missing from the latest CSV file. If I
can use Interactive Queries from inside the SourceTask to get a snapshot
of what is currently in K for a specific source S, then I can send a
delete message for each missing item by subtracting the latest items in
the CSV from that source's items in K (a rough sketch is appended at the
end of this mail).

Thanks,

On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> I am not sure if I understand the complete scenario yet.
>
> > I need to delete all items from that source that
> > don't exist in the latest CSV file.
>
> Cannot follow here. I thought your CSV files provide the data you want
> to process. But it seems you also have a second source?
>
> How does your Streams app compute the items you want to delete? If you
> have these items in a KTable, you can access them from outside your
> application using Interactive Queries.
>
> Thus, you can monitor the app's progress by observing committed
> offsets, and when it is finished, query your KTable to extract the
> items you want to delete and do the cleanup.
>
> Does this make sense?
>
> For Interactive Queries, see the docs and blog post:
>
> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
>
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
>
> -Matthias
>
>
> On 1/30/17 9:10 PM, Eric Dain wrote:
> > Thanks Matthias for your reply.
> >
> > I'm not trying to stop the application. I'm importing inventory from
> > CSV files coming from 3rd-party sources. The CSVs are snapshots of
> > each source's inventory. I need to delete all items from a source
> > that don't exist in that source's latest CSV file.
> >
> > I was thinking of using an "End of Batch" message to initiate that
> > process. I might need to do the clean-up as part of the Connect code
> > instead, or is there a better way of doing that?
> >
> > Thanks,
> > Eric
> >
> >
> > On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> Currently, a Kafka Streams application is designed to "run forever"
> >> and there is no notion of "End of Batch" -- we have plans to add
> >> this, though... (cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
> >>
> >> Thus, right now you need to stop your application manually. You
> >> would need to observe the application's committed offsets (and lag)
> >> using bin/kafka-consumer-groups.sh (the application ID is used as
> >> the group ID) to monitor the app's progress and see when it is done.
> >>
> >> Cf.
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 1/28/17 1:07 PM, Eric Dain wrote:
> >>> Hi,
> >>>
> >>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest
> >>> a large CSV file. I need to run some clean-up code after all records
> >>> in the file are processed. Is there a way to send an "End of Batch"
> >>> event that is guaranteed to be processed after all records? If not,
> >>> is there an alternative solution?
> >>>
> >>> Thanks,
> >>> Eric
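
P.S. In case it helps, here is a minimal Java sketch of the
diff-and-delete step described above. It is only an illustration, not
code from this thread: it assumes the code runs in the same JVM as the
running KafkaStreams instance (Interactive Queries only expose local
state, so a remote SourceTask would need an RPC layer in front of the
Streams app), and the store name "inventory-store", the topic name
"inventory-topic", and the Item type with its sourceId() accessor are
made-up placeholders.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.HashSet;
import java.util.Set;

/* Placeholder for the inventory value type; sourceId() tells which
 * source (S1, S2, ...) an item was loaded from. */
interface Item {
    String sourceId();
}

public class SnapshotReconciler {

    /* After loading the latest CSV for `sourceId`, delete from the
     * KTable every key of that source that the new snapshot no longer
     * contains. `latestCsvKeys` holds the keys parsed from the CSV. */
    public static void deleteMissingItems(KafkaStreams streams,
                                          KafkaProducer<String, Item> producer,
                                          String sourceId,
                                          Set<String> latestCsvKeys) {
        // Query the local state store that backs the KTable K.
        ReadOnlyKeyValueStore<String, Item> store =
                streams.store("inventory-store",
                              QueryableStoreTypes.<String, Item>keyValueStore());

        // K(source) \ CSV(source) = keys to delete.
        Set<String> toDelete = new HashSet<>();
        try (KeyValueIterator<String, Item> iter = store.all()) {
            while (iter.hasNext()) {
                KeyValue<String, Item> kv = iter.next();
                if (sourceId.equals(kv.value.sourceId())
                        && !latestCsvKeys.contains(kv.key)) {
                    toDelete.add(kv.key);
                }
            }
        }

        // A record with a null value is a tombstone: when the KTable's
        // source topic is consumed, the key is dropped from the table
        // (and log compaction eventually removes it from the topic).
        for (String key : toDelete) {
            producer.send(new ProducerRecord<>("inventory-topic", key, null));
        }
        producer.flush();
    }
}

Note that store.all() scans the whole store; if the table is large, a
store keyed by (source, item) plus range() would avoid the full scan.
The Streams instance also has to be in RUNNING state for the store to
be queryable.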