Hi! I have an application which uses an input and output topic, and every message from the input topic should have a corresponding message (with the same key) in the output topic.
To detect lost messages (=no output after a certain amount of time, ~10days) I tried to use a KTable - KTable left join and check where the output values are null in the result KTable's state store. Sample code: // Stream setup StreamsBuilder builder = new StreamsBuilder(); KTable<String, InboundMsg> inputTable = builder.table("inputTopic", Consumed.with(...).filter(...)); KTable<String, OutboundMsg> outputTable = builder.table("outputTopic", Consumed.with(...)); Materialized<String, InboundMsg, KeyValueStore<Bytes, byte[]>> store = Materialized.<String, InboundMsg, KeyValueStore<Bytes, byte[]>>as("Store")..; KTable<String, InboundMsg> joinedTable = inputTable.leftJoin(outputTable, ValueMapper, store); // Read from store ReadOnlyKeyValueStore<String, InboundMsg> keyValueStore = streams.store("Store", QueryableStoreTypes.keyValueStore()); KeyValueIterator<String, InboundMsg> allMsg = keyValueStore.all(); Is there any other way to read from the state store and possibly stream it to a topic? As there can be a couple of million messages int he topics, reading all of them with an iterator will be not performant enough. Thanks, Jozsef