Hi!

I have an application which uses an input and output topic, and every
message from the input topic should have a corresponding message (with the
same key) in the output topic.

To detect lost messages (no output after a certain amount of time, about
10 days), I tried a KTable-KTable left join, and I check which entries in
the resulting KTable's state store have a null output value.
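
To make the intended check concrete, here is a minimal plain-Java sketch of
the logic only. It is not Kafka Streams code. The class LostMessageCheck, the
method findLost, and the assumption that an arrival time is known for each
input key are all hypothetical and only for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: a plain-Java model of the
// "left join, then look for null on the output side" check.
class LostMessageCheck {

    // inputArrival: key -> time the input message was seen (assumed to be known).
    // outputKeys:   keys that already have a message in the output topic.
    // Returns, in sorted key order, the keys still without output after maxAge.
    static List<String> findLost(Map<String, Instant> inputArrival,
                                 Set<String> outputKeys,
                                 Duration maxAge,
                                 Instant now) {
        List<String> lost = new ArrayList<>();
        // Copy into a TreeMap so the iteration order is deterministic.
        for (Map.Entry<String, Instant> e : new TreeMap<>(inputArrival).entrySet()) {
            // Corresponds to a null output value in the left-join result.
            boolean noOutput = !outputKeys.contains(e.getKey());
            // True once the input message is older than the allowed delay.
            boolean tooOld = Duration.between(e.getValue(), now).compareTo(maxAge) > 0;
            if (noOutput && tooOld) {
                lost.add(e.getKey());
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-01-20T00:00:00Z");
        Map<String, Instant> input = Map.of(
                "a", Instant.parse("2020-01-01T00:00:00Z"),   // old, has output
                "b", Instant.parse("2020-01-02T00:00:00Z"),   // old, no output
                "c", Instant.parse("2020-01-19T00:00:00Z"));  // recent, no output yet
        // prints [b]
        System.out.println(findLost(input, Set.of("a"), Duration.ofDays(10), now));
    }
}
```

Only "b" is reported: "a" already has an output, and "c" is still within the
10-day window.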

Sample code:
// Stream setup
StreamsBuilder builder = new StreamsBuilder();
KTable<String, InboundMsg> inputTable = builder.table("inputTopic",
Consumed.with(...)).filter(...);
KTable<String, OutboundMsg> outputTable = builder.table("outputTopic",
Consumed.with(...));

Materialized<String, InboundMsg, KeyValueStore<Bytes, byte[]>> store =
        Materialized.<String, InboundMsg, KeyValueStore<Bytes,
byte[]>>as("Store")..;
KTable<String, InboundMsg> joinedTable = inputTable.leftJoin(outputTable,
valueJoiner, store);

// Read from store
ReadOnlyKeyValueStore<String, InboundMsg> keyValueStore =
streams.store("Store", QueryableStoreTypes.keyValueStore());
KeyValueIterator<String, InboundMsg> allMsg = keyValueStore.all();

Is there any other way to read from the state store and possibly stream it
to a topic? As there can be a couple of million messages in the topics,
reading all of them with an iterator will not be performant enough.

Thanks,
Jozsef
