Hi,

We are trying to change our StreamingFileSink S3 bucket, say from s3://eu1/output_old to s3://eu2/output_new. When we do so we get an exception and the taskmanager goes into a restart loop.
We suspect that it tries to restore state and gets the bucket id from saved state (Buckets.java: final BucketID bucketId = recoveredState.getBucketId()). Flink then tries to read output_old from eu2 and gets an AccessDeniedException. Rightly so, as it has permission for s3://eu2/output_new and not s3://eu2/output_old. We are not sure why Flink is trying to access the old bucket or how to avoid this exception.

Logs:

> "S3Committer.java","line":"87","message":"Failed to commit after recovery output_old/2019-08-22/18/part-3-40134 with MPU ID 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7. Checking if file was committed before..."
> "Task.java","line":"910","message":"... switched from RUNNING to FAILED."

java.nio.file.AccessDeniedException: output_old/2019-08-22/18/part-3-40134: getObjectMetadata on output_old/2019-08-22/18/part-3-40134: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 79F1AEE53131FB66; S3 Extended Request ID: 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=), S3 Extended Request ID: 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403 Forbidden
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

We are using Flink 1.8 with externalized checkpoints. The S3 bucket for the externalized checkpoints has not been modified.

Thanks
Sidhartha
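To illustrate what we think is happening, here is a minimal plain-Java sketch of the recovery flow as we understand it from the stack trace. The BucketState record and method below are simplified, hypothetical stand-ins, not the real org.apache.flink.streaming.api.functions.sink.filesystem types: the point is that the pending-file keys were serialized into checkpoint state under the old base path, so on restore the committer appears to replay those keys verbatim, and the new sink path is never consulted.

```java
import java.util.List;

public class RecoverySketch {

    // Hypothetical stand-in for what the checkpoint state carries:
    // a bucket id plus the absolute object keys of pending multipart
    // uploads, all captured under the OLD base path (output_old).
    record BucketState(String bucketId, List<String> pendingFiles) {}

    // Hypothetical stand-in for commitAfterRecovery: it replays the
    // stored keys exactly as checkpointed; the new base path
    // (s3://eu2/output_new) plays no role at this point.
    static void commitRecoveredPendingFiles(BucketState recovered, StringBuilder log) {
        for (String key : recovered.pendingFiles()) {
            log.append("commitAfterRecovery -> ").append(key).append('\n');
        }
    }

    public static void main(String[] args) {
        BucketState recovered = new BucketState(
                "2019-08-22/18",
                List.of("output_old/2019-08-22/18/part-3-40134"));
        StringBuilder log = new StringBuilder();
        commitRecoveredPendingFiles(recovered, log);
        // Emits the OLD key even though the job was redeployed with the
        // new sink path -- matching the AccessDeniedException we see.
        System.out.print(log);
    }
}
```

If this understanding is right, it would explain why only permissions on s3://eu2/output_new are not enough: recovery still needs to touch the output_old keys recorded in the state it is restoring from.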