[
https://issues.apache.org/jira/browse/KAFKA-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064989#comment-17064989
]
John Roesler commented on KAFKA-9732:
-------------------------------------
Hey [~abellemare] ,
I think the final mapValues (the one that unwraps the element) is where you're
losing the serde. The DSL can't know whether you change the type in mapValues,
so it can't know whether the upstream value serde still applies; therefore, it
has to reset the value serde to `null`, and the downstream join falls back to
the default serde. You can give the mapValues a Materialized argument that only
specifies the value serde (and no store name), and it won't actually allocate a
state store. (In this case, "Materialized" is a misnomer.)
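Below is a minimal sketch of that suggestion applied to Process 1 from the issue. It assumes the Scala DSL (`org.apache.kafka.streams.scala`) as of 2.4.x. The helper `buildLeftTable` and its `internalValueSerde` parameter are hypothetical. That parameter stands for an SR-free serde for the unwrapped value, which the issue doesn't show. The only change from the reporter's code is the serdes-only `Materialized` argument on the final `mapValues`.

{code:scala}
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, KTable, Materialized}

// Same wrapper as in the issue: keeps null values alive through groupByKey+reduce.
final case class OptionalDeletable[T](elem: Option[T])

object LeftTableSketch {
  // Hypothetical helper; parameter names mirror the serdes named in the issue.
  def buildLeftTable[K, V](
      builder: StreamsBuilder,
      topic: String,
      externalKeySerde: Serde[K],
      externalValueSerde: Serde[V],
      internalKeySerde: Serde[K],
      internalWrappedSerde: Serde[OptionalDeletable[V]],
      internalValueSerde: Serde[V] // assumed: SR-free serde for the unwrapped value
  ): KTable[K, V] =
    builder
      .stream[K, V](topic)(Consumed.`with`(externalKeySerde, externalValueSerde))
      .mapValues((v: V) => OptionalDeletable(Some(v)))
      .groupByKey(Grouped.`with`(internalKeySerde, internalWrappedSerde))
      .reduce((_: OptionalDeletable[V], latest: OptionalDeletable[V]) => latest)(
        Materialized.as[K, OptionalDeletable[V], ByteArrayKeyValueStore]("myLeftTable")(
          internalKeySerde, internalWrappedSerde))
      // Serdes only, no store name: the unwrapped KTable keeps a known value serde
      // instead of null, and no extra state store is materialized.
      .mapValues(
        (w: OptionalDeletable[V]) => w.elem.get,
        Materialized.`with`[K, V, ByteArrayKeyValueStore](internalKeySerde, internalValueSerde))
}
{code}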
> Kafka Foreign-Key Joiner has unexpected default value used when a table is
> created via a stream+groupByKey+reduce
> -----------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-9732
> URL: https://issues.apache.org/jira/browse/KAFKA-9732
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.1
> Reporter: Adam Bellemare
> Priority: Major
>
> I'm upgrading some internal business code that used to use a prototype
> version of the FKJoiner, migrating to the 2.4.1 Kafka release. I am running
> into an issue where the joiner is using the default Serde, despite me clearly
> specifying NOT to use the default serde (unless I am missing something!).
> Currently, this is how I generate the left KTable, used in the
> _*leftTable.join(rightTable, ...)*_ FKJoin.
> Let's call this process 1:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> //For wrapping nulls in mapValues below
> case class OptionalDeletable[T](elem: Option[T])
> //Internal Serdes that do NOT use the SR
> //Same serde logic as externalMyKeySerde, but doesn't register schemas to
> //schema registry.
> val internalMyKeySerde = ...
> //Same serde logic as externalMyValueSerde, but doesn't register schemas to
> //schema registry.
> val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] =
>   ...
> val myLeftTable: KTable[MyKey, MyValue] =
>   streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
>     .mapValues(
>       v => {
>         //We need the nulls to propagate deletes.
>         //Wrap this in a simple case-class because we can't
>         //groupByKey+reduce null values as they otherwise get filtered out.
>         OptionalDeletable(Some(v))
>       }
>     )
>     .groupByKey(Grouped.`with`(internalMyKeySerde,
>       internalOptionalDeletableMyValueSerde))
>     .reduce((_,x) => x)(
>       Materialized.as("myLeftTable")(internalMyKeySerde,
>         internalOptionalDeletableMyValueSerde))
>     .mapValues(v => v.elem.get) //Unwrap the element
> {code}
> Next, we create the right table and specify the FK-join logic:
> {code:scala}
> //This is created in an identical way to Process 1... I won't show it here
> //for brevity.
> val rightTable: KTable[RightTableKey, RightTableValue] =
>   streamBuilder.table(...)
> //Not showing previous definitions because I don't think they're relevant
> //to this issue...
> val itemMaterialized =
>   Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes,
>     Array[Byte]]]("materializedOutputTable")(
>     internalMyKeySerde, internalJoinedOutputSerde)
> val joinedTable =
>   myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
>     rightTable, foreignKeyExtractor, joinerFunction, itemMaterialized)
> //Force evaluation to output some data
> joinedTable.toStream.to("outputStream")
> {code}
> When I execute this with leftTable generated via process 1, the leftTable
> value serde somehow gets lost along the way, and the default serde is used
> instead. This results in the following runtime exception:
> {code:java}
> <removed for brevity>
> Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
>     at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
>     at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
>     at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>     ... 30 more
> {code}
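For context on the trace: `[B` is the JVM's name for `byte[]`. The top frame is `ByteArraySerializer`, which is consistent with the 2.x default `default.value.serde` (`Serdes.ByteArraySerde`) being applied to a `MyValue`. Below is a minimal sketch that reproduces just that failure outside of Streams. `MyValue` and `serializeWithDefault` are hypothetical stand-ins.

{code:scala}
import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}

object DefaultSerdeFailure {
  // Hypothetical stand-in for com.bellemare.sample.MyValue.
  final case class MyValue(id: Int)

  // Generic erasure lets a Serializer[Array[Byte]] be used for any value type,
  // roughly what happens when a null valSerde is replaced by ByteArraySerde.
  def serializeWithDefault(value: Any): Array[Byte] = {
    val default = new ByteArraySerializer().asInstanceOf[Serializer[Any]]
    // The bridge method casts `value` to Array[Byte] ("[B"), so passing a
    // MyValue throws java.lang.ClassCastException, as in the trace above.
    default.serialize("some-topic", value)
  }
}
{code}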
> Now, if I change process 1 to the following:
> Process 2:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> val myLeftTable: KTable[MyKey, MyValue] =
>   streamBuilder.table[MyKey, MyValue]("inputTopic")(myConsumer)
> //The downside of this approach is that we end up registering a bunch of
> //internal topics to the schema registry (S.R.), significantly increasing the
> //clutter in our lookup UI.
> {code}
> Everything works as expected, and `_*externalMyValueSerde*_` is used to
> serialize the events (though I don't want this, as it registers schemas with
> the SR and clutters it up).
> I don't think I'm missing any Serde inputs anywhere in the DSL, but I'm
> having a hard time figuring out *whether this is normal existing behaviour for
> how a KTable is created via* *Process 1* or whether I've stumbled upon a bug
> somewhere. When I debug my way through this, the FKJoiner appears to
> use `_*valSerde = null*_` (and therefore fall back to the default Serde) for
> the KTable created via process 1. This is unexpected to me; I expected to
> see `_*valSerde = internalOptionalDeletableMyValueSerde*_` instead.
> Is this a bug, or is this a problem with something that I am doing
> unwittingly?
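The rule from the comment at the top, together with the `valSerde = null` fallback described here, can be captured in a deliberately simplified model. `ToyTable`, its methods, and the serde names are hypothetical. This is not how Kafka Streams tracks serdes internally; it models only the propagation rule the comment describes.

{code:scala}
// Hypothetical model, not Kafka Streams internals: a table plus the value serde
// the DSL currently knows about (None plays the role of `null`).
final case class ToyTable[V](values: List[V], valueSerde: Option[String]) {
  // The DSL cannot tell whether `f` changes V, so the known serde is dropped.
  def mapValues[VR](f: V => VR): ToyTable[VR] = ToyTable(values.map(f), None)

  // Passing a serde with the mapper (the serdes-only Materialized fix) keeps one known.
  def mapValues[VR](f: V => VR, serde: String): ToyTable[VR] =
    ToyTable(values.map(f), Some(serde))

  // A downstream operator such as the FK join uses the known serde or the default.
  def serdeUsedDownstream(default: String): String = valueSerde.getOrElse(default)
}
{code}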
--
This message was sent by Atlassian Jira
(v8.3.4#803005)