Thanks for the update.

What is not clear to me: why do you only need to remove C, but not
D, E, and F, too, since source2 does not deliver any data on day 2?

Furthermore, IQ is designed to be used outside of your Streams code,
and thus you should not use it in SourceTask (I am not sure if this
would even be possible in the first place).

However, you might be able to exploit joins... Your CSV input is a
KStream, right?
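
If it is, one option might be to also read K and the latest snapshot as
KTables and outer-join them: a null on the snapshot side indicates an
item that disappeared. A rough, untested sketch (all topic and store
names are made up, and the exact API depends on your Kafka version):

  import org.apache.kafka.streams.kstream.KStreamBuilder;
  import org.apache.kafka.streams.kstream.KTable;

  KStreamBuilder builder = new KStreamBuilder();

  // current content of K and the latest snapshot, both keyed by item id
  KTable<String, String> current = builder.table("k-topic", "k-store");
  KTable<String, String> snapshot = builder.table("snapshot-topic", "snapshot-store");

  // a null on the snapshot side means the item vanished from the latest CSV
  current.outerJoin(snapshot,
          (oldValue, newValue) -> oldValue != null && newValue == null ? "DELETE" : "KEEP")
      .toStream()
      .filter((key, flag) -> "DELETE".equals(flag))
      .mapValues(flag -> (String) null) // tombstone, assuming a compacted topic
      .to("k-topic");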


-Matthias

On 1/31/17 3:10 PM, Eric Dain wrote:
> Sorry for not being clear. Let me explain by example. Let's say I have two
> sources, S1 and S2. The application that I need to write will load the files
> from these sources every 24 hours. The result will be a KTable K.
> 
> For day 1:
> S1=[A, B, C]   =>  the result K = [A,B,C]
> 
> S2=[D,E,F] =>   K will be [A,B,C,D,E,F]
> 
> For day 2:
> 
> S1=[A,B]; because C is missing, I have to remove it from K: K = [A,B,D,E,F].
> On the other hand, I will process A and B again in case of updates.
> 
> In other words, I know how to process existing and new items; I'm just
> not sure how to remove items that are missing from the latest CSV file.
> 
> If I can use Interactive Queries from inside the SourceTask to get a
> snapshot of what is currently in K for a specific source S, then I can
> send delete messages for the missing items by subtracting the latest
> items in the CSV from the items of that source in K.
> 
> Thanks,
> 
> On Tue, Jan 31, 2017 at 1:54 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I am not sure if I understand the complete scenario yet.
>>
>>> I need to delete all items from that source that
>>> don't exist in the latest CSV file.
>>
>> I cannot follow you here. I thought your CSV files provide the data you
>> want to process. But it seems you also have a second source?
>>
>> How does your Streams app compute the items you want to delete? If you
>> have these items in a KTable, you can access them from outside your
>> application using Interactive Queries.
>>
>> Thus, you can monitor the app's progress by observing its committed
>> offsets, and once it is finished, you can query your KTable to extract
>> the items you want to delete and do the cleanup.
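>>
>> A rough sketch of the query part (store name and value types are made
>> up; "streams" is your running KafkaStreams instance):
>>
>>   ReadOnlyKeyValueStore<String, String> store =
>>       streams.store("k-store", QueryableStoreTypes.keyValueStore());
>>
>>   KeyValueIterator<String, String> all = store.all();
>>   while (all.hasNext()) {
>>       KeyValue<String, String> entry = all.next();
>>       // collect entry.key and diff it against the latest CSV snapshot
>>   }
>>   all.close(); // iterators over the store must be closed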
>>
>> Does this make sense?
>>
>> For Interactive Queries see the docs and blog post:
>>
>> http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries
>>
>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>>
>>
>>
>> -Matthias
>>
>>
>> On 1/30/17 9:10 PM, Eric Dain wrote:
>>> Thanks Matthias for your reply.
>>>
>>> I'm not trying to stop the application. I'm importing inventory from
>>> CSV files coming from 3rd-party sources. The CSVs are snapshots of each
>>> source's inventory. I need to delete all items from that source that
>>> don't exist in the latest CSV file.
>>>
>>> I was thinking of using an "End of Batch" message to initiate that
>>> process. I might need to do the clean-up as part of the Connect code
>>> instead, or is there a better way of doing that?
>>>
>>> Thanks,
>>> Eric
>>>
>>>
>>>
>>> On Sun, Jan 29, 2017 at 4:37 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> currently, a Kafka Streams application is designed to "run forever" and
>>>> there is no notion of "End of Batch" -- we have plans to add this
>>>> though... (cf.
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams)
>>>>
>>>> Thus, right now you need to stop your application manually. You would
>>>> need to observe the application's committed offsets (and lag) using
>>>> bin/kafka-consumer-groups.sh (the application ID is used as the group
>>>> ID) to monitor the app's progress and see when it is done.
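>>>>
>>>> For example (the exact flags depend on your Kafka version):
>>>>
>>>>   bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
>>>>       --describe --group <your-application-id>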
>>>>
>>>> Cf.
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+%28Re%29Processing+Scenarios
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 1/28/17 1:07 PM, Eric Dain wrote:
>>>>> Hi,
>>>>>
>>>>> I'm pretty new to Kafka Streams. I am using Kafka Streams to ingest a
>>>>> large CSV file. I need to run some clean-up code after all records in
>>>>> the file are processed. Is there a way to send an "End of Batch" event
>>>>> that is guaranteed to be processed after all records? If not, is there
>>>>> an alternative solution?
>>>>>
>>>>> Thanks,
>>>>> Eric
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
