Hey there,

I'm quite new to Kafka itself and the Streams API, so pardon any shortcomings.

What I need is a sliding window giving me access to the data of, e.g., the last 
half hour relative to the current data element. As far as I understand, the 
Streams API does not provide such a window.

Therefore I tried to implement my own transformer. It is backed by a custom 
Window Store which knows about the window size and therefore just needs one 
timestamp for fetching data. This custom Window Store is backed by a Persistent 
Window Store:


super(Stores.persistentWindowStore(name, windowSizeMillis, 2, windowSizeMillis, false),
      Serdes.String(), new ImportDataSerde(), Time.SYSTEM);

The implementation of the transformer looks something like this:

@Override
public KeyValue<String, ImportData> transform(final String key, final ImportData data) {
    final long windowEndTime = data.getMetaData().getTimestamp().toInstant().toEpochMilli();

    // add current value to the queue
    getStateStore().put(key, data, windowEndTime);

    // fetch data from state for given key
    final Collection<ImportData> importDataList = getStateStore().fetch(key, windowEndTime);

    // forward new Import data with all fetched entries
    return KeyValue.pair(key, new ImportData(importDataList));
}

On an incoming data element, I first extract its timestamp. Then I add the data 
to the store. After that I fetch the data of the window.
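
To make the intended semantics concrete, here is a minimal in-memory sketch in 
plain Java, without any Kafka classes. The class SlidingWindowSketch and its 
methods are made up for illustration only; my real store is the persistent 
Window Store shown above. I am assuming the window covers 
[timestamp - windowSize, timestamp], inclusive on both ends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only, not a Kafka Streams state store.
public class SlidingWindowSketch {
    private final long windowSizeMillis;
    // key -> (timestamp -> values recorded at that timestamp)
    private final Map<String, TreeMap<Long, List<String>>> data = new HashMap<>();

    public SlidingWindowSketch(final long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Store a value under its own timestamp.
    public void put(final String key, final String value, final long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>())
            .add(value);
    }

    // Return every value for the key whose timestamp lies in
    // [windowEndTime - windowSizeMillis, windowEndTime], oldest first.
    public List<String> fetch(final String key, final long windowEndTime) {
        final List<String> result = new ArrayList<>();
        final TreeMap<Long, List<String>> perKey = data.get(key);
        if (perKey == null) {
            return result;
        }
        final long windowStartTime = windowEndTime - windowSizeMillis;
        for (final List<String> values
                : perKey.subMap(windowStartTime, true, windowEndTime, true).values()) {
            result.addAll(values);
        }
        return result;
    }
}
```

With this model, a put followed by a fetch with the same timestamp always 
returns at least the value just written, which is the behaviour I expected 
from the real store.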

This works as expected in tests. I have now installed the service on my server 
and tried to process some real data. The problem is that sometimes the fetch 
does not return any data.

So here are my questions:
1.) How is it even possible for the fetch to not return any elements? I just 
put in the current value. The least I would expect is that it would return just 
this value.

2.) Currently, I’m guessing that the window store has already deleted all the 
data because I’m using the wrong time semantics. How does the 
TimestampExtractor influence windows?

3.) To build the backing Window Store I call Stores.persistentWindowStore(). The 
parameters of this method are quite unclear to me:
a) What are the segments of the store? How do I choose a good number of 
segments?
b) Does the retention time start at the beginning of the window or at the end 
of it?
c) Why does the store even need to know about the window size?
d) How does the store identify duplicates?
(Sorry for these probably quite stupid questions; I guess I haven’t grasped all 
the logic of windows yet.)

4.) Is there an easier way to implement this logic? Did I miss any predefined 
windows that would give me what I need?
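
To make my guess from question 2 concrete, here is a toy model in plain Java of 
what I suspect might be happening. It is not Kafka's code, and the class 
RetentionSketch is made up. The assumption (which may well be wrong) is that the 
store tracks the highest timestamp it has seen so far and silently drops 
anything older than that maximum minus the retention period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model of retention, not the real Window Store.
public class RetentionSketch {
    private final long retentionMillis;
    // Assumption: expiry is driven by the highest timestamp seen so far.
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private final TreeMap<Long, List<String>> entries = new TreeMap<>();

    public RetentionSketch(final long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Returns false if the record was already expired and therefore dropped.
    public boolean put(final String value, final long timestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        final long oldestKept = maxSeenTimestamp - retentionMillis;
        // Expire everything strictly older than the retention boundary.
        entries.headMap(oldestKept, false).clear();
        if (timestamp < oldestKept) {
            return false;
        }
        entries.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        return true;
    }

    // Return all values with a timestamp in [from, to], oldest first.
    public List<String> fetch(final long from, final long to) {
        final List<String> result = new ArrayList<>();
        for (final List<String> values : entries.subMap(from, true, to, true).values()) {
            result.addAll(values);
        }
        return result;
    }
}
```

In this model, putting a record with timestamp 10000 and then a late record 
with timestamp 5000 (retention 1000) drops the late record, so a fetch around 
5000 comes back empty even though I just put a value in. If the real store 
behaves similarly, out-of-order timestamps in my data would explain what I see.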

Thanks for reading this. I’d be glad about any answers or pointers to docs 
explaining this.

By the way: I quite like Kafka and Kafka Streams so far 😊

Best,
Claudia
