Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 Thread Yun Tang
Hi

The root cause is that the checkpoint was declined because sending data to Kafka
failed during 'preCommit'. The right fix is to make sure data can be sent to Kafka
successfully, which is most likely a Kafka-side issue.
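The "Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation" message in the log below is a producer-side timeout: the records stayed unacknowledged for longer than the client allows. A minimal sketch of that deadline check follows, assuming the Flink 1.9 universal connector bundles a 2.x kafka-clients in which the limit is `delivery.timeout.ms` (default 120000 ms). The class and method names are hypothetical, and this is a model, not the Kafka client's code.

```java
// Hypothetical model of the producer's batch-expiry check; not kafka-clients source.
// Assumption: the limit is delivery.timeout.ms with its 2.x default of 120000 ms.
final class BatchExpiry {
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;

    // A batch that is still unacknowledged once this much time has passed since it
    // was created is failed with a TimeoutException ("Expiring N record(s) ...").
    static boolean isExpired(long batchCreatedMs, long nowMs, long deliveryTimeoutMs) {
        return nowMs - batchCreatedMs >= deliveryTimeoutMs;
    }

    public static void main(String[] args) {
        // 120018 ms elapsed, as reported for partition k8s-test-data-0 in Log-1.
        System.out.println(isExpired(0L, 120_018L, DEFAULT_DELIVERY_TIMEOUT_MS)); // true
        // Same age, but with a hypothetical larger timeout of 300000 ms.
        System.out.println(isExpired(0L, 120_018L, 300_000L)); // false
    }
}
```

Raising the timeout only buys time: if the broker stays unreachable, the batches still expire and the checkpoint is still declined.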

If you cannot guarantee Kafka's availability from the client side and do not need
exactly-once (or at-least-once) delivery, you can pass
FlinkKafkaProducer.Semantic.NONE when creating the Kafka producer, so that no data
is flushed during 'preCommit'.
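To see why the semantic matters at checkpoint time, here is a simplified, hypothetical model (not the connector's source). With AT_LEAST_ONCE, which per the Flink documentation is the default when no semantic is passed (as in the original code), or with EXACTLY_ONCE, 'preCommit' flushes pending records, so a failed send surfaces there and the checkpoint is declined; with NONE the flush is skipped. The class name and the `brokerReachable` flag are assumptions made for illustration, and under NONE records that fail to send can simply be lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a two-phase-commit Kafka sink's preCommit step.
// It is NOT FlinkKafkaProducer's code; it only illustrates where the failure surfaces.
final class PreCommitModel {
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    private final Semantic semantic;
    private final boolean brokerReachable; // assumption: stands in for Kafka availability
    private final List<String> unacked = new ArrayList<>();

    PreCommitModel(Semantic semantic, boolean brokerReachable) {
        this.semantic = semantic;
        this.brokerReachable = brokerReachable;
    }

    // Sends are asynchronous: a record is only buffered until a flush confirms it.
    void send(String record) {
        unacked.add(record);
    }

    // Called on a checkpoint barrier; throwing here means the checkpoint is declined.
    void preCommit() {
        if (semantic == Semantic.NONE) {
            return; // no flush, so the checkpoint does not wait on Kafka acknowledgements
        }
        flush();
    }

    private void flush() {
        if (!brokerReachable && !unacked.isEmpty()) {
            throw new IllegalStateException(
                "Failed to send data to Kafka: Expiring " + unacked.size() + " record(s)");
        }
        unacked.clear(); // every buffered record was acknowledged
    }

    public static void main(String[] args) {
        PreCommitModel atLeastOnce = new PreCommitModel(Semantic.AT_LEAST_ONCE, false);
        atLeastOnce.send("event-1");
        try {
            atLeastOnce.preCommit();
        } catch (IllegalStateException e) {
            System.out.println("checkpoint declined: " + e.getMessage());
        }

        PreCommitModel none = new PreCommitModel(Semantic.NONE, false);
        none.send("event-1");
        none.preCommit(); // returns normally; the unacknowledged record may be lost
        System.out.println("checkpoint completed with NONE");
    }
}
```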

If you don't want the job to fail because of checkpoint errors, you can increase
the tolerable checkpoint failure number:

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
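The threshold can be pictured with the simplified model below. It assumes the coordinator counts consecutive declined checkpoints, resets the count on a successful one, and fails the job once the count exceeds the configured number (the "Exceeded checkpoint tolerable failure threshold." error in the original log). It illustrates the idea only and is not Flink's CheckpointFailureManager; the class name is hypothetical.

```java
// Hypothetical model of a tolerable-checkpoint-failure threshold; not Flink source.
// Assumption: only consecutive failures count, and a successful checkpoint resets them.
final class TolerableFailureModel {
    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    TolerableFailureModel(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    // Returns true when the job should fail because the threshold was exceeded.
    boolean onCheckpointDeclined() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        TolerableFailureModel strict = new TolerableFailureModel(0);
        System.out.println(strict.onCheckpointDeclined()); // true: first decline fails the job

        TolerableFailureModel lenient = new TolerableFailureModel(2);
        System.out.println(lenient.onCheckpointDeclined()); // false
        System.out.println(lenient.onCheckpointDeclined()); // false
        lenient.onCheckpointCompleted();                    // counter reset
        System.out.println(lenient.onCheckpointDeclined()); // false
    }
}
```

A threshold of 0 in this model matches Log-1, where the very first declined checkpoint failed the job; a larger value only postpones the failure if Kafka stays unreachable.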

Best
Yun Tang

From: jose farfan 
Sent: Wednesday, January 15, 2020 23:21
To: ouywl 
Cc: user ; user...@flink.apache.org 

Subject: Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The 
streamTask checkpoint error .

Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl <ou...@139.com> wrote:
Hi all,
When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in Log-1
occurs. The code is:

input.addSink(
    new FlinkKafkaProducer(
        parameterTool.getRequired("bootstrap.servers"),
        parameterTool.getRequired("output-topic"),
        new KafkaEventDeSchema()));

Log-1:
2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)

Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 Thread jose farfan
Hi

I have the same issue.

BR
Jose

On Thu, 9 Jan 2020 at 10:28, ouywl wrote:

> Hi all,
> When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in
> Log-1 occurs. The code is:
>
> input.addSink(
>     new FlinkKafkaProducer(
>         parameterTool.getRequired("bootstrap.servers"),
>         parameterTool.getRequired("output-topic"),
>         new KafkaEventDeSchema()));
>
> Log-1:
> 2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
> 2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
> 2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>     ... 17 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
> 2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.

When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-09 Thread ouywl

Hi all,
When I use Flink 1.9.1 to produce data to Kafka 1.1.1, the error shown in Log-1
occurs. The code is:

input.addSink(
    new FlinkKafkaProducer(
        parameterTool.getRequired("bootstrap.servers"),
        parameterTool.getRequired("output-topic"),
        new KafkaEventDeSchema()));

Log-1:
2020-01-09 09:13:44,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1578561224466 for job d8827b3f4165b6ba27c8b59c7aa1a400.
2020-01-09 09:15:33,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task f643244ff791dbd3fbfb88bfafdf1872 of job d8827b3f4165b6ba27c8b59c7aa1a400 at ee8e6d8e92f9a59f578b1de2edd73537 @ producedata-taskmanager-d59d5cb7c-pv27j (dataPort=33361).
2020-01-09 09:15:33,070 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job d8827b3f4165b6ba27c8b59c7aa1a400.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    ... 17 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 58 record(s) for k8s-test-data-0:120018 ms has passed since batch creation
2020-01-09 09:15:33,074 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job producer data frequece (d8827b3f4165b6ba27c8b59c7aa1a400) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)