[
https://issues.apache.org/jira/browse/KAFKA-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154293#comment-17154293
]
Gordon Wang commented on KAFKA-10252:
-------------------------------------
Updated patch for fix.
> MeteredTimestampedKeyValueStore not setting up correct topic name for
> GlobalKTable associating StateStores
> ----------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10252
> URL: https://issues.apache.org/jira/browse/KAFKA-10252
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.1
> Reporter: Gordon Wang
> Priority: Major
> Attachments:
> 0001-KAFKA-10252-Use-source-topic-name-for-MeteredTimesta.patch
>
>
> When creating a GlobalKTable for getting a ReadOnlyKeyValueStore like below:
> {code}
> GlobalKTable<GenericRecord, GenericRecord> globalTable =
> streamsBuilder.globalTable(topic,
> Consumed.with(keySerde, valueSerde),
> Materialized.as(Stores.inMemoryKeyValueStore(topic)));
> {code}
> I got a StreamsException like below:
> {code}
> org.apache.kafka.common.errors.SerializationException: Error retrieving Avro
> schema for topic applicationId-sourceTopicName-changelog
> Caused by:
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> Subject not found.; error code: 40401
> {code}
> But as stated in the GlobalKTable Javadoc, the changelog stream shall not be
> created, and in fact it was not created. This leads our custom serde to
> search for the schema (we are using Confluent Platform and an Avro-based
> schema registry for the job) using the wrong topic name (it should just be
> sourceTopicName rather than applicationId-sourceTopicName-changelog).
> After digging into the code, I found that the *initStoreSerde* method in
> *MeteredTimestampedKeyValueStore* assumes the topic backing the store
> is always the storeChangelogTopic when initializing the Serdes for the
> state store. I think for GlobalKTables (ones having a
> GlobalProcessorContextImpl ProcessorContext) we should use the original
> topic name directly here.
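
The topic selection proposed above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the attached patch or Kafka Streams source: the class SerdeTopicSketch, its method names, and the isGlobalStore flag are hypothetical stand-ins for the check on the processor context type. Only the applicationId-storeName-changelog naming is taken from the exception quoted above.

```java
// Hypothetical sketch; not Kafka Streams source code and not the attached patch.
final class SerdeTopicSketch {

    // Builds the changelog-style name seen in the exception:
    // "<applicationId>-<storeName>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Picks the topic name handed to the serdes.
    // isGlobalStore is an assumed flag standing in for
    // "the store was initialized with a GlobalProcessorContextImpl".
    static String serdeTopic(boolean isGlobalStore, String applicationId,
                             String storeName, String sourceTopic) {
        if (isGlobalStore) {
            // Global stores are read directly from the source topic,
            // so no changelog topic exists to look up a schema for.
            return sourceTopic;
        }
        return changelogTopic(applicationId, storeName);
    }

    public static void main(String[] args) {
        // Regular store: prints "applicationId-sourceTopicName-changelog"
        System.out.println(serdeTopic(false, "applicationId", "sourceTopicName", "sourceTopicName"));
        // Global store: prints "sourceTopicName"
        System.out.println(serdeTopic(true, "applicationId", "sourceTopicName", "sourceTopicName"));
    }
}
```

With a topic-name-based schema lookup, the second result is the name under which the Avro schema is actually registered, which is why the lookup for the changelog-style name fails with "Subject not found".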
--
This message was sent by Atlassian Jira
(v8.3.4#803005)