[GitHub] flink pull request #5811: [FLINK-9113] [connectors] Fix flushing behavior of...

2018-04-06 Thread twalthr
Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/5811


---


[GitHub] flink pull request #5811: [FLINK-9113] [connectors] Fix flushing behavior of...

2018-04-06 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5811#discussion_r179743870
  
--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
@@ -366,6 +384,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
}
 
+   // sync on flush for local file systems
+   if (localSyncOnFlush && (fs instanceof LocalFileSystem) && (writerTemplate instanceof StreamWriterBase)) {
--- End diff --

Shouldn't the `(writerTemplate instanceof StreamWriterBase)` check be converted 
here into `checkState(writerTemplate instanceof StreamWriterBase)` inside the if 
branch, and the same check be extracted and validated whenever the user calls 
`setWriter(...)` or `setLocalSyncOnFlush(...)`? That way, a non-`StreamWriterBase` 
writer combined with `localSyncOnFlush = true` would be an invalid 
configuration. Otherwise users might experience `wtf` moments when the flag is 
silently ignored after they change their writer.
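
A minimal sketch of the validation suggested above, not the actual `BucketingSink` code. `Writer`, `StreamWriterBase`, `CustomWriter` and `Sink` are hypothetical stand-ins defined here so the example is self-contained, and the defaults (a `StreamWriterBase` writer, `localSyncOnFlush = true`) are assumptions. A plain `IllegalStateException` stands in for `checkState`:

```java
public class LocalSyncConfigSketch {

    /** Hypothetical stand-in for the sink's writer interface. */
    interface Writer {}

    /** Hypothetical stand-in for the writer base class that supports syncing. */
    static class StreamWriterBase implements Writer {}

    /** Hypothetical user-defined writer that does not extend StreamWriterBase. */
    static class CustomWriter implements Writer {}

    /** Hypothetical sink holding only the two settings under discussion. */
    static class Sink {
        // Assumed defaults: a syncing-capable writer and sync-on-flush enabled.
        private Writer writerTemplate = new StreamWriterBase();
        private boolean localSyncOnFlush = true;

        Sink setWriter(Writer writer) {
            // Validate against the current flag before accepting the writer.
            validate(writer, localSyncOnFlush);
            this.writerTemplate = writer;
            return this;
        }

        Sink setLocalSyncOnFlush(boolean flag) {
            // Validate against the current writer before accepting the flag.
            validate(writerTemplate, flag);
            this.localSyncOnFlush = flag;
            return this;
        }

        // Single extracted check shared by both setters.
        private static void validate(Writer writer, boolean syncOnFlush) {
            if (syncOnFlush && !(writer instanceof StreamWriterBase)) {
                throw new IllegalStateException(
                    "localSyncOnFlush = true requires a StreamWriterBase writer.");
            }
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink();
        // Disabling the flag first makes a custom writer a valid configuration.
        sink.setLocalSyncOnFlush(false).setWriter(new CustomWriter());
        try {
            // Re-enabling the flag with a custom writer is rejected, not ignored.
            sink.setLocalSyncOnFlush(true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One consequence of validating in both setters is that call order matters: with these assumed defaults, a user has to disable the flag before setting a non-`StreamWriterBase` writer.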


---


[GitHub] flink pull request #5811: [FLINK-9113] [connectors] Fix flushing behavior of...

2018-04-04 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5811

[FLINK-9113] [connectors] Fix flushing behavior of bucketing sink for local 
filesystems

## What is the purpose of the change

This PR changes the flushing behavior for HDFS' local filesystem 
abstraction. See also FLINK-9113 for more details.


## Brief change log

- Use `hsync` for local filesystems
- Add method to disable the new behavior
- Add a check verifying that valid-length files are correct
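
For readers unfamiliar with the difference between flushing and syncing, here is a rough illustration in plain `java.io`. It is an analogue only and does not use the Hadoop stream API or the code changed in this PR; `writeDurably` and its `syncOnFlush` parameter are hypothetical names chosen for this sketch:

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushVsSyncSketch {

    /** Appends a record; optionally forces it to the storage device. */
    static void writeDurably(Path file, String record, boolean syncOnFlush)
            throws IOException {
        try (FileOutputStream out = new FileOutputStream(file.toFile(), true)) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
            // flush() hands buffered bytes to the OS; they may still sit in
            // the OS cache and be lost if the machine crashes.
            out.flush();
            if (syncOnFlush) {
                // sync() asks the OS to write its buffers for this file
                // to the underlying device before returning.
                out.getFD().sync();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("bucket-part", ".txt");
        writeDurably(file, "record-1\n", true);
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

The extra sync call costs a device write per flush, which is presumably why the change log also lists a way to disable the behavior.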


## Verifying this change

This fix is difficult to verify because it requires an OS process that is killed 
before syncing. I added a dedicated local filesystem test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9113

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5811.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5811


commit 543f206f0e9e8415468f5d1092553754a8869fc7
Author: Timo Walther 
Date:   2018-04-04T08:29:57Z

[FLINK-9113] [connectors] Fix flushing behavior of bucketing sink for local 
filesystems




---