Hi Bobby, This is most likely a bug in Flink. Thanks a lot for reporting the issue and analyzing it. I have created an issue for tracking it [1].
cc Becket. [1] https://issues.apache.org/jira/browse/FLINK-21691 Cheers, Till On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard <bobby.rich...@broadcom.com> wrote: > I'm receiving the following exception when trying to use a KafkaSource > from the new DataSource API. > > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > > Here is my code (kotlin) > > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > > private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> { > val builder = KafkaSource.builder<String>() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > > return builder.build() > } > > > > > I'm setting the deserializer using the ValueDeserializerWrapper as > described in the KafkaSourceBuilder javadoc example > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html > > Looking at the code for the ValueDeserializerWrapper, it appears that the > deserializer isn't actually set until the deserialize method is called, but > getProducedType is actually called first resulting in the > NullPointerException. What am I missing? > > Thanks, > Bobby > > This electronic communication and the information and any files > transmitted with it, or attached to it, are confidential and are intended > solely for the use of the individual or entity to whom it is addressed and > may contain information that is confidential, legally privileged, protected > by privacy laws, or otherwise restricted from disclosure to anyone else. If > you are not the intended recipient or the person responsible for delivering > the e-mail to the intended recipient, you are hereby notified that any use, > copying, distributing, dissemination, forwarding, printing, or copying of > this e-mail is strictly prohibited. If you received this e-mail in error, > please return the e-mail to the sender, delete it from your computer, and > destroy any printed copy of it.