You could check the files RocksDB creates for the state store to see
whether a value is actually stored for the key.

My suspicion is that get() may be called before put() in your
topology.

Try retrieving the value from the state store further downstream.

Thanks
Sachin


On Thu, 2 Apr 2020, 02:35 Jan Bols, <janb...@telenet.be> wrote:

> Ok, Matthias,
>
> thanks for the hint:
> *Even if any upstream operation was key-changing, no auto-repartition is
> triggered. If repartitioning is required, a call to through()
> <https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String->
> should be performed before flatTransformValues().*
>
> Of course, I didn't call *through* before calling the transformer. As a
> result, some records were being processed by another instance of the
> transformer running on a different partition. Calling *store.get(key)* on
> one instance would then not return any value, even though another instance
> had done a *store.put(key, value)* before. Is this expected behaviour? Is
> there a transformer for each partition, and does each one get its own
> state store?
>
> Best regards
>
> Jan
>
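Jan's question about one transformer and one store per partition can be illustrated without a Kafka cluster. The sketch below is a simplified model, not the Kafka Streams API: each partition owns a separate map standing in for its shard of the state store (in Kafka Streams each stream task has its own store instance), and `partitionFor`, `Record` and `aggregate` are hypothetical names. `partitionFor` uses `hashCode()` purely for illustration; Kafka's default partitioner hashes the serialized key bytes with murmur2. When records are routed by their original key but aggregated under a different key, a get() on one partition misses a put() made on another; routing by the aggregation key, which is what a through() after a key change achieves, makes both records meet in the same store.

```kotlin
// Simplified model of partitioned state; NOT the Kafka Streams API.

data class Record(val key: String, val sessionId: String, val value: Int)

// Hypothetical partitioner for illustration only. Kafka's default
// partitioner hashes the serialized key with murmur2 instead of hashCode().
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

/**
 * Sums values per sessionId, keeping one map per partition (one store shard
 * per task). routeBy picks the key that decides the partition.
 * Returns what the lookup saw before each update (null = not found).
 */
fun aggregate(records: List<Record>, numPartitions: Int, routeBy: (Record) -> String): List<Int?> {
    val stores = List(numPartitions) { mutableMapOf<String, Int>() }
    return records.map { r ->
        val store = stores[partitionFor(routeBy(r), numPartitions)]
        val previous = store[r.sessionId]               // like state.get(key)
        store[r.sessionId] = (previous ?: 0) + r.value  // like state.put(key, ...)
        previous
    }
}

fun main() {
    // Same session "s1", but original keys "a" and "b" land on different partitions.
    val records = listOf(Record("a", "s1", 1), Record("b", "s1", 2))
    // Routed by the original key (no repartition): the second lookup misses.
    println(aggregate(records, 2) { it.key })        // [null, null]
    // Routed by the aggregation key (repartitioned): the second lookup sees 1.
    println(aggregate(records, 2) { it.sessionId })  // [null, 1]
}
```

The only difference between the two calls is the routing key, which is why a key-changing step followed by a stateful transformer needs an explicit repartition.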
> On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Your code looks correct to me. If you write into the store, you should
> > also be able to read it back from the store.
> >
> > Can you reproduce the issue using `TopologyTestDriver`? How many
> > partitions does your input topic have? Is your stream partitioned by
> > key? Note that `transform()` does not do auto-repartitioning, in contrast
> > to `groupByKey()`.
> >
> >
> > -Matthias
> >
> > On 3/25/20 3:49 AM, Jan Bols wrote:
> > > Hi all,
> > > I'm trying to aggregate a stream of messages and return a stream of
> > > aggregated results using Kafka Streams.
> > > At some point, depending on the incoming message, the old aggregate needs
> > > to be closed and a new aggregate needs to be created, just like a session
> > > that is closed due to some close event while a new session is started at
> > > the same time.
> > >
> > > For this I'm using flatTransformValues, where I store the result of an
> > > aggregation similar to how a groupByKey().aggregate() is done. When the
> > > old session needs to be closed, it's sent first, followed by the new value.
> > >
> > > The state store returns null for a given key at first retrieval, and the
> > > new aggregation result is stored under the same key.
> > > However, at the second pass, the value for the same key is still null,
> > > even though it has just been stored.
> > >
> > > How can this be possible?
> > >
> > > I'm using flatTransformValues in the following way:
> > >
> > > val storeName = "aggregateOverflow_binReportAgg"
> > > val store = Stores.keyValueStoreBuilder<K, V>(
> > >     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> > > streamsBuilder.addStateStore(store)
> > >
> > > ...
> > >
> > > stream
> > >     .flatTransformValues(
> > >         ValueTransformerWithKeySupplier { AggregateOverflow(storeName, transformation) },
> > >         storeName)
> > >
> > > where AggregateOverflow gets the previous value from the state store and
> > > transforms the result into an AggregateOverflowResult.
> > > AggregateOverflowResult is a data class containing the current value and
> > > an optional overflow value, like this:
> > >
> > > data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> > >
> > > When the overflow value is not null, it's sent downstream first, before
> > > the current value. In each case, the current result is stored in the
> > > state store for later retrieval, as follows:
> > >
> > > import mu.KotlinLogging
> > > import org.apache.kafka.streams.kstream.ValueTransformerWithKey
> > > import org.apache.kafka.streams.processor.ProcessorContext
> > > import org.apache.kafka.streams.state.KeyValueStore
> > >
> > > class AggregateOverflow<K, V, VR : Any>(
> > >     private val storeName: String,
> > >     private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
> > > ) : ValueTransformerWithKey<K, V, Iterable<VR>> {
> > >     private val logger = KotlinLogging.logger {}
> > >     private lateinit var state: KeyValueStore<K, VR>
> > >
> > >     init {
> > >         logger.debug { "$storeName: created" }
> > >     }
> > >
> > >     override fun init(context: ProcessorContext) {
> > >         logger.debug { "$storeName: init called" }
> > >         this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
> > >     }
> > >
> > >     override fun transform(key: K, value: V): Iterable<VR> {
> > >         val acc = state.get(key)
> > >         if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
> > >         val result = transformation(key, value, acc)
> > >         // Note: a null value passed to put() is treated as a delete of the key
> > >         state.put(key, result?.current)
> > >         logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
> > >         // the overflow (closed aggregate), if not null, is forwarded before the current value
> > >         return listOfNotNull(result?.overflow, result?.current)
> > >     }
> > >
> > >     override fun close() {
> > >         logger.debug { "$storeName: close called" }
> > >     }
> > > }
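Matthias's point that a value written into the store should be readable again can be checked in isolation by swapping the state store for a `MutableMap`. The sketch below mirrors the `transform` method of the AggregateOverflow class above; `InMemoryAggregateOverflow` and `sumWithClose` are hypothetical names, and the close rule (a value of 0 closes the open session) is an assumption made only for illustration. With a single store instance, each call sees the previous put(), and the overflow is emitted before the current value.

```kotlin
// In-memory stand-in for the AggregateOverflow transformer; no Kafka involved.

data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

class InMemoryAggregateOverflow<K, V, VR : Any>(
    private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?
) {
    // A MutableMap plays the role of the KeyValueStore<K, VR>.
    private val state = mutableMapOf<K, VR>()

    fun transform(key: K, value: V): Iterable<VR> {
        val acc = state[key]                          // state.get(key)
        val result = transformation(key, value, acc)
        // state.put(key, result?.current): a null value acts as a delete.
        if (result == null) {
            state.remove(key)
        } else {
            state[key] = result.current
        }
        // The overflow (closed aggregate) is emitted before the current value.
        return listOfNotNull(result?.overflow, result?.current)
    }
}

// Hypothetical rule for illustration: a value of 0 closes the open session
// and starts a new, empty one; any other value is added to the running sum.
fun sumWithClose(key: String, value: Int, acc: Int?): AggregateOverflowResult<Int> =
    if (value == 0) AggregateOverflowResult(current = 0, overflow = acc)
    else AggregateOverflowResult(current = (acc ?: 0) + value, overflow = null)

fun main() {
    val t = InMemoryAggregateOverflow<String, Int, Int>(::sumWithClose)
    println(t.transform("k", 5))  // [5]
    println(t.transform("k", 3))  // [8]
    println(t.transform("k", 0))  // [8, 0]
    println(t.transform("k", 2))  // [2]
}
```

If this in-memory version behaves as expected while the real topology does not, the store logic itself is unlikely to be the problem, which points back at how records are partitioned.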
> > >
> > > In the log file you can see that the first invocation returns an empty
> > > value for the given key; you can also see that the new value is being
> > > serialized into the store.
> > > At the second invocation, a few seconds later, the value for the same
> > > key is still null.
> > >
> > > Any ideas why this is?
> > > Best regards
> > > Jan
> > >
> >
> >
>
