[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5624 Merging this for now, because it stabilizes the tests... ---
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5624

Indeed, Presto-S3 does better in `com.facebook.presto.hive.PrestoS3FileSystem#create()`:

```java
if ((!overwrite) && exists(path)) {
    throw new IOException("File already exists:" + path);
}
// file creation
```

But if `overwrite = false`, it will still check for existence first. Also, contrary to my initial analysis, the retries when retrieving the file status during the existence check do not cover non-existence. I can adapt the tests to only use `overwrite = true`, but actual code outside the tests uses both variants.

It is therefore a good idea to distinguish between `flink-s3-fs-hadoop` and `flink-s3-fs-presto`, but only for the existence check, not for verifying that a file/directory was deleted, since

> Amazon S3 offers eventual consistency for overwrite PUTS and DELETES in all regions.

I adapted the code accordingly, which effectively boiled down to removing some of the new eventually-consistent existence checks in `PrestoS3FileSystemITCase`.

Regarding the two implementations you provided: for the existence check itself, there should be no difference between a single `fs.exists()` call and `fs.open()` in terms of consistency. ---
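For reference, the retry-until-deadline pattern the tests use for eventually-consistent checks can be factored into a small generic helper. A minimal sketch, with hypothetical names (this is not actual Flink test code):

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls an eventually-consistent condition until a nanoTime deadline. */
final class ConsistencyRetry {

    static boolean waitUntil(BooleanSupplier condition, long deadlineNanos, long pollMillis)
            throws InterruptedException {
        boolean satisfied;
        // re-check until the condition holds or the monotonic deadline passes
        while (!(satisfied = condition.getAsBoolean()) && System.nanoTime() < deadlineNanos) {
            Thread.sleep(pollMillis);
        }
        return satisfied;
    }
}
```

Note that with `overwrite = true`, the quoted Presto `create()` short-circuits past `exists(path)` entirely, so no prior HEAD/GET is issued and no retry loop would be needed for that path.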
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5624

Okay, the solution is actually in the description of another pull request:

> Amazon S3 provides read-after-write consistency for PUTS of new objects in your S3 bucket in all regions with one caveat. **The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides eventual consistency for read-after-write.**

https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel

That means that through the `flink-s3-fs-hadoop` connector, we can never assume any form of strongly consistent operation, not even the basic read-after-write for exact keys. That's a bummer :-(

On the upside, the `flink-s3-fs-presto` connector does not seem to do that, so we might be able to rely on strong consistency for read-after-write.

My original suggestion boiled down to changing the test code as follows, which should work if GET is consistent after PUT:

```java
public static boolean pathExists(FileSystem fs, Path path) throws IOException {
    try (FSDataInputStream ignored = fs.open(path)) {
        // object exists
        return true;
    } catch (FileNotFoundException e) {
        // object does not exist
        return false;
    }
}
```

instead of the current code:

```java
public static void checkPathExistsEventually(
        FileSystem fs, Path path, boolean expectedExists, long deadline)
        throws IOException, InterruptedException {
    boolean dirExists;
    while ((dirExists = fs.exists(path)) != expectedExists && System.nanoTime() < deadline) {
        Thread.sleep(10);
    }
    assertEquals(expectedExists, dirExists);
}
```

What do you think? Should we use the "strongly consistent" variant for the Presto-S3-based connector? ---
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5624

That's actually the check for whether or not to overwrite the file; let me paste the full code of this example to give some more context:

```java
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  String key = pathToKey(f);
  S3AFileStatus status = null;
  try {
    // get the status or throw an FNFE
    status = getFileStatus(f);

    // if the thread reaches here, there is something at the path
    if (status.isDirectory()) {
      // path references a directory: automatic error
      throw new FileAlreadyExistsException(f + " is a directory");
    }
    if (!overwrite) {
      // path references a file and overwrite is disabled
      throw new FileAlreadyExistsException(f + " already exists");
    }
    LOG.debug("Overwriting file {}", f);
  } catch (FileNotFoundException e) {
    // this means the file is not found
  }
  instrumentation.fileCreated();
  FSDataOutputStream output;
  if (blockUploadEnabled) {
    output = new FSDataOutputStream(
        new S3ABlockOutputStream(this,
            key,
            new SemaphoredDelegatingExecutor(boundedThreadPool, blockOutputActiveBlocks, true),
            progress,
            partSize,
            blockFactory,
            instrumentation.newOutputStreamStatistics(statistics),
            new WriteOperationHelper(key)),
        null);
  } else {
    // We pass null to FSDataOutputStream so it won't count writes that
    // are being buffered to a file
    output = new FSDataOutputStream(
        new S3AOutputStream(getConf(), this, key, progress),
        null);
  }
  return output;
}
```

The `getFileStatus(f)` call at the top is the read-before-write that is causing the problem. (Of course, some other concurrent job could create the file in the meantime, so this check does not guard against much anyway, but there is not much we can do about that.) ---
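The decision logic of that status check, distilled to pure Java for illustration (the enum and names below are hypothetical, not Hadoop code): it makes explicit that the HEAD request via `getFileStatus()` is issued regardless of the `overwrite` flag, which is exactly what triggers S3's read-after-write caveat.

```java
/** Hypothetical distillation of the status check in S3AFileSystem#create(); no real S3 calls. */
enum CreateOutcome { PROCEED_NEW, PROCEED_OVERWRITE, FAIL_IS_DIRECTORY, FAIL_ALREADY_EXISTS }

final class CreateCheck {
    /** 'exists'/'isDirectory' stand in for the result of getFileStatus(f). */
    static CreateOutcome check(boolean exists, boolean isDirectory, boolean overwrite) {
        if (!exists) {
            // getFileStatus(f) threw FileNotFoundException
            return CreateOutcome.PROCEED_NEW;
        }
        if (isDirectory) {
            return CreateOutcome.FAIL_IS_DIRECTORY;
        }
        return overwrite ? CreateOutcome.PROCEED_OVERWRITE : CreateOutcome.FAIL_ALREADY_EXISTS;
    }
}
```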
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5624 @NicoK Is that actually a source of problems here? Or is it only a check for a graceful "file already exists" message, but nothing that would get in the way when creating a new file? ---
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5624

@StephanEwen unfortunately not; for example:

`org.apache.flink.runtime.fs.hdfs.HadoopFileSystem#create()` -> `org.apache.hadoop.fs.FileSystem#create()` -> `org.apache.hadoop.fs.s3a.S3AFileSystem#create()`

and this (depending on the Hadoop version, of course) may call this:

```java
// get the status or throw an FNFE
status = getFileStatus(f);

// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
  ...
```

---
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5624 Don't we control the calls on `FileSystem` that we make as part of the YARN upload? Can't we simply avoid all operations that are not strongly consistent? For example, if we strictly only use `create()` and `open()`, we should be consistent. ---
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5624

Unfortunately, this is not entirely under our control, since we rely on the underlying `FileSystem` implementation. I can probably reduce the parts of the tests that may lead to eventually-consistent operations, but looking through [S3's data consistency model](https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel), I figure we still (at least) have to deal with these scenarios:

> - A process writes a new object to Amazon S3 and immediately lists keys within its bucket. Until the change is fully propagated, the object might not appear in the list.
> - A process deletes an existing object and immediately lists keys within its bucket. Until the deletion is fully propagated, Amazon S3 might list the deleted object.

This could, however, be handled on a case-by-case basis, and I could try to improve the tests in this regard. ---
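Handling those two listing scenarios case by case could look like the following sketch: polling the key listing against a deadline, with the listing call abstracted behind a `Supplier` so the helper stays independent of any concrete S3 client (all names hypothetical):

```java
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical test helper: waits until a listing does (or no longer does) contain a key. */
final class ListingChecks {

    static boolean keyEventuallyListed(Supplier<Set<String>> listKeys, String key,
            boolean expectedListed, long deadlineNanos) throws InterruptedException {
        boolean listed;
        // eventual consistency: re-list until the change has propagated or the deadline passes
        while ((listed = listKeys.get().contains(key)) != expectedListed
                && System.nanoTime() < deadlineNanos) {
            Thread.sleep(10);
        }
        return listed == expectedListed;
    }
}
```

This mirrors the existing `checkPathExistsEventually` pattern, but for `listStatus`-style operations instead of per-key existence checks.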
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5624 The code and the tests should not rely on eventually consistent operations at all, but only on strongly consistent operations, like "read after create". That should also eliminate any need for retries. Minor comment: `System.currentTimeMillis()` should not be used for interval measurements. It is not a stable clock. All intervals should be measured in `System.nanoTime()`. ---
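A sketch of the suggested pattern: derive an absolute deadline from `System.nanoTime()` once, then compare against it on the monotonic clock (helper names are illustrative, not existing Flink utilities):

```java
import java.util.concurrent.TimeUnit;

/** Illustrative helpers for deadline handling on the monotonic clock. */
final class Deadlines {

    /** Absolute deadline on the monotonic clock, 'timeoutMillis' from now. */
    static long fromNow(long timeoutMillis) {
        return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    /** True once the deadline has passed; the subtraction is robust to nanoTime overflow. */
    static boolean passed(long deadlineNanos) {
        return System.nanoTime() - deadlineNanos >= 0;
    }
}
```

Usage would be `while (!done && !Deadlines.passed(deadline)) { ... }`; unlike `System.currentTimeMillis()`, the interval measured this way is unaffected by wall-clock adjustments such as NTP corrections.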