[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-04-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
Merging this for now, because it stabilizes the tests...


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-29 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5624
  
Indeed, Presto-S3 does better in 
`com.facebook.presto.hive.PrestoS3FileSystem#create()`:
```java
if ((!overwrite) && exists(path)) {
    throw new IOException("File already exists:" + path);
}
// file creation
```
But with `overwrite = false`, it will also check for existence first. Also, 
contrary to my initial analysis, the retries when retrieving the file status 
during the existence check do not cover non-existence. I could adapt the tests 
to only use `overwrite = true`, but actual code outside the tests makes use of 
both variants.

It's therefore a good idea to distinguish between `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto`, but only for the existence check, not for checking that 
a file/directory was deleted, since

> Amazon S3 offers eventual consistency for overwrite PUTS and DELETES in all regions.

I adapted the code accordingly, which effectively boiled down to removing 
some of the new eventually consistent existence checks in 
`PrestoS3FileSystemITCase`.

Regarding the two implementations you provided: for doing the existence 
check, there should not be a difference between a single `fs.exists()` call vs. 
`fs.open()` in terms of consistency.


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
Okay, the solution is actually in the description of another pull request:

> Amazon S3 provides read-after-write consistency for PUTS of new objects in your S3 bucket in all regions with one caveat. **The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides eventual consistency for read-after-write.**

https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel

That means that through the `flink-s3-fs-hadoop` connector, we can never 
assume any form of strongly consistent operation, not even the basic 
read-after-write for exact keys. That's a bummer :-(

On the upside, the `flink-s3-fs-presto` does not seem to do that, so we 
might be able to rely on strong consistency for read-after-write.

My original suggestion boiled down to changing the test code as follows, 
which should work if GET is consistent after PUT:
```java
public static boolean pathExists(
        FileSystem fs,
        Path path,
        boolean expectedExists,
        long deadline) throws IOException {

    try (FSDataInputStream ignored = fs.open(path)) {
        // object exists
        return true;
    }
    catch (FileNotFoundException e) {
        // object does not exist
        return false;
    }
}
```
instead of the current code.
```java
public static void checkPathExistsEventually(
        FileSystem fs,
        Path path,
        boolean expectedExists,
        long deadline) throws IOException, InterruptedException {

    boolean dirExists;
    while ((dirExists = fs.exists(path)) != expectedExists &&
            System.nanoTime() < deadline) {
        Thread.sleep(10);
    }
    assertEquals(expectedExists, dirExists);
}
```

What do you think? Should we use the "strongly consistent" variant for the 
Presto-S3-based connector?


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-26 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5624
  
That's actually the check for whether or not to overwrite the file - let me 
paste the whole method to give some more context:
```java
  public FSDataOutputStream create(Path f, FsPermission permission,
  boolean overwrite, int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {
String key = pathToKey(f);
S3AFileStatus status = null;
try {
  // get the status or throw an FNFE
  status = getFileStatus(f);

  // if the thread reaches here, there is something at the path
  if (status.isDirectory()) {
// path references a directory: automatic error
throw new FileAlreadyExistsException(f + " is a directory");
  }
  if (!overwrite) {
// path references a file and overwrite is disabled
throw new FileAlreadyExistsException(f + " already exists");
  }
  LOG.debug("Overwriting file {}", f);
} catch (FileNotFoundException e) {
  // this means the file is not found

}
instrumentation.fileCreated();
FSDataOutputStream output;
if (blockUploadEnabled) {
  output = new FSDataOutputStream(
  new S3ABlockOutputStream(this,
  key,
  new SemaphoredDelegatingExecutor(boundedThreadPool,
  blockOutputActiveBlocks, true),
  progress,
  partSize,
  blockFactory,
  instrumentation.newOutputStreamStatistics(statistics),
  new WriteOperationHelper(key)
  ),
  null);
} else {

  // We pass null to FSDataOutputStream so it won't count writes that
  // are being buffered to a file
  output = new FSDataOutputStream(
  new S3AOutputStream(getConf(),
  this,
  key,
  progress
  ),
  null);
}
return output;
  }
```

This is where the read-before-write happens that is causing the problem.
(Of course, some other concurrent job could still create the file in the 
meantime, so this check does not guard against much anyway, but there is not 
much we can do about that.)


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
@NicoK Is that actually a source of problems here? Or is it only a check for 
a graceful "file already exists" message, nothing that would get in the way 
when creating a new file?


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-22 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5624
  
@StephanEwen unfortunately not, for example:
`org.apache.flink.runtime.fs.hdfs.HadoopFileSystem#create()` -> 
`org.apache.hadoop.fs.FileSystem#create()` -> 
`org.apache.hadoop.fs.s3a.S3AFileSystem#create()` and this (depending on the 
Hadoop version, of course) may call this:
```java
  // get the status or throw an FNFE
  status = getFileStatus(f);

  // if the thread reaches here, there is something at the path
  if (status.isDirectory()) {
...
```


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
Don't we control the calls on `FileSystem` that we make as part of the YARN 
upload? Can't we simply avoid all operations that are not strongly consistent? 
For example if we strictly only use `create()` and `open()` we should be 
consistent.
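
The idea could be sketched roughly as follows. Note this is a hypothetical illustration, not Flink code: `ConsistentFs` and `uploadAndVerify` are made-up stand-ins for the relevant subset of `org.apache.flink.core.fs.FileSystem`, just to show an upload that stays within `create()`/`open()`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical stand-in for the two FileSystem calls that stay within
// S3's read-after-write consistency guarantee for new keys.
interface ConsistentFs {
    OutputStream create(String path) throws IOException;  // PUT of a new key
    InputStream open(String path) throws IOException;     // GET; throws FileNotFoundException
}

// Sketch of an upload that avoids exists()/listStatus() (only eventually
// consistent on S3): write the object, then verify it by reading it back.
final class YarnUploadSketch {

    private YarnUploadSketch() {}

    static void uploadAndVerify(ConsistentFs fs, String path, byte[] data)
            throws IOException {
        try (OutputStream out = fs.create(path)) {
            out.write(data);
        }
        // A GET on the exact key right after the PUT is strongly consistent
        // (as long as no HEAD/GET was issued for the key beforehand), so a
        // FileNotFoundException here would be a real failure, not staleness.
        try (InputStream in = fs.open(path)) {
            in.read();
        }
    }
}
```

The point of the sketch is purely which operations are used: any `exists()`-style probe before the `create()` would already forfeit the read-after-write guarantee.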


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-16 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5624
  
Unfortunately, this is not entirely under our control, since we rely on the 
underlying `FileSystem` implementation. I can probably reduce some parts of the 
tests that may lead to eventually consistent operations, but looking through 
[S3's data consistency 
model](https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel),
 I figure we still (at least) have to deal with these scenarios:

> - A process writes a new object to Amazon S3 and immediately lists keys 
within its bucket. Until the change is fully propagated, the object might not 
appear in the list. 
> - A process deletes an existing object and immediately lists keys within 
its bucket. Until the deletion is fully propagated, Amazon S3 might list the 
deleted object.

This could, however, be handled on a case-to-case basis and I could try to 
improve the tests in this regard.


---


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
The code and the tests should not rely on eventually consistent operations 
at all, but only on strongly consistent operations, like "read after create".

That should also eliminate any need for retries.

Minor comment: `System.currentTimeMillis()` should not be used for interval 
measurements. It is not a stable clock. All intervals should be measured in 
`System.nanoTime()`.
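
For illustration, a deadline measured with the monotonic clock could look like this (a minimal sketch; the `Deadlines` helper and its method names are hypothetical, not code from this PR):

```java
// Hypothetical sketch of interval measurement with the monotonic clock.
// System.currentTimeMillis() can jump when the wall clock is adjusted (NTP,
// manual changes), so it is unsafe for timeouts. System.nanoTime() is
// monotonic, but its absolute value is meaningless and may overflow, so
// deadlines must be compared via subtraction, never with '<' directly on
// absolute values far apart.
public final class Deadlines {

    private Deadlines() {}

    /** Returns a deadline 'timeoutMillis' from now, in nanoTime() units. */
    public static long fromNow(long timeoutMillis) {
        return System.nanoTime() + timeoutMillis * 1_000_000L;
    }

    /** True while the deadline has not yet passed (overflow-safe). */
    public static boolean hasTimeLeft(long deadline) {
        return deadline - System.nanoTime() > 0;
    }
}
```

A retry loop would then poll `while (hasTimeLeft(deadline)) { ... Thread.sleep(10); }` instead of comparing wall-clock timestamps.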


---