[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168245#comment-17168245 ]
Tomasz Bradło commented on KAFKA-10322:
---------------------------------------

The problem this causes is that the wrong window is determined. During restoration, the bytes at offsets <-12,-4> from the end of the key are used as the timestamp, which is garbage. The key is also trimmed by 4 bytes. As a result, such a window is skipped because it falls outside the configured 10-day retention.

I think you're talking about a different mechanism (inserting 0 as the sequence number when retain duplicates is false). I am away from the code now, so I'll take a closer look tomorrow.

In my case, [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] serializes messages without the sequence-number part and puts them into "topic1-count". Such messages are then picked up by the restoration lambda. I have a feeling that all store-related serializations use the sequence number (that's why the WindowKeySchema methods that use sequences have Store in their names) and changelogs do not use it. But I am not aware of any such convention; this is only my guess.

This could also be something wrong with my stream. I could not find any example of how to use GlobalStore, so maybe it is misconfigured. I would be happy to know whether messages on topic1-count are expected to have a sequence number or not.
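To make the byte arithmetic above concrete, here is a minimal Kotlin sketch. It models only the two key layouts named in this issue (key + 8-byte timestamp on the topic, key + 8-byte timestamp + 4-byte sequence number in the store) using java.nio.ByteBuffer. The helper names are hypothetical and this is not Kafka's WindowKeySchema source; it only reproduces the offsets described here, on the assumption that store-style extraction treats the last 12 bytes as timestamp + sequence number.

```kotlin
import java.nio.ByteBuffer

// Sizes as described in the issue: 8-byte timestamp, 4-byte sequence number.
const val TIMESTAMP_SIZE = 8
const val SEQNUM_SIZE = 4

// Topic layout (hypothetical helper): real_grouping_key + timestamp(8 bytes).
fun topicLayoutKey(key: ByteArray, windowStart: Long): ByteArray =
    ByteBuffer.allocate(key.size + TIMESTAMP_SIZE)
        .put(key)
        .putLong(windowStart)
        .array()

// Store layout (hypothetical helper): real_grouping_key + timestamp(8 bytes) + sequence_number(4 bytes).
fun storeLayoutKey(key: ByteArray, windowStart: Long, seqnum: Int): ByteArray =
    ByteBuffer.allocate(key.size + TIMESTAMP_SIZE + SEQNUM_SIZE)
        .put(key)
        .putLong(windowStart)
        .putInt(seqnum)
        .array()

// Store-style key extraction: assumes the last 12 bytes are timestamp + sequence number.
fun keyAssumingStoreLayout(binaryKey: ByteArray): ByteArray =
    binaryKey.copyOfRange(0, binaryKey.size - TIMESTAMP_SIZE - SEQNUM_SIZE)

// Store-style timestamp extraction: reads 8 bytes starting 12 bytes before the end.
fun timestampAssumingStoreLayout(binaryKey: ByteArray): Long =
    ByteBuffer.wrap(binaryKey).getLong(binaryKey.size - TIMESTAMP_SIZE - SEQNUM_SIZE)
```

With a 7-byte key such as "event-A", applying the store-style extraction to a topic-layout key returns only "eve", and the timestamp is assembled from the bytes "nt-A" plus the first 4 bytes of the real timestamp. That matches the trimmed key and garbage timestamp described above, while the same extraction on a store-layout key round-trips correctly.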
> InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10322
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: windows/linux
>            Reporter: Tomasz Bradło
>            Priority: Major
>
> I have a regular groupBy & count stream configuration:
> {code:java}
> fun addStream(kStreamBuilder: StreamsBuilder) {
>     val storeSupplier = Stores.inMemoryWindowStore("count-store",
>         Duration.ofDays(10),
>         Duration.ofDays(1),
>         false)
>     val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
>         .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>
>     kStreamBuilder
>         .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>         .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>         .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>         .toStream()
>         .to("topic1-count")
>
>     val storeConsumed = Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()), Serdes.Long())
>     kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
> }{code}
> While sending to "topic1-count", the key is serialized with [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], which uses [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>
> Everything works. I can get correct values from the state store. But in a recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. It fails because it expects the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes){code}
> How is this supposed to work in this case?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)