Hi, Kostas (in CC) might be able to help.
Best,
Fabian

On Wed, 4 Sept 2019 at 22:59, sidhartha saurav <sidsau...@gmail.com> wrote:

> Hi,
>
> Can someone suggest a workaround so that we do not get this issue while
> changing the S3 bucket?
>
> On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav <sidsau...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We are trying to change our StreamingFileSink S3 bucket, say from
>> s3://*eu1/output_old* to s3://*eu2/output_new*. When we do so, we get an
>> exception and the taskmanager goes into a restart loop.
>>
>> We suspect that it tries to restore state and gets the bucket ID from the
>> saved state [*<Buckets.java> final BucketID bucketId =
>> recoveredState.getBucketId()*]. Flink then tries to read output_old from
>> eu2 and gets an AccessDeniedException. Rightly so, as it has permission for
>> s3://eu2/output_new and not for s3://eu2/output_old. We are not sure why
>> Flink is trying to access the old bucket or how to avoid this exception.
>>
>> Logs:
>>
>> > "S3Committer.java","line":"87","message":"Failed to commit after
>> recovery output_old/2019-08-22/18/part-3-40134 with MPU ID
>> 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7.
>> Checking if file was committed before...",
>>
>> > "Task.java","line":"910","message":"... switched from RUNNING to
>> FAILED."
>>
>> > java.nio.file.AccessDeniedException: output_old/2019-08-22/18/part-3-40134:
>> getObjectMetadata on output_old/2019-08-22/18/part-3-40134:
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden;
>> Request ID: 79F1AEE53131FB66; S3 Extended Request ID:
>> 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=),
>> S3 Extended Request ID:
>> 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403
>> Forbidden
>>
>> flink-taskmanager at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>> flink-taskmanager at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
>> flink-taskmanager at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>> flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>> flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>> flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>> flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>> flink-taskmanager at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> flink-taskmanager at java.lang.Thread.run(Thread.java:748)
>>
>> We are using Flink 1.8 and externalized checkpoints. The S3 bucket for the
>> externalized checkpoints has not been modified.
>>
>> Thanks
>> Sidhartha