[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168374#comment-17168374 ]
Sophie Blee-Goldman commented on KAFKA-10322:
---------------------------------------------

Oh, you're referring to the global store, not the aggregation store. I overlooked that earlier, so thanks for clarifying. In that case the issue I noted above would explain your observations, and it is definitely a bug in Streams, not a misconfiguration on your part.

For some context, the sequence number (or "seqnum") is wrapped up with the window start time and key in order to allow storing duplicates, a requirement for stream-stream joins. There's a `retainDuplicates` flag that's supposed to indicate whether the store has this requirement and thus needs to encode a seqnum, but it's ignored by `ChangeLoggingWindowBytesStore`.

It seems like this problem has a pretty wide scope:
# You can't create an optimized source KTable from a windowed input topic, because the source topic will be re-used as the store's changelog. You can work around this by disabling the optimization so that Streams creates a separate changelog.
# You can't create a global store from a windowed input topic at all, because the source topic is always the changelog.
# We're storing an extra 4 bytes per record.

Item 3 is obnoxious, but items 1 and 2 are serious bugs and should be addressed ASAP.

To steal [~vvcephei]'s idea for fixing some issues with the suppression buffer in the past, we can use record headers to store a schema version. Then we can just remove the extra seqnum and decode the bytes during restoration based on the version number.

It might be kind of tricky to do this in a way that supports downgrades as well, but maybe we can use the UPGRADE_FROM config to at least support rolling upgrades: we need to avoid writing v1 changelogs until all instances are on the new version, which is pretty much exactly what that config is for.
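Just to illustrate the idea (a minimal sketch, not the actual Streams internals: the header name "ckv", the version byte, and the helper names are all made up for this example), a schema-version header on each changelog record would let the restore path decide whether there is a trailing seqnum to strip:

{code:java}
import java.util.Arrays;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class VersionedChangelogKeys {

    // Hypothetical header name and version tag -- a real fix would have to
    // reserve a name that cannot collide with user-supplied headers.
    static final String VERSION_HEADER = "ckv";
    static final byte[] VERSION_1 = {1};

    // Writer side: tag records written in the new layout (no seqnum).
    // During a rolling upgrade this tagging would stay off until UPGRADE_FROM
    // indicates that all instances understand the new format.
    static Headers tagAsV1(Headers headers) {
        headers.add(VERSION_HEADER, VERSION_1);
        return headers;
    }

    // Restore side: untagged records are old-format and carry a trailing
    // 4-byte seqnum that must be dropped; tagged records are already just
    // key + timestamp and can be used as-is.
    static byte[] toStoreKey(byte[] changelogKey, Headers headers) {
        Header version = headers.lastHeader(VERSION_HEADER);
        return version == null
                ? Arrays.copyOfRange(changelogKey, 0, changelogKey.length - 4)
                : changelogKey;
    }

    public static void main(String[] args) {
        byte[] oldKey = new byte[20]; // key(8) + timestamp(8) + seqnum(4)
        byte[] newKey = new byte[16]; // key(8) + timestamp(8)

        // Both decode to the same 16-byte key + timestamp layout:
        System.out.println(toStoreKey(oldKey, new RecordHeaders()).length);          // 16
        System.out.println(toStoreKey(newKey, tagAsV1(new RecordHeaders())).length); // 16
    }
}
{code}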
> InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10322
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: windows/linux
>            Reporter: Tomasz Bradło
>            Priority: Major
>
> I have a regular groupBy & count stream configuration:
> {code:java}
> fun addStream(kStreamBuilder: StreamsBuilder) {
>     val storeSupplier = Stores.inMemoryWindowStore("count-store",
>         Duration.ofDays(10),
>         Duration.ofDays(1),
>         false)
>     val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
>         .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>     kStreamBuilder
>         .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>         .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>         .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>         .toStream()
>         .to("topic1-count")
>     val storeConsumed = Consumed.with(
>         WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
>         Serdes.Long())
>     kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
> }
> {code}
> While sending to "topic1-count", the key is serialized with [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], which uses [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes)
> {code}
> Everything works; I can get correct values from the state store. But in the recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201], because they expect the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes)
> {code}
> How is this supposed to work in this case?
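To make the mismatch the reporter describes concrete, here is a dependency-free sketch of the two layouts (the extractor bodies paraphrase the linked WindowKeySchema methods; the sample key and timestamp are made up). Because the extractors unconditionally strip a 12-byte suffix, a changelog key written without a seqnum loses its last 4 key bytes, and the timestamp is read from the wrong offset:

{code:java}
import java.nio.ByteBuffer;

public class KeyLayoutMismatch {

    // What TimeWindowedSerializer effectively writes for this topology:
    // the serialized key followed by the 8-byte window start timestamp.
    static byte[] topicKey(byte[] key, long windowStart) {
        return ByteBuffer.allocate(key.length + 8)
                .put(key)
                .putLong(windowStart)
                .array();
    }

    // Paraphrase of WindowKeySchema.extractStoreKeyBytes: assumes the key is
    // followed by timestamp(8) + seqnum(4) and strips that 12-byte suffix.
    static byte[] extractStoreKeyBytes(byte[] binaryKey) {
        final byte[] bytes = new byte[binaryKey.length - 8 - 4];
        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
        return bytes;
    }

    // Paraphrase of WindowKeySchema.extractStoreTimestamp: reads the long
    // sitting 12 bytes before the end of the binary key.
    static long extractStoreTimestamp(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 8 - 4);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes();
        long windowStart = 1_595_808_000_000L;            // made-up window start
        byte[] changelogKey = topicKey(key, windowStart); // no seqnum!

        // The extractors chop 4 extra bytes off the key and read the
        // timestamp 4 bytes too early, so both come back corrupted:
        System.out.println(new String(extractStoreKeyBytes(changelogKey))); // "use"
        System.out.println(extractStoreTimestamp(changelogKey));            // nonsense value
    }
}
{code}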