Hello Kafka community,

When our Kafka Streams job starts with input topic values in Avro format, we get the NPE below. The input value is a record, and one of its fields is an array of other records. From the stack trace, my understanding is that at some point while deserializing a value the decoder encounters an unexpected null, hence the NPE. Do you have any hints as to what the problem may be?

In this Kafka Streams ETL job we emit multiple messages from a single input message (flatMapping the array field to the output).

Exception in thread "global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1" java.lang.NullPointerException
    at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
    at com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
    at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
    at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
    at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
    at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
    at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
    at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
    at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:428)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
--
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group