Thanks John R. I am adding John Brackin to this thread, who can provide further details of the topology description.
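In case it helps in the meantime, a minimal way to dump the topology description that John asked for (assuming the StreamsBuilder used to build the stream below is called `builder`; the names here are only illustrative) is:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;

    // Build the topology from the existing StreamsBuilder and print its
    // description. The output can be pasted into
    // https://zz85.github.io/kafka-streams-viz/ to see which sub-topology
    // (and therefore which task) each state store belongs to.
    Topology topology = builder.build();
    System.out.println(topology.describe());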
On Tue, Dec 6, 2022 at 8:28 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Pushkar,
>
> I'm sorry for the delay. I'm afraid I'm having trouble picturing the situation. Can you provide the topology description? That will show us whether we should expect the stores to always be on the same instances or not. If you can also include a simplified version of your program, we might be able to provide some suggestions.
>
> Thanks,
> -John
>
> On Mon, Dec 5, 2022, at 10:52, Pushkar Deole wrote:
> > John or Matthias,
> >
> > can you help here? We are frequently getting errors like the one below:
> >
> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, records, may have migrated to another instance.
> >
> > For the same key, the record exists in the 'totals' state store but not in the 'records' state store.
> >
> > John,
> >
> > can you provide more details on the groupBy option?
> >
> > On Tue, Nov 29, 2022 at 12:24 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> >
> >> Hi John,
> >>
> >> I am not sure I understood it correctly. Even with branching that uses a different state store, the key of the incoming event is still the same, so we expect it to land in the local state store on the same pod.
> >> e.g. an event with OPEN status and key xyz comes in, is processed through one branch, and is stored in the 'totals' state store; the state is maintained in the local state store on the same pod.
> >> A 2nd event with OPEN status and key xyz comes in, is processed again, and is stored in 'totals'; the state is maintained in the local state store on the same pod.
> >>
> >> A 3rd event with CLOSED status and key xyz comes in and is processed. Its state is stored in the 'records' state store, and it is expected to be stored in the state store on the same pod. Why would it go to some other pod?
> >>
> >> On Wed, Nov 23, 2022 at 8:50 PM John Roesler <vvcep...@apache.org> wrote:
> >>
> >>> Hi Pushkar,
> >>>
> >>> Thanks for the question. I think that what's happening is that, even though both branches use the same grouping logic, Streams can't detect that they are the same. It just sees two group-bys and therefore introduces two repartitions, with a separate downstream task for each.
> >>>
> >>> You might want to print out the topology description and visualize it with https://zz85.github.io/kafka-streams-viz/ . That will show you whether the stores wind up in the same task or not.
> >>>
> >>> The visualization will also show you the names of the input topics for those two partitions, which you can use in conjunction with the metadata methods on your KafkaStreams instance to query for the location of the keys in both stores.
> >>>
> >>> I suspect that with some tweaks you can re-write the topology to have just one downstream task, if that's what you prefer.
> >>>
> >>> By the way, I think you could propose to add an optimization to make the groupBy behave the way you expected. If that's interesting to you, let us know and we can give you some pointers!
> >>>
> >>> I hope this helps,
> >>> John
> >>>
> >>> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> >>> > Hi All,
> >>> >
> >>> > I have a stream application that creates 2 branches. Each branch includes a state store, where the status field of the Kafka message determines the branch, and therefore the state store used:
> >>> >
> >>> > Status OPEN = state store name 'totals'
> >>> >
> >>> > Status CLOSED = state store name 'records'
> >>> >
> >>> > I'm seeing that the streams application is running on a pod; however, I'm getting the exception:
> >>> >
> >>> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, records, may have migrated to another instance.
> >>> >
> >>> > If I physically access the pod and check the RocksDB folders, I do not see the state store folder. If I check the keys in the 'totals' state store on this pod, I can find the key in the 'records' state store on another pod. I had assumed that because the key of the events is the same, the same partition would be used for the two branches, and therefore the same keys in these two state stores would be created on the same Kubernetes pod. This is not an issue for the Kafka stream itself, but that assumption was used in the way the state stores are read: I assumed that if I found a key in the 'totals' state store, the same key would be found on the same pod in the 'records' state store.
> >>> >
> >>> > The questions I have are:
> >>> >
> >>> > 1) Is it expected that the state stores can hold the partition data on different pods, and is this unique to streams using branch?
> >>> >
> >>> > 2) Is there a way to know if the state store is on the pod, to avoid handling this as an exception?
> >>> >
> >>> > Here is the topology of the stream in question:
> >>> >
> >>> > KStream<String, ConsolidatedIntervalTotalsModel>[] branches = stream
> >>> >         .peek(receivingEventLogger)
> >>> >         .selectKey(keyMapper)
> >>> >         .mapValues(totalsValueMapper)
> >>> >         .filter(nullKeyValueEventFilter)
> >>> >         .branch((k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
> >>> >                 || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
> >>> >             (k, v) -> true);
> >>> >
> >>> > // CLOSED and LB_RDELETE branch writes to 'records' state store
> >>> > branches[0]
> >>> >         .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
> >>> >         .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
> >>> >         .toStream()
> >>> >         .map(totalsInternalKeyValueMapper)
> >>> >         .filter(nullKeyStringValueEventFilter)
> >>> >         .to(loopbackTopic.name());
> >>> >
> >>> > // DEFAULT branch writes to 'totals' state store
> >>> > branches[1]
> >>> >         .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
> >>> >         .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
> >>> >         .toStream()
> >>> >         .flatMap(totalsKeyValueMapper)
> >>> >         .filter(nullKeyStringValueEventFilter)
> >>> >         .peek(sendingEventLogger)
> >>> >         .to(toTopic.name());
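For the other part of John's suggestion, here is a rough sketch of using the KafkaStreams metadata methods to check which instance hosts a given key for each store. It assumes a running KafkaStreams instance called `streams`, String keys, and the store names 'totals' and 'records' from the topology above; the helper method name is made up for illustration only.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyQueryMetadata;

    // Hypothetical helper: ask Streams which host (pod) actively serves the
    // given key for each of the two stores. If the active hosts differ, the
    // two aggregations really are placed in different tasks/pods, which would
    // explain the InvalidStateStoreException seen when reading locally.
    static void locateKey(KafkaStreams streams, String key) {
        KeyQueryMetadata totalsMeta =
                streams.queryMetadataForKey("totals", key, Serdes.String().serializer());
        KeyQueryMetadata recordsMeta =
                streams.queryMetadataForKey("records", key, Serdes.String().serializer());

        System.out.printf("key=%s -> totals on %s (partition %d), records on %s (partition %d)%n",
                key,
                totalsMeta.activeHost(), totalsMeta.partition(),
                recordsMeta.activeHost(), recordsMeta.partition());
    }

Note that queryMetadataForKey only reports where a key is assigned; actually serving keys that live on another pod still requires setting the application.server config and adding an RPC layer, as in the standard interactive-queries pattern.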