Hi Jary,
Flink removed Kafka 0.10 & 0.11 connector since 1.12, because Kafka supports
bidirectional compatibility since version 0.10, which means you can use a newer
version client to communicate with your old version broker (e.g. Kafka client
2.4.1 & Kafka broker 0.11) [1]. You can try to switch to a higher version Kafka
client and it should work.
[1] https://kafka.apache.org/protocol.html#protocol_compatibility
--
Best Regards,
Qingsheng Ren
Email: renqs...@gmail.com
On Oct 20, 2021, 11:18 AM +0800, Jary Zhen , wrote:
> Hi, everyone
>
> I'm using Flink 1.14 to consume Kafka data, which version is 0.11. And there
> are some errors while running.
> > quote_type
> > Caused by: java.lang.NoSuchMethodError:
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
> > at
> > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113)
> > at
> > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> > at
> > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> > at
> > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> > at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> After Checking the Flink-connector-kafka code.
> consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT))
> It seems the current Flink version doesn't support the low Kafka version.
> Which use poll( long timeout ) not poll(Duration timeout)
> public ConsumerRecords poll(long timeout)
> So. is this a bug or The Flink user must use high Kafka version.