[ 
https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10515:
---------------------------------
    Fix Version/s: 2.6.1

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10515
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: Thorsten Hake
>            Priority: Critical
>             Fix For: 2.7.0, 2.6.1
>
>
> The fix of KAFKA-9517 fixed the initialization of the foreign key joins 
> serdes for KStream applications that do not run distributed over multiple 
> instances.
> However, if an application runs distributed over multiple instances, the 
> foreign key join serdes may still not be initialized leading to the following 
> NPE:
> {noformat}
> Encountered the following error during 
> processing:java.lang.NullPointerException: null
>       at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
>       at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
>       at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
>       at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
>       at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
>       at 
> org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
>       at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
>       at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat}
> This happens because the processors for foreign key joins will be distributed 
> across multiple tasks. The serde will only be initialized with the default 
> serde during the initialization of the task containing the sink node 
> ("subscription-registration-sink"). So if the task containing the 
> SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to 
> the same instance as the task containing the sink node, a NPE will be thrown 
> because the Serde of the state store used within the 
> SubscriptionStoreReceiveProcessor is not initialized.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to