Thanks, John R.

I am adding John Brackin to this thread; he can provide further details of
the topology description.

On Tue, Dec 6, 2022 at 8:28 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Pushkar,
>
> I'm sorry for the delay. I'm afraid I'm having trouble picturing the
> situation. Can you provide the topology description? That will show us
> whether we should expect the stores to always be in the same instances or
> not. If you can also include a simplified version of your program, we might
> be able to provide some suggestions.
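The topology description can be printed before the application starts; a minimal sketch (the class name is a placeholder, and the topology-building code is elided — it would be the branching logic quoted later in this thread):

```java
// Sketch: print the topology description so it can be pasted into
// https://zz85.github.io/kafka-streams-viz/ for visualization.
// Assumes kafka-streams is on the classpath.
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class DescribeTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // ... define sources, branches, and aggregations here ...
        Topology topology = builder.build();
        // The description lists each sub-topology with its state stores
        // and the repartition topics that sit between them.
        System.out.println(topology.describe());
    }
}
```

The printed description is enough to tell whether the 'totals' and 'records' stores end up in the same sub-topology or not.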
>
> Thanks,
> -John
>
> On Mon, Dec 5, 2022, at 10:52, Pushkar Deole wrote:
> > John or Matthias
> >
> > Can you help here? We are frequently getting errors like the one below:
> >
> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> > store, records, may have migrated to another instance.
> >
> > For the same key, the record exists in the 'totals' state store but not in
> > the 'records' state store.
> >
> > John,
> >
> > can you provide more details on the groupBy option?
> >
> > On Tue, Nov 29, 2022 at 12:24 PM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
> >
> >> Hi John,
> >>
> >> I am not sure I understood it correctly. Even with branching that uses a
> >> different state store, the key of the incoming event is still the same, so
> >> we expect it to land in the local state store on the same pod.
> >> e.g. an event with OPEN status and key xyz came in, was processed through
> >> one branch, and was stored in the 'totals' state store, with state
> >> maintained in the local state store on the same pod.
> >> A 2nd event with OPEN status and key xyz came in, was again processed, and
> >> was stored in 'totals', with state maintained in the local state store on
> >> the same pod.
> >>
> >> A 3rd event with CLOSED status and key xyz came in and was processed. Its
> >> state is stored in the 'records' state store, and it is expected to be
> >> stored in the state store on the same pod.
> >> Why would it go to some other pod?
> >>
> >> On Wed, Nov 23, 2022 at 8:50 PM John Roesler <vvcep...@apache.org>
> wrote:
> >>
> >>> Hi Pushkar,
> >>>
> >>> Thanks for the question. I think that what’s happening is that, even
> >>> though both branches use the same grouping logic, Streams can’t detect
> >>> that they are the same. It just sees two group-bys and therefore
> >>> introduces two repartitions, with a separate downstream task for each.
> >>>
> >>> You might want to print out the topology description and visualize it
> >>> with https://zz85.github.io/kafka-streams-viz/ . That will show you
> >>> whether the stores wind up in the same task or not.
> >>>
> >>> The visualization will also show you the names of the input topics for
> >>> those two repartitions, which you can use in conjunction with the
> >>> metadata methods on your KafkaStreams instance to query for the location
> >>> of the keys in both stores.
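The metadata lookup can be done with `KafkaStreams#queryMetadataForKey` (available since Kafka 2.5); a sketch, assuming a running `KafkaStreams` instance, the store names from this thread, and that every pod sets `application.server` to its own host:port:

```java
// Sketch: ask Streams which instance hosts a given key for each store.
// The class and method names here are placeholders.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;

public class WhereIsKey {
    static void locate(KafkaStreams streams, String key) {
        KeyQueryMetadata totalsMeta =
            streams.queryMetadataForKey("totals", key, Serdes.String().serializer());
        KeyQueryMetadata recordsMeta =
            streams.queryMetadataForKey("records", key, Serdes.String().serializer());
        // With two independent repartitions, these hosts can differ even
        // for the same key, which matches the behavior seen in this thread.
        System.out.println("totals  -> " + totalsMeta.activeHost());
        System.out.println("records -> " + recordsMeta.activeHost());
    }
}
```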
> >>>
> >>> I suspect that with some tweaks you can re-write the topology to just
> >>> have one downstream task, if that’s what you prefer.
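One possible tweak along those lines (a sketch only, not tested against this topology): repartition once before branching, so neither downstream `groupByKey()` needs its own repartition and both aggregations are co-partitioned. `closedOrDeletePredicate` here is a stand-in for the status predicate used in the topology quoted later in the thread:

```java
// Sketch: a single explicit repartition (KStream#repartition, Kafka 2.6+)
// before branching. Because branch() does not change the key, the two
// groupByKey() calls downstream should not introduce further
// repartitions, so both stores should land in the same tasks.
KStream<String, ConsolidatedIntervalTotalsModel> rekeyed = stream
    .peek(receivingEventLogger)
    .selectKey(keyMapper)
    .mapValues(totalsValueMapper)
    .filter(nullKeyValueEventFilter)
    .repartition(Repartitioned.with(Serdes.String(), totalsValueSerde));

KStream<String, ConsolidatedIntervalTotalsModel>[] branches = rekeyed
    .branch(closedOrDeletePredicate, (k, v) -> true);
// branches[0] and branches[1] can then groupByKey() and aggregate()
// exactly as before, sharing the one repartition topic.
```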
> >>>
> >>> By the way, I think you could propose to add an optimization to make
> >>> the groupBy behave the way you expected. If that’s interesting to you,
> >>> let us know and we can give you some pointers!
> >>>
> >>> I hope this helps,
> >>> John
> >>>
> >>> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> >>> > Hi All,
> >>> >
> >>> > I have a stream application that creates 2 branches. Each branch
> >>> > includes a state store, where the status field of the Kafka message
> >>> > determines the branch, and therefore the state store used:
> >>> >
> >>> > Status OPEN = State store name totals
> >>> >
> >>> > Status CLOSED = State store name records
> >>> >
> >>> >
> >>> >
> >>> > I’m seeing that the streams application is running on a pod; however,
> >>> > I’m getting the exception:
> >>> >
> >>> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> >>> > store, records, may have migrated to another instance.
> >>> >
> >>> >
> >>> >
> >>> > If I physically access the pod and check the RocksDB folders, I do not
> >>> > see the state store folder. If I check the keys in the 'totals' state
> >>> > store on this pod, I can find the key in the 'records' state store on
> >>> > another pod. I had assumed that because the key of the events is the
> >>> > same, the same partition would be used for the two branches, and
> >>> > therefore the same keys in these two state stores would be created on
> >>> > the same Kubernetes pod. This is not an issue for the Kafka stream, but
> >>> > that assumption was used in the way the state stores are read. I
> >>> > assumed that if I found the key in the 'totals' state store, the same
> >>> > key would be found on the same pod in the 'records' state store.
> >>> >
> >>> >
> >>> >
> >>> > The questions I have are:
> >>> >
> >>> > 1) Is it expected that the state stores can hold the partition data on
> >>> > different pods, and is this unique to streams using branch?
> >>> >
> >>> > 2) Is there a way to know if the state store is on the pod, to avoid
> >>> > handling this as an exception?
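On question 2, one possible approach is to check locality with the `KafkaStreams#queryMetadataForKey` metadata API before reading; a sketch, assuming `application.server` is configured on each pod and that `thisHost` is a hypothetical `HostInfo` matching this pod's own setting:

```java
// Sketch: check whether a key's store partition is hosted locally
// before reading, instead of catching InvalidStateStoreException.
// Class and method names are placeholders.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

public class IsKeyLocal {
    static boolean isLocal(KafkaStreams streams, String storeName,
                           String key, HostInfo thisHost) {
        KeyQueryMetadata meta =
            streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());
        // During a rebalance the metadata may be temporarily unavailable,
        // so a false result can also mean "not assigned yet".
        return meta != null && thisHost.equals(meta.activeHost());
    }
}
```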
> >>> >
> >>> >
> >>> >
> >>> > Here is the topology of the stream in question:
> >>> >
> >>> >         KStream<String, ConsolidatedIntervalTotalsModel>[] branches = stream
> >>> >             .peek(receivingEventLogger)
> >>> >             .selectKey(keyMapper)
> >>> >             .mapValues(totalsValueMapper)
> >>> >             .filter(nullKeyValueEventFilter)
> >>> >             .branch((k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
> >>> >                         || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
> >>> >                     (k, v) -> true);
> >>> >
> >>> >         // CLOSED and LB_RDELETE branch writes to records state store
> >>> >         branches[0]
> >>> >             .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
> >>> >             .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
> >>> >             .toStream()
> >>> >             .map(totalsInternalKeyValueMapper)
> >>> >             .filter(nullKeyStringValueEventFilter)
> >>> >             .to(loopbackTopic.name());
> >>> >
> >>> >         // DEFAULT branch writes to totals state store
> >>> >         branches[1]
> >>> >             .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
> >>> >             .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
> >>> >             .toStream()
> >>> >             .flatMap(totalsKeyValueMapper)
> >>> >             .filter(nullKeyStringValueEventFilter)
> >>> >             .peek(sendingEventLogger)
> >>> >             .to(toTopic.name());
> >>
>
