Hi,

I have a simple Python pipeline running on top of the Flink runner:
```
pipeline | ReadFromKafka(...) | beam.Map(lambda args: logging.info(args[1]))
```
However, I found that it always puts 0-byte checkpoints into S3, and there is no
error log explaining why the checkpoint data length is 0. Does anyone know how I
can fix this issue? Thanks!
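
For more context, here is a stripped-down sketch of how the pipeline is built and launched; the broker, topic, and option values below are placeholders rather than my real settings:
```
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; the real job runs on the portable FlinkRunner with
# checkpointing enabled so that Flink can snapshot state to S3.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--streaming",
    "--checkpointing_interval=60000",  # milliseconds
])

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "ReadFromKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "broker:9092"},
            topics=["my-topic"],
        )
        | "LogRecords" >> beam.Map(lambda args: logging.info(args[1]))
    )
```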

I am currently using Flink 1.14.5 and Beam 2.41.0, and here is the log:
```
2022-10-05 17:05:32,027 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)/{ParDo(Out
2022-10-05 17:19:34,272 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering cancel-with-savepoint for job 1a68f74acf0ccf403693e2f228fa62a6.
2022-10-05 17:19:34,285 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1664990374275 for job 1a68f74acf0ccf403693e2f228fa62a6.
2022-10-05 17:19:34,287 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Making directory: s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89
2022-10-05 17:19:34,287 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_mkdirs += 1 -> 3
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_get_file_status += 1 -> 6
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Getting path status for s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89 (flink/expansi
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 6
2022-10-05 17:19:34,381 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 6
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Not Found: s3a://affirm-stage-chrono/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_get_file_status += 1 -> 7
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Getting path status for s3a://test-bucket/flink/expansion-service-example/savepoints (flink/expansion-service-example/savepoints)
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus s3a://test-bucket/flink/expansion-service-example/savepoints
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 7
2022-10-05 17:19:34,431 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 7
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Found path as directory (with /)
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Prefix count = 0; object count=1
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Summary: flink/expansion-service-example/savepoints/ 0
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT 0 bytes to flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT start 0 bytes
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests += 1 -> 3
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT completed success=true; 0 bytes
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests_completed += 1 -> 3
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Finished write to flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/, len 0
2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/expansion-service-example/savepoints/ for s3a://affirm-stage-chrono/flink/expansion-service-exa
2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/expansion-service-example/ for s3a://affirm-stage-chrono/flink/expansion-service-example
2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/ for s3a://affirm-stage-chrono/flink
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_delete_requests += 1 -> 3
2022-10-05 17:19:34,617 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress [] - PUT flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/: 0 bytes
2022-10-05 17:24:34,285 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 1 of job 1a68f74acf0ccf403693e2f228fa62a6 expired before completing.
2022-10-05 17:24:34,287 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 1a68f74acf0ccf403693e2f228fa62a6. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) [flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_delete += 1 -> 1
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_get_file_status += 1 -> 8
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Getting path status for s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89 (flink/expansi
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 8
2022-10-05 17:24:34,360 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 8
2022-10-05 17:24:34,394 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Found path as directory (with /)
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Prefix count = 0; object count=1
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Summary: flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/ 0
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Delete path s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89 - recursive true
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - delete: Path is a directory: s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Deleting fake empty directory flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_delete_requests += 1 -> 4
2022-10-05 17:24:34,427 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus s3a://test-bucket/flink/expansion-service-example/savepoints
2022-10-05 17:24:34,427 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 9
2022-10-05 17:24:34,444 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 9
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Not Found: s3a://test-bucket/flink/expansion-service-example/savepoints
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Creating new fake directory at s3a://test-bucket/flink/expansion-service-example/savepoints
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT 0 bytes to flink/expansion-service-example/savepoints/
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT start 0 bytes
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests += 1 -> 4
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT completed success=true; 0 bytes
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests_completed += 1 -> 4
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Finished write to flink/expansion-service-example/savepoints/, len 0
2022-10-05 17:24:34,548 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/expansion-service-example/ for s3a://affirm-stage-chrono/flink/expansion-service-example
2022-10-05 17:24:34,548 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/ for s3a://affirm-stage-chrono/flink
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_delete_requests += 1 -> 5
2022-10-05 17:24:34,597 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress [] - PUT flink/expansion-service-example/savepoints/: 0 bytes
```
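
In case it is relevant, the Flink state/savepoint configuration looks roughly like this; the bucket name is a placeholder and the S3 credential/endpoint settings are omitted:
```
# flink-conf.yaml (placeholder bucket; S3 credentials/endpoint omitted)
state.backend: filesystem
state.checkpoints.dir: s3a://test-bucket/flink/expansion-service-example/checkpoints
state.savepoints.dir: s3a://test-bucket/flink/expansion-service-example/savepoints
```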



Sincerely,
Lydian Lee
