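Hi Harshit,

Could you double-check that the version of flink-sql-connector-kafka.jar is also 1.14.4? A NoSuchMethodError like the one in your stack trace usually means the connector jar was built against a different Flink version than the one running the job.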
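In case it helps, here is a rough sketch of one way to compare the two versions from Python. It rests on a few assumptions: PyFlink was installed with pip under the package name apache-flink, the interpreter is Python 3.8 or newer (for importlib.metadata), and the jar's manifest carries an Implementation-Version entry, which may not be true for every build. The function names are only illustrative. If the jar file was downloaded with the version in its name and later renamed, the download page is an equally good place to check.

```python
import zipfile
from importlib import metadata


def installed_pyflink_version():
    """Return the version of the pip package 'apache-flink', or None if it is not installed."""
    try:
        return metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        return None


def jar_manifest_version(jar_path):
    """Return the Implementation-Version from the jar's manifest, or None if it is absent."""
    with zipfile.ZipFile(jar_path) as jar:
        try:
            manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8", errors="replace")
        except KeyError:
            # The archive has no manifest at all.
            return None
    for line in manifest.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip() == "Implementation-Version":
            return value.strip()
    return None


# Example use (path taken from the original message):
#   print(installed_pyflink_version())
#   print(jar_manifest_version("C:/Users/Admin/Desktop/test11/flink-sql-connector-kafka.jar"))
```

If the two values differ, or the jar reports nothing, replacing the jar with the flink-sql-connector-kafka build that matches 1.14.4 would be the first thing to try.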

Regards,
Dian

On Thu, Apr 14, 2022 at 7:51 PM harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote:

> *From:* harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
> *Sent:* Thursday, April 14, 2022 4:04 PM
> *To:* user-i...@flink.apache.org
> *Cc:* harshit.varsh...@iktara.ai
> *Subject:* Pyflink Kafka consumer error (version 1.14.4)
>
> Dear Team,
>
> I am new to pyflink and request for your support in issue I am facing with
> Pyflink. I am using Pyflink version 1.14.4 & using reference code from
> pyflink getting started pages.
>
> I am getting following error when using FlinkKafkaConsumer connector.
>
> : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>
> Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/MetricGroup;
>
> Below is my code for reference..
>
> def streaming_square_roots():
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
>     env.add_jars("file:///C:/Users/Admin/Desktop/test11/flink-sql-connector-kafka.jar")
>
>     deserialization_schema = SimpleStringSchema()
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='new-numbers',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': 'localhost:9092'})
>
>     ds = env.add_source(kafka_consumer)
>
>     ds.print()
>
>     # 4. create sink and emit result to sink
>
>     env.execute(job_name='streaming_square_roots')
>
>
> if __name__ == '__main__':
>     streaming_square_roots()
>
> Thanks and Regards,
>
> Harshit