Hi,

We are trying to change our StreamingFileSink S3 bucket, say from
s3://eu1/output_old to s3://eu2/output_new. When we do so, we get an
exception and the taskmanager goes into a restart loop.

We suspect that on restart it tries to restore state and reads the bucket ID
from the saved state (in Buckets.java: final BucketID bucketId =
recoveredState.getBucketId()). Flink then tries to read output_old from
eu2 and gets an AccessDeniedException. Rightly so, as it has permission for
s3://eu2/output_new and not s3://eu2/output_old. We are not sure why
Flink is trying to access the old bucket, or how to avoid this exception.
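From the stack trace below, the recovery flow can be sketched roughly as follows. This is a simplified, hypothetical illustration of the behavior we are seeing (the class and method names are ours, not the actual Flink source): the bucket path comes from the checkpointed state, not from the sink's newly configured base path, so the commit-after-recovery step still targets the old prefix.

```java
import java.util.List;

// Simplified sketch of the restore path implied by the stack trace
// (Buckets.handleRestoredBucketState -> Bucket.<init> ->
//  commitRecoveredPendingFiles -> S3Committer.commitAfterRecovery).
// Names are illustrative only; this is not the real Flink code.
public class RecoverySketch {

    // Stands in for the state saved in the checkpoint: it carries the
    // *old* bucket path, e.g. "output_old/2019-08-22/18".
    record RecoveredBucketState(String bucketId, List<String> pendingFiles) {}

    static class Bucket {
        Bucket(RecoveredBucketState state) {
            // On restore, the bucket id is taken from the checkpointed
            // state, not from the sink's newly configured output path ...
            String bucketId = state.bucketId();
            commitRecoveredPendingFiles(bucketId, state.pendingFiles());
        }

        void commitRecoveredPendingFiles(String bucketId, List<String> files) {
            for (String f : files) {
                // ... so the recovery commit probes objects under the old
                // prefix, which the new credentials cannot read -> 403.
                System.out.println("getObjectMetadata on " + bucketId + "/" + f);
            }
        }
    }

    public static void main(String[] args) {
        RecoveredBucketState old = new RecoveredBucketState(
                "output_old/2019-08-22/18", List.of("part-3-40134"));
        new Bucket(old); // prints the old path, mirroring the failing call
    }
}
```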

Logs:

> "S3Committer.java","line":"87","message":"Failed to commit after recovery
output_old/2019-08-22/18/part-3-40134 with MPU ID
7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7.
Checking if file was committed before...",

> "Task.java","line":"910","message":"... switched from RUNNING to FAILED."

> java.nio.file.AccessDeniedException: output_old/2019-08-22/18/part-3-40134:
getObjectMetadata on output_old/2019-08-22/18/part-3-40134:
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden;
Request ID: 79F1AEE53131FB66; S3 Extended Request ID:
8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=),
S3 Extended Request ID:
8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403
Forbidden

flink-taskmanager at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
flink-taskmanager at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
flink-taskmanager at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
flink-taskmanager at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
flink-taskmanager at java.lang.Thread.run(Thread.java:748)


We are using Flink 1.8 with externalized checkpoints. The S3 bucket for
the externalized checkpoints has not been modified.

Thanks
Sidhartha
