[
https://issues.apache.org/jira/browse/KAFKA-19923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041424#comment-18041424
]
sanghyeok An edited comment on KAFKA-19923 at 11/29/25 3:30 AM:
----------------------------------------------------------------
Hi [~mjsax]!
I fully agree with your assessment.
*About Breaking Changes*
To be honest, since I don't have much experience contributing to Kafka yet, I
wasn't entirely sure about the strict scope of Breaking Changes in this project.
I really appreciate you clarifying that moving an existing runtime failure to a
build-time check is considered a fix/improvement rather than a breaking change.
That makes perfect sense.
*About the Verification Logic*
I also agree that a class-level check is the most practical approach. However,
I’d like to highlight a couple of edge cases where {{getClass()}} might be
insufficient or tricky:
* *TimestampExtractor (FunctionalInterface):* If users define inline lambdas,
the compiler generates a separate synthetic class per lambda expression. A
strict {{getClass()}} check could therefore fail even when the logic is
semantically identical, effectively forcing users to reuse the exact same
lambda instance.
* *Generic Serdes:* This is the critical part. A class check cannot
distinguish between generic instances. For example:
{code:java}
@Test
void test1() {
    // Different semantic types, but the same runtime Class
    final Serde<Foo> fooSerde = JsonSerdes.jsonSerde(Foo.class);
    final Serde<Bar> barSerde = JsonSerdes.jsonSerde(Bar.class);
    // Note: assertThat(x).equals(y) would not assert anything; isEqualTo is required
    assertThat(fooSerde.getClass()).isEqualTo(barSerde.getClass());
} {code}
In general, Kafka Streams wrapper frameworks use this pattern, e.g. the
{{JsonSerde}} class in Spring-Kafka.
([https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerde.java])
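To illustrate both edge cases without any Kafka dependency, here is a small,
hypothetical sketch that uses {{java.util.function.Function}} as a stand-in for
{{TimestampExtractor}}/{{Serde}} (all class and method names here are mine, not
Kafka's):
{code:java}
import java.util.function.Function;

public class ClassCheckDemo {

    // Stand-in for a generic-serde factory like JsonSerdes.jsonSerde(...):
    // every call returns a fresh instance of the same runtime class.
    static Function<String, Integer> parserFor(Class<?> ignoredTargetType) {
        return new Parser();
    }

    static class Parser implements Function<String, Integer> {
        @Override
        public Integer apply(String s) { return Integer.parseInt(s); }
    }

    public static void main(String[] args) {
        // Edge case 1: two semantically identical lambdas. On HotSpot, each
        // lambda expression gets its own synthetic class, so getClass() differs.
        Function<Long, Long> e1 = ts -> ts;
        Function<Long, Long> e2 = ts -> ts;
        System.out.println(e1.getClass().equals(e2.getClass())); // false

        // Edge case 2: same factory, different intended target types.
        // Distinct instances, but a single runtime class due to type erasure.
        Function<String, Integer> a = parserFor(Integer.class);
        Function<String, Integer> b = parserFor(Long.class);
        System.out.println(a == b);                              // false
        System.out.println(a.getClass().equals(b.getClass()));   // true
    }
}
{code}
A strict {{==}} check rejects both pairs; a {{getClass()}} check accepts the
second pair even though the intended types differ.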
So, IMHO, we have two options.
* Option #1 - Strict check ({{==}})
** *Pros:* Safest. It covers all edge cases (generic Serdes, lambda
extractors).
** *Cons:* Users _must_ reuse the exact Serde/Extractor instance. Calling
{{Serdes.String()}} twice (which creates new instances) would fail.
* Option #2 - Relaxed check ({{getClass()}})
** *Pros:* More user-friendly for standard cases (e.g., {{new
StringSerde()}} vs {{new StringSerde()}} works).
** *Cons:* It allows generic mismatches (the risk of {{ClassCastException}}
remains for {{JsonSerde}} users).
I personally think Option 2 is acceptable if we document the limitation, but
Option 1 is the only way to guarantee safety. What do you think?
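For concreteness, the two candidate checks could be sketched roughly as below
(the method names, the {{Object}} parameter types, and the exception type are
my assumptions, not actual Kafka Streams internals):
{code:java}
final class SourceNodeChecks {

    // Option #1 - strict identity check: safest, but forces users to reuse
    // the exact same Serde/TimestampExtractor instance across builder calls.
    static void strictCheck(Object existing, Object candidate) {
        if (existing != candidate) {
            throw new IllegalStateException(
                "Conflicting instances configured for the same source topic");
        }
    }

    // Option #2 - relaxed class check: accepts fresh instances of the same
    // class, but is blind to generic type parameters (JsonSerde<Foo> and
    // JsonSerde<Bar> share one runtime class).
    static void relaxedCheck(Object existing, Object candidate) {
        if (!existing.getClass().equals(candidate.getClass())) {
            throw new IllegalStateException(
                "Conflicting classes configured for the same source topic");
        }
    }
}
{code}
With this shape, {{relaxedCheck(Serdes.String(), Serdes.String())}} would pass
while {{strictCheck}} on the same arguments would throw, which is exactly the
trade-off described above.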
> Kafka Streams throws ClassCastException with different Consumed instances.
> --------------------------------------------------------------------------
>
> Key: KAFKA-19923
> URL: https://issues.apache.org/jira/browse/KAFKA-19923
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: sanghyeok An
> Assignee: sanghyeok An
> Priority: Minor
>
> Kafka Streams throws a ClassCastException when using different Consumed
> instances for the same topic.
> For example:
> {code:java}
> builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
>        .peek((k, v) -> System.out.println(k));
> builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
>        .peek((k, v) -> System.out.println(k));
> {code}
>
> Since both use the same topic name and the same ConsumedInternal
> configuration for auto offset reset, these two StreamSourceNodes are merged
> during topology building.
>
> As a result, the Topology is built successfully.
>
> However, once the StreamThread starts and the consumer begins receiving
> records from the broker, a ClassCastException is thrown at runtime as the
> records flow through the pipeline.
>
> In my opinion, we have two options:
> # Document this behavior.
> # When merging source nodes, the builder should consider the full
> ConsumedInternal configuration (for example, key/value SerDes and timestamp
> extractor), instead of only the auto offset reset policy.
>
> I think option 1 is also acceptable, because Kafka Streams will fail fast
> with a ClassCastException before the consumer commits any offsets.
>
> Option 2 would require more substantial changes in Kafka Streams, because
> TimestampExtractor and key/value SerDes do not expose a straightforward way
> to check semantic equivalence.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)