I am not sure if I understand the complete scenario yet.

> I need to delete all items from that source that
> don't exist in the latest CSV file.

I cannot quite follow here. I thought your CSV files provide the data you
want to process. But it seems you also have a second data source?

How does your Streams app compute the items you want to delete? If you
have these items in a KTable, you can access them from outside your
application using Interactive Queries.
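To illustrate, here is a minimal sketch of reading such a KTable's state
store via Interactive Queries against a running KafkaStreams instance.
The class name `StoreReader`, the method name, and the String key/value
types are made up for this example; only the `streams.store(...)` and
`QueryableStoreTypes.keyValueStore()` calls are actual Streams API:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.ArrayList;
import java.util.List;

public class StoreReader {
    // Collect all keys from a queryable KTable state store.
    // "streams" must be a running KafkaStreams instance and
    // "storeName" the name the KTable was materialized under.
    static List<String> allKeys(KafkaStreams streams, String storeName) {
        ReadOnlyKeyValueStore<String, String> store =
            streams.store(storeName,
                QueryableStoreTypes.<String, String>keyValueStore());
        List<String> keys = new ArrayList<>();
        // The iterator holds resources, so close it when done.
        try (KeyValueIterator<String, String> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, String> entry = it.next();
                keys.add(entry.key);
            }
        }
        return keys;
    }
}
```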

Thus, you can monitor the app's progress by observing its committed
offsets, and once it has finished, query your KTable to extract the items
you want to delete and do the cleanup.
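The cleanup step itself boils down to a set difference: everything
currently known for a source, minus whatever appears in the latest
snapshot, is what must be deleted. A plain-Java sketch (the class and
method names here are mine, not part of any Kafka API):

```java
import java.util.HashSet;
import java.util.Set;

public class SnapshotDiff {
    // Items present in the current state but absent from the latest
    // CSV snapshot are the ones to delete.
    static Set<String> itemsToDelete(Set<String> currentItems,
                                     Set<String> latestSnapshot) {
        Set<String> toDelete = new HashSet<>(currentItems);
        toDelete.removeAll(latestSnapshot);
        return toDelete;
    }

    public static void main(String[] args) {
        Set<String> current = Set.of("sku-1", "sku-2", "sku-3");
        Set<String> latest = Set.of("sku-1", "sku-3");
        System.out.println(itemsToDelete(current, latest)); // prints [sku-2]
    }
}
```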

Does this make sense?

For Interactive Queries, see the docs and this blog post:

http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/



-Matthias


On 1/30/17 9:10 PM, Eric Dain wrote:
> Thanks Matthias for your reply.
> 
> I'm not trying to stop the application. I'm importing inventory from CSV
> files coming from 3rd party sources. The CSVs are snapshots for each
> source's inventory. I need to delete all items from that source that
> don't exist in the latest CSV file.
> 
> I was thinking of using an "End of Batch" message to initiate that process. I
> might need to do the clean-up as part of the Connect code instead, or is
> there a better way of doing that?
> 
> Thanks,
> Eric
> 
> 
> 
> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi,
>>
>> currently, a Kafka Streams application is designed to "run forever" and
>> there is no notion of "End of Batch" -- we have plans to add this
>> though... (cf.
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>
>> Thus, right now you need to stop your application manually. You would
>> need to observe the application's committed offsets (and lag) using
>> bin/kafka-consumer-groups.sh (the application ID is used as the group ID) to
>> monitor the app's progress to see when it is done.
>>
>> Cf.
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>
>>
>> -Matthias
>>
>>
>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>> Hi,
>>>
>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest a large
>>> CSV file. I need to run some clean-up code after all records in the file
>>> are processed. Is there a way to send an "End of Batch" event that is
>>> guaranteed to be processed after all records? If not, is there an
>>> alternative solution?
>>>
>>> Thanks,
>>> Eric
>>>
>>
>>
> 
