You can try checking the files RocksDB creates for the state store to see
whether a value is actually stored for the key.
What I suspect is that get may be getting called before put in your
topology. Try retrieving the value from the state store further downstream.

Thanks
Sachin

On Thu, 2 Apr 2020, 02:35 Jan Bols, <janb...@telenet.be> wrote:

> Ok, Matthias,
>
> thanks for the hint:
>
> *Even if any upstream operation was key-changing, no auto-repartition is
> triggered. If repartitioning is required, a call to through()
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String->
> should be performed before flatTransformValues().*
>
> Of course, I didn't call *through* before calling the transformer. As a
> result, some calls were being processed by another instance of the
> transformer running on a different partition. Calling *store.get(key)* on
> one instance would then not return any value, even though another instance
> had done a *store.put(key, value)* before. Is this expected behaviour? Is
> there a transformer for each partition, and does each get its own state
> store?
>
> Best regards
> Jan
>
> On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Your code looks correct to me. If you write into the store, you should
> > also be able to read it back from the store.
> >
> > Can you reproduce the issue using `TopologyTestDriver`? How many
> > partitions does your input topic have? Is your stream partitioned by
> > key? Note that `transform()` does not do auto-repartitioning, in
> > contrast to `groupByKey()`.
> >
> >
> > -Matthias
> >
> > On 3/25/20 3:49 AM, Jan Bols wrote:
> > > Hi all,
> > > I'm trying to aggregate a stream of messages and return a stream of
> > > aggregated results using Kafka Streams.
> > > At some point, depending on the incoming message, the old aggregate
> > > needs to be closed and a new aggregate needs to be created, just like
> > > a session that is closed due to some close event while, at the same
> > > time, a new session is started.
> > >
> > > For this I'm using transformValues, where I store the result of an
> > > aggregation similar to how a groupByKey().aggregate() is done. When
> > > the old session needs to be closed, it's sent downstream first,
> > > followed by the new value.
> > >
> > > The state store returns null for a given key at first retrieval, and
> > > the new aggregation result is stored under the same key.
> > > However, on the second pass, the value for the same key is still
> > > null, even though it has just been stored before.
> > >
> > > How can this be possible?
> > >
> > >
> > > I'm using transformValues in the following way:
> > >
> > > val storeName = "aggregateOverflow_binReportAgg"
> > > val store = Stores.keyValueStoreBuilder<K, V>(
> > >     Stores.persistentKeyValueStore(storeName),
> > >     serde.serde(), serde.serde())
> > > streamsBuilder.addStateStore(store)
> > >
> > > ...
> > >
> > > stream
> > >     .flatTransformValues(ValueTransformerWithKeySupplier {
> > >         AggregateOverflow(storeName, transformation) }, storeName)
> > >
> > > where AggregateOverflow gets the previous value from the state store
> > > and transforms the result into an AggregateOverflowResult.
> > > AggregateOverflowResult is a data class containing the current value
> > > and an optional overflow value, like this:
> > >
> > > data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> > >
> > > When the overflow value is not null, it's sent downstream first,
> > > before the current value. In each case, the current result is stored
> > > in the state store for later retrieval, like the following:
> > >
> > > class AggregateOverflow<K, V, VR : Any>(
> > >     private val storeName: String,
> > >     private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?)
> > >     : ValueTransformerWithKey<K, V, Iterable<VR>> {
> > >
> > >     private val logger = KotlinLogging.logger {}
> > >     private lateinit var state: KeyValueStore<K, VR>
> > >
> > >     init {
> > >         logger.debug { "$storeName: created" }
> > >     }
> > >
> > >     override fun init(context: ProcessorContext) {
> > >         logger.debug { "$storeName: init called" }
> > >         this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
> > >     }
> > >
> > >     override fun transform(key: K, value: V): Iterable<VR> {
> > >         val acc = state.get(key)
> > >         if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
> > >         val result = transformation(key, value, acc)
> > >         state.put(key, result?.current)
> > >         logger.trace { "$storeName:\n  Key: $key\n  Value: $value\n" +
> > >             "  aggregate old: $acc\n  aggregate new: $result" }
> > >         // the overflow (previous, closed aggregate) is forwarded first if not null
> > >         return listOfNotNull(result?.overflow, result?.current)
> > >     }
> > >
> > >     override fun close() {
> > >         logger.debug { "$storeName: close called" }
> > >     }
> > > }
> > >
> > > In the log file you can see that the first invocation returns an
> > > empty value for the given key; you can also see that the new value is
> > > being serialized into the store.
> > > At the second invocation, a few seconds later, the value for the same
> > > key is still null.
> > >
> > > Any ideas why this is?
> > > Best regards
> > > Jan
> > >
> >
>
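[Editor's note] The behaviour Jan ran into follows from Kafka Streams creating one stream task, and one private state store instance, per input partition. Below is a toy simulation of that effect in plain Kotlin (no Kafka dependency; `partitionFor` and the map-backed "stores" are made-up stand-ins, not Kafka APIs): without a repartition step, two records with the same key can be processed by different tasks, so a `get()` misses an earlier `put()`.

```kotlin
// Stand-in for a key-based partitioner (hypothetical; Kafka's real default
// partitioner hashes serialized key bytes with murmur2).
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

fun main() {
    val numPartitions = 2
    // One private store per task, mimicking Kafka Streams' per-partition stores.
    val stores = List(numPartitions) { mutableMapOf<String, String>() }

    val key = "session-42"
    // Suppose an upstream key change routed the first record to partition 0,
    // whose task stores the aggregate...
    stores[0][key] = "aggregate-v1"
    // ...while the next record for the same key, with no repartition step in
    // between, happens to be processed by the task for partition 1.
    // That task's store has never seen the key:
    println(stores[1][key])  // null

    // After repartitioning by key, every record for a key is routed to the
    // same partition, so the task that did the put() also serves the get():
    val p = partitionFor(key, numPartitions)
    stores[p][key] = "aggregate-v1"
    println(stores[p][key])  // aggregate-v1
}
```

In the real topology the fix is what the quoted javadoc says: insert `through()` (or `repartition()` in Kafka 2.6+) before `flatTransformValues()`, so that records are re-keyed onto the partition whose task owns the store.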