Re: InvalidTypesException: Input mismatch while consuming Kafka messages

2020-05-04 Thread Manish G
Thanks. It worked by introducing a custom DeserializationSchema.

On Mon, May 4, 2020 at 3:04 PM Robert Metzger wrote:
> Hi,
> Can you provide the full stack trace of your exception?
> Most likely, the error is caused by this setting:
>
> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_C

Re: InvalidTypesException: Input mismatch while consuming Kafka messages

2020-05-04 Thread Robert Metzger
Hi,
Can you provide the full stack trace of your exception?
Most likely, the error is caused by this setting:

properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

You need to use Flink's DeserializationSchema.

On Mon, May 4, 2020 at 1
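
For readers landing on this thread, here is a rough sketch of what such a custom schema could look like. It is not the code Manish ended up with: the fields of MyCustomClass and the "name,count" wire format are invented for illustration, and the class name MyCustomClassSchema is hypothetical. To keep the sketch compilable with the JDK alone, the class only mirrors the method shapes of Flink's DeserializationSchema; in a real job it would declare the interface as described in the comments.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical payload type; the thread does not show the fields of MyCustomClass.
class MyCustomClass {
    final String name;
    final int count;

    MyCustomClass(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

// Skeleton whose methods mirror Flink's DeserializationSchema<MyCustomClass>.
// In a Flink project, add "implements DeserializationSchema<MyCustomClass>"
// (package org.apache.flink.api.common.serialization) and a getProducedType()
// method returning TypeInformation.of(MyCustomClass.class).
class MyCustomClassSchema {

    // Assumed wire format (not from the thread): UTF-8 text "name,count".
    public MyCustomClass deserialize(byte[] message) throws IOException {
        String text = new String(message, StandardCharsets.UTF_8);
        String[] parts = text.split(",", 2);
        if (parts.length != 2) {
            throw new IOException("Expected 'name,count' but got: " + text);
        }
        try {
            return new MyCustomClass(parts[0], Integer.parseInt(parts[1].trim()));
        } catch (NumberFormatException e) {
            throw new IOException("Count is not an integer: " + parts[1], e);
        }
    }

    // A Kafka topic is unbounded, so no record marks the end of the stream.
    public boolean isEndOfStream(MyCustomClass nextElement) {
        return false;
    }
}
```

Once it implements the Flink interface, an instance of this class would be passed to the FlinkKafkaConsumer constructor as the value deserializer, next to the topic name and the Properties object, in place of the VALUE_DESERIALIZER_CLASS_CONFIG property quoted above.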

InvalidTypesException: Input mismatch while consuming Kafka messages

2020-05-04 Thread Manish G
I have the following code:

//
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer( "test-kafka=top