[ 
https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168231#comment-17168231
 ] 

Sophie Blee-Goldman commented on KAFKA-10322:
---------------------------------------------

Ok, I dug into this a bit further and found that the changelogging layer will 
actually _always_ insert a sequence number, even when it's not necessary (i.e. 
when retainDuplicates is false). On restoration this extra sequence number is 
dropped, so the correct bytes are ultimately inserted into the store. 

While this is obviously a bug in that we're storing an extra 4 bytes in the 
changelog, it seems like the key extraction should technically work on 
restoration. Did you hit an exception or other error after a restore, or did 
you just notice that something was wrong while looking at the source? 
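
To make the two key layouts concrete, here is a minimal standalone sketch. It 
does not use the Kafka Streams classes; the class and method names are 
hypothetical, and the only things taken from the issue are the two layouts 
(key + 8-byte timestamp, with or without a trailing 4-byte sequence number). 
The extraction helpers assume the 12-byte suffix is always present, which is 
the behaviour the reporter describes for the restore path.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; not the actual WindowKeySchema implementation.
class WindowKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Layout described for the output topic: key + timestamp(8 bytes).
    static byte[] withTimestamp(byte[] key, long timestamp) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
                .put(key).putLong(timestamp).array();
    }

    // Layout described for the changelog: key + timestamp(8 bytes) + sequence_number(4 bytes).
    static byte[] withTimestampAndSeqnum(byte[] key, long timestamp, int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key).putLong(timestamp).putInt(seqnum).array();
    }

    // Assumes a 12-byte suffix and returns everything before it.
    static byte[] extractKey(byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Assumes a 12-byte suffix and reads the 8 bytes where the timestamp should start.
    static long extractTimestamp(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        long ts = 1_596_000_000_000L;

        // With the sequence number present, key and timestamp round-trip.
        byte[] changelogStyle = withTimestampAndSeqnum(key, ts, 0);
        System.out.println(new String(extractKey(changelogStyle), StandardCharsets.UTF_8));
        System.out.println(extractTimestamp(changelogStyle) == ts);

        // Without it, the same extraction cuts 4 bytes too far into the key.
        byte[] topicStyle = withTimestamp(key, ts);
        System.out.println(new String(extractKey(topicStyle), StandardCharsets.UTF_8));
        System.out.println(extractTimestamp(topicStyle) == ts);
    }
}
```

In this sketch the second case does not throw: it returns a truncated key and 
a wrong timestamp. A key shorter than four bytes would make the computed key 
length negative instead, so whether the real code fails loudly or silently may 
depend on the key size.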

> InMemoryWindowStore restore keys format incompatibility (lack of 
> sequenceNumber in keys on topic)
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10322
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>         Environment: windows/linux
>            Reporter: Tomasz Bradło
>            Priority: Major
>
> I have regular groupBy&Counting stream configuration:
> {code:java}
>     fun addStream(kStreamBuilder: StreamsBuilder) {
>         val storeSupplier = Stores.inMemoryWindowStore("count-store",
>                 Duration.ofDays(10),
>                 Duration.ofDays(1),
>                 false)
>         val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
>                 .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>         kStreamBuilder
>                 .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>                 .map {_, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)}
>                 .groupByKey()
>                 .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>                 .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>                 .toStream()
>                 .to("topic1-count")
>         val storeConsumed = Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()), Serdes.Long())
>         kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
>     }{code}
> When sending to "topic1-count", the key is serialized with 
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], 
> which uses 
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], 
> so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>  
> Everything works. I can get correct values from the state store. But in a 
> recovery scenario, when 
> [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] 
> enters the offset < highWatermark loop, the 
> [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] 
> reads from "topic1-count" and fails to extract a valid key and timestamp using 
> [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] 
> and 
> [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. 
> It fails because it expects the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
> How is this supposed to work in this case?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
