[jira] [Commented] (FLINK-27041) KafkaSource in batch mode failing on 0 messages in any topic partition

2022-06-14 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553946#comment-17553946
 ] 

Leonard Xu commented on FLINK-27041:


Fixed in
master(1.16):2b89dfe0d4c5c71a958fb1f07c5e6861e4bbe6a6
release-1.15: TODO


> KafkaSource in batch mode failing on 0 messages in any topic partition
> --
>
> Key: FLINK-27041
> URL: https://issues.apache.org/jira/browse/FLINK-27041
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
> Environment: Kafka cluster version: 3.1.0
> Flink version 1.14.4
>Reporter: Terxor
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
>
> First, consider consuming from a Kafka topic with a single partition that 
> holds 0 messages. When the job runs in batch mode with the stopping offsets 
> bounded to latest, it is expected to finish gracefully; instead, it fails 
> with an exception.
> Consider this minimal reproducible example (assume that test_topic exists 
> with 1 partition and 0 messages):
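> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   // Batch execution: the job should terminate once the bounded source is exhausted.
>   env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>   KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
>           .setBootstrapServers("localhost:9092")
>           .setTopics("test_topic")
>           .setValueOnlyDeserializer(new SimpleStringSchema())
>           // Bounded: read each partition only up to its latest offset.
>           .setBounded(OffsetsInitializer.latest())
>           .build();
>   DataStream<String> stream = env.fromSource(
>           kafkaSource,
>           WatermarkStrategy.noWatermarks(),
>           "Kafka Source");
>   stream.print();
>   env.execute("Flink KafkaSource test job");
> {code}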
> This produces the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>   ... [omitted for readability]
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>   ... [omitted for readability]
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
>   at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>   at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>   at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>   at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
>   at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>   at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>   at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

[jira] [Commented] (FLINK-27041) KafkaSource in batch mode failing on 0 messages in any topic partition

2022-04-07 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518720#comment-17518720
 ] 

Martijn Visser commented on FLINK-27041:


[~renqs] Done!

> KafkaSource in batch mode failing on 0 messages in any topic partition
> --
>
> Key: FLINK-27041
> URL: https://issues.apache.org/jira/browse/FLINK-27041
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
> Environment: Kafka cluster version: 3.1.0
> Flink version 1.14.4
>Reporter: Terxor
>Assignee: Qingsheng Ren
>Priority: Blocker

[jira] [Commented] (FLINK-27041) KafkaSource in batch mode failing on 0 messages in any topic partition

2022-04-07 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518687#comment-17518687
 ] 

Qingsheng Ren commented on FLINK-27041:
---

Thanks for the ticket [~terxor] !

I checked the code and this is indeed a bug. If an empty or invalid partition 
(starting offset > stopping offset) is assigned to the reader, it is not added 
to the Kafka consumer. So when the consumed topic is empty, the consumer can 
end up with no partitions assigned at all.
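The mechanism described above can be illustrated with a small, self-contained Java simulation. This is a sketch under stated assumptions, not the connector's code: `EmptySplitSketch`, `PartitionSplit`, `SimulatedSplitReader` and the `skipPollWhenNothingAssigned` flag are hypothetical names. The thrown `IllegalStateException` stands in for what `KafkaConsumer.poll()` does when the consumer has neither a subscription nor assigned partitions. The sketch also assumes that skipped (empty) splits are only reported as finished after a poll; the guard shown (skip the poll when nothing is assigned) is one plausible fix, not necessarily what commit 2b89dfe0 does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmptySplitSketch {

    /** Hypothetical stand-in for a Kafka partition split covering [startingOffset, stoppingOffset). */
    static final class PartitionSplit {
        final String id;
        final long startingOffset;
        final long stoppingOffset;

        PartitionSplit(String id, long startingOffset, long stoppingOffset) {
            this.id = id;
            this.startingOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }

        /** Empty when start == stop, invalid when start > stop: nothing to read either way. */
        boolean isEmptyOrInvalid() {
            return startingOffset >= stoppingOffset;
        }
    }

    /** Hypothetical reader: empty splits are kept away from the (simulated) consumer. */
    static final class SimulatedSplitReader {
        private final Map<String, PartitionSplit> assignedToConsumer = new LinkedHashMap<>();
        private final List<String> emptySplits = new ArrayList<>();
        private final boolean skipPollWhenNothingAssigned;

        SimulatedSplitReader(boolean skipPollWhenNothingAssigned) {
            this.skipPollWhenNothingAssigned = skipPollWhenNothingAssigned;
        }

        void addSplits(List<PartitionSplit> splits) {
            for (PartitionSplit split : splits) {
                if (split.isEmptyOrInvalid()) {
                    // Not assigned to the consumer; only remembered so it can be marked finished.
                    emptySplits.add(split.id);
                } else {
                    assignedToConsumer.put(split.id, split);
                }
            }
        }

        /** One fetch round; returns the ids of splits reported as finished. */
        List<String> fetch() {
            if (!(skipPollWhenNothingAssigned && assignedToConsumer.isEmpty())) {
                pollConsumer();
            }
            // Empty splits are only reported after the poll step, so a failing poll hides them.
            List<String> finished = new ArrayList<>(emptySplits);
            emptySplits.clear();
            return finished;
        }

        private void pollConsumer() {
            if (assignedToConsumer.isEmpty()) {
                // Simulates KafkaConsumer.poll() on a consumer with no subscription or assignment.
                throw new IllegalStateException(
                        "Consumer is not subscribed to any topics or assigned any partitions");
            }
            // A real reader would poll records for the assigned partitions here.
        }
    }

    public static void main(String[] args) {
        // As in the report: one partition with 0 messages, so start and stop offsets coincide.
        List<PartitionSplit> splits = List.of(new PartitionSplit("test_topic-0", 0L, 0L));

        SimulatedSplitReader unguarded = new SimulatedSplitReader(false);
        unguarded.addSplits(splits);
        try {
            unguarded.fetch();
        } catch (IllegalStateException e) {
            System.out.println("without guard: " + e.getMessage());
        }

        SimulatedSplitReader guarded = new SimulatedSplitReader(true);
        guarded.addSplits(splits);
        System.out.println("with guard, finished splits: " + guarded.fetch());
    }
}
```

Running `main` prints the simulated exception message for the unguarded reader and `[test_topic-0]` for the guarded one. In this sketch a reader that holds at least one non-empty partition polls normally, so the failure only appears when every partition assigned to a given reader is empty.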

[~martijnvisser] Could you assign the ticket to me? Thanks

> KafkaSource in batch mode failing on 0 messages in any topic partition
> --
>
> Key: FLINK-27041
> URL: https://issues.apache.org/jira/browse/FLINK-27041
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
> Environment: Kafka cluster version: 3.1.0
> Flink version 1.14.4
>Reporter: Terxor
>Priority: Blocker

[jira] [Commented] (FLINK-27041) KafkaSource in batch mode failing on 0 messages in any topic partition

2022-04-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516766#comment-17516766
 ] 

Martijn Visser commented on FLINK-27041:


[~renqs] Can you have a look?

> KafkaSource in batch mode failing on 0 messages in any topic partition
> --
>
> Key: FLINK-27041
> URL: https://issues.apache.org/jira/browse/FLINK-27041
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
> Environment: Kafka cluster version: 3.1.0
> Flink version 1.14.4
>Reporter: Terxor
>Priority: Blocker