Hi Bobby,

This is most likely a bug in Flink. Thanks a lot for reporting the issue
and analyzing it. I have created an issue for tracking it [1].

cc Becket.

[1] https://issues.apache.org/jira/browse/FLINK-21691

Cheers,
Till

On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard <bobby.rich...@broadcom.com>
wrote:

> I'm receiving the following exception when trying to use a KafkaSource
> from the new DataSource API.
>
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
> at
> org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
>
> Here is my code (kotlin)
>
> val kafkaSource = buildKafkaSource(params)
> val datastream = env.fromSource(kafkaSource, 
> WatermarkStrategy.noWatermarks(), "kafka")
>
> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>     val builder = KafkaSource.builder<String>()
>         .setBootstrapServers(params.get("bootstrapServers"))
>         .setGroupId(params.get("groupId"))
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setTopics("topic")
>         
> .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>
>     if (params.getBoolean("boundedSource", false)) {
>         builder.setBounded(OffsetsInitializer.latest())
>     }
>
>     return builder.build()
> }
>
>
>
>
> I'm setting the deserializer using the ValueDeserializerWrapper as
> described in the KafkaSourceBuilder javadoc example
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
>
> Looking at the code for the ValueDeserializerWrapper, it appears that the
> deserializer isn't actually set until the deserialize method is called, but
> getProducedType is actually called first resulting in the
> NullPointerException. What am I missing?
>
> Thanks,
> Bobby
>
> This electronic communication and the information and any files
> transmitted with it, or attached to it, are confidential and are intended
> solely for the use of the individual or entity to whom it is addressed and
> may contain information that is confidential, legally privileged, protected
> by privacy laws, or otherwise restricted from disclosure to anyone else. If
> you are not the intended recipient or the person responsible for delivering
> the e-mail to the intended recipient, you are hereby notified that any use,
> copying, distributing, dissemination, forwarding, printing, or copying of
> this e-mail is strictly prohibited. If you received this e-mail in error,
> please return the e-mail to the sender, delete it from your computer, and
> destroy any printed copy of it.

Reply via email to