Thanks. It worked after I introduced a custom DeserializationSchema.
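
For anyone finding this thread later, here is a minimal sketch of what such a schema can look like. The original error most likely arose because SimpleStringSchema tells Flink the source emits String (a basic type), while the map function expects MyCustomClass. Several things below are assumptions made only for illustration: the nested DeserializationSchema interface is a local stand-in that mirrors the method shapes of Flink's org.apache.flink.api.common.serialization.DeserializationSchema so the sketch compiles without Flink on the classpath, MyCustomClass with a single payload field is hypothetical (the thread does not show the real class), and plain UTF-8 decoding stands in for the JsonUtil.convertBytesToObject call. The names CustomSchemaSketch and MyCustomClassSchema are made up.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class CustomSchemaSketch {

    // Stand-in with the same method shapes as Flink's DeserializationSchema<T>.
    // Assumption for this sketch only: in a real job, implement the Flink
    // interface instead, where getProducedType() returns TypeInformation<T>
    // (for example TypeInformation.of(MyCustomClass.class)), not Class<T>.
    interface DeserializationSchema<T> extends Serializable {
        T deserialize(byte[] message) throws IOException;
        boolean isEndOfStream(T nextElement);
        Class<T> getProducedType();
    }

    // Hypothetical payload type; the real MyCustomClass is not shown in the thread.
    static class MyCustomClass {
        final String payload;

        MyCustomClass(String payload) {
            this.payload = payload;
        }

        @Override
        public String toString() {
            return "MyCustomClass(" + payload + ")";
        }
    }

    // The schema converts raw Kafka record bytes into MyCustomClass and
    // reports that type, so downstream operators see MyCustomClass, not String.
    static class MyCustomClassSchema implements DeserializationSchema<MyCustomClass> {
        private static final long serialVersionUID = 1L;

        @Override
        public MyCustomClass deserialize(byte[] message) {
            // Placeholder: the original code would call
            // JsonUtil.convertBytesToObject(message, MyCustomClass.class) here.
            return new MyCustomClass(new String(message, StandardCharsets.UTF_8));
        }

        @Override
        public boolean isEndOfStream(MyCustomClass nextElement) {
            // A Kafka topic is unbounded, so no element ends the stream.
            return false;
        }

        @Override
        public Class<MyCustomClass> getProducedType() {
            return MyCustomClass.class;
        }
    }

    public static void main(String[] args) {
        MyCustomClassSchema schema = new MyCustomClassSchema();
        byte[] record = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(schema.deserialize(record));
    }
}
```

In the job itself, an instance of the schema would be passed as the second constructor argument of FlinkKafkaConsumer in place of new SimpleStringSchema(), and the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG property would be dropped, as suggested in the reply quoted below.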

On Mon, May 4, 2020 at 3:04 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
> Can you provide the full stack trace of your exception?
> Most likely, the error is caused by this setting:
>
> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> MyCustomClassDeserializer.class.getName());
>
> You need to use Flink's DeserializationSchema.
>
> On Mon, May 4, 2020 at 10:26 AM Manish G <manish.c.ghildi...@gmail.com>
> wrote:
>
>> I have the following code:
>>
>> //////////////////////
>> Properties properties = new Properties();
>> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> MyCustomClassDeserializer.class.getName());
>>
>> FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
>>                     "test-kafka=topic",
>>                     new SimpleStringSchema(),
>>                     properties);
>>
>> final StreamExecutionEnvironment streamEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<MyCustomClass> kafkaInputStream =
>> streamEnv.addSource(kafkaConsumer);
>>
>> DataStream<String> stringStream = kafkaInputStream
>>                     .map(new MapFunction<MyCustomClass,String>() {
>>                         @Override
>>                         public String map(MyCustomClass message) {
>>                             logger.info("--- Received message : " +
>> message.toString());
>>                             return message.toString();
>>                         }
>>                     });
>>
>> streamEnv.execute("Published messages");
>>
>> ///////
>> MyCustomClassDeserializer is implemented as:
>>
>> public MyCustomClass deserialize(String s, byte[] bytes) {
>>         return (MyCustomClass) JsonUtil.convertBytesToObject(bytes,
>> MyCustomClass.class);
>>     }
>>
>> When I run this program locally, I get this error:
>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Input mismatch: Basic type expected.
>>
>> Why do I get this error?
>>
>
