[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-23 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448259#comment-16448259
 ] 

Timo Walther commented on FLINK-9113:
-

Fixed in 1.4.3: 896797be71e920110aa270402d031969126221bb

> Data loss in BucketingSink when writing to local filesystem
> ---
>
> Key: FLINK-9113
> URL: https://issues.apache.org/jira/browse/FLINK-9113
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> For local filesystems, it is not guaranteed that the data is flushed to disk 
> during checkpointing. This leads to data loss in case of TaskManager 
> failures when writing to a local filesystem 
> ({{org.apache.hadoop.fs.LocalFileSystem}}). The {{flush()}} method reports a 
> written length, but the data is not actually written to the file (thus the 
> valid length might be greater than the actual file size). {{hsync}} and 
> {{hflush}} have no effect either.
> It seems that this behavior won't be fixed in the near future: 
> https://issues.apache.org/jira/browse/HADOOP-7844
> One solution would be to call {{close()}} on a checkpoint for local 
> filesystems, even though this would decrease performance. If we don't 
> fix this issue, we should at least add proper documentation for it.
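For illustration, a minimal sketch of the reported behavior (assuming Hadoop's {{FileSystem}} API; the path and payload are illustrative only):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch (not Flink code): shows why the recorded valid length can exceed the
// bytes that actually reach the file when writing through LocalFileSystem.
public class LocalFlushSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration()); // checksumming LocalFileSystem
        Path part = new Path("/tmp/local-flush-sketch/part-0-0"); // illustrative path

        FSDataOutputStream out = fs.create(part, true);
        out.write("test1\n".getBytes("UTF-8"));
        out.flush();
        out.hflush();
        out.hsync();

        // The stream position (6) is what a checkpoint would record as the valid length,
        // but the checksumming stream buffers data in 512-byte chunks, so the file on
        // disk can still be shorter if the process dies before close().
        System.out.println("valid length: " + out.getPos()
            + ", on disk: " + fs.getFileStatus(part).getLen());

        out.close();
    }
}
{code}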



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-23 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448035#comment-16448035
 ] 

Kostas Kloudas commented on FLINK-9113:
---

I am fine with backporting the fix to 1.4, as this is a bug fix.

> Data loss in BucketingSink when writing to local filesystem
> ---
>
> Key: FLINK-9113
> URL: https://issues.apache.org/jira/browse/FLINK-9113
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> For local filesystems, it is not guaranteed that the data is flushed to disk 
> during checkpointing. This leads to data loss in case of TaskManager 
> failures when writing to a local filesystem 
> ({{org.apache.hadoop.fs.LocalFileSystem}}). The {{flush()}} method reports a 
> written length, but the data is not actually written to the file (thus the 
> valid length might be greater than the actual file size). {{hsync}} and 
> {{hflush}} have no effect either.
> It seems that this behavior won't be fixed in the near future: 
> https://issues.apache.org/jira/browse/HADOOP-7844
> One solution would be to call {{close()}} on a checkpoint for local 
> filesystems, even though this would decrease performance. If we don't 
> fix this issue, we should at least add proper documentation for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-23 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448029#comment-16448029
 ] 

Timo Walther commented on FLINK-9113:
-

I will also apply this fix to the 1.4 branch. For this I would need FLINK-8600 
as well. It's not a big change. Are we fine with this? [~aljoscha] [~kkl0u]



[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443997#comment-16443997
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5861




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442614#comment-16442614
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5861#discussion_r182452074
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -1245,6 +1246,12 @@ else if (scheme != null && authority == null) {
}
 
fs.initialize(fsUri, finalConf);
+
+   // By default we don't perform checksums on Hadoop's 
local filesystem and use the raw filesystem.
--- End diff --

The "by default" is not necessary anymore. We now always use the raw 
filesystem. This is a leftover from the previous version that allowed changing 
this.




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441179#comment-16441179
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5861
  
It seems that for Hadoop 2.8.3 truncating is supported for the raw local 
filesystem. I will need to adapt the test for that.




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440840#comment-16440840
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5861

[FLINK-9113] [connectors] Use raw local file system for bucketing sink to 
prevent data loss

## What is the purpose of the change

This change replaces Hadoop's LocalFileSystem (which is a checksumming 
filesystem) with the underlying RawLocalFileSystem implementation. To compute 
checksums, the default filesystem only flushes in 512-byte intervals, which 
might lead to data loss during checkpointing. In order to guarantee exact 
results, we skip the checksum computation and perform a raw flush.

Negative effect: existing checksums are no longer maintained and thus 
become invalid.
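
A minimal sketch of the replacement, assuming Hadoop's `LocalFileSystem`/`ChecksumFileSystem` API (not the exact code of this change):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

public final class RawLocalFsSketch {
    // Returns the filesystem to write through; for the local filesystem the
    // checksumming layer is unwrapped so that flushed bytes go straight to the file.
    static FileSystem createWritableFileSystem(URI fsUri, Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(fsUri, conf);
        if (fs instanceof LocalFileSystem) {
            // LocalFileSystem is a ChecksumFileSystem; getRaw() exposes the underlying
            // RawLocalFileSystem, which does not buffer writes in 512-byte checksum chunks.
            fs = ((LocalFileSystem) fs).getRaw();
        }
        return fs;
    }
}
```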

## Brief change log

- Replace local filesystem by raw filesystem


## Verifying this change

Added a check that verifies the recorded file length against the actual file size.
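
For instance, roughly (a sketch only, not the test code added here; `fs`, `part`, and `validLength` are assumed to come from the test setup):

```java
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ValidLengthAssertions {
    // The valid length recorded at checkpoint time must never exceed the bytes on disk.
    static void assertValidLengthOnDisk(FileSystem fs, Path part, long validLength)
            throws java.io.IOException {
        long onDisk = fs.getFileStatus(part).getLen();
        assertTrue("valid length " + validLength + " exceeds on-disk size " + onDisk,
                validLength <= onDisk);
    }
}
```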

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9113

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5861


commit 17b85bd5fd65e6ec31374df0ca0af7451881d90a
Author: Timo Walther 
Date:   2018-04-17T13:12:55Z

[FLINK-9113] [connectors] Use raw local file system for bucketing sink to 
prevent data loss






[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428451#comment-16428451
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5811
  
I will close this PR for now, because it could not solve the general issue 
mentioned in HADOOP-7844.


> Data loss in BucketingSink when writing to local filesystem
> ---
>
> Key: FLINK-9113
> URL: https://issues.apache.org/jira/browse/FLINK-9113
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> For local filesystems, it is not guaranteed that the data is flushed to disk 
> during checkpointing. This leads to data loss in case of TaskManager 
> failures when writing to a local filesystem 
> ({{org.apache.hadoop.fs.LocalFileSystem}}). The {{flush()}} method reports a 
> written length, but the data is not actually written to the file (thus the 
> valid length might be greater than the actual file size). {{hsync}} and 
> {{hflush}} have no effect either.
> It seems that this behavior won't be fixed in the near future: 
> https://issues.apache.org/jira/browse/HADOOP-7844
> One solution would be to call {{close()}} on a checkpoint for local 
> filesystems, even though this would decrease performance. If we don't 
> fix this issue, we should at least add proper documentation for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428452#comment-16428452
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/5811




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428297#comment-16428297
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5811#discussion_r179743870
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -366,6 +384,11 @@ public void 
initializeState(FunctionInitializationContext context) throws Except
throw new RuntimeException("Error while creating 
FileSystem when initializing the state of the BucketingSink.", e);
}
 
+   // sync on flush for local file systems
+   if (localSyncOnFlush && (fs instanceof LocalFileSystem) && 
(writerTemplate instanceof StreamWriterBase)) {
--- End diff --

Shouldn't the `(writerTemplate instanceof StreamWriterBase)` check be converted 
here into `checkState(writerTemplate instanceof StreamWriterBase)` inside the if 
branch, and the same check be extracted and validated whenever the user calls 
`setWriter(...)` or `setLocalSyncOnFlush(...)`? That way, a non-`StreamWriterBase` 
writer combined with `localSyncOnFlush = true` would be an invalid configuration. 
Otherwise users might experience `wtf` moments when the flag is silently ignored 
after changing their writer.
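
Roughly like this (a sketch of the suggested validation, assuming Flink's `Preconditions` utility and the setter names used in this PR):

```java
// Inside BucketingSink<T> (sketch): reject the invalid combination up front
// instead of silently ignoring the flag later in initializeState().
// Preconditions is org.apache.flink.util.Preconditions.
public BucketingSink<T> setWriter(Writer<T> writer) {
    Preconditions.checkState(!localSyncOnFlush || writer instanceof StreamWriterBase,
        "localSyncOnFlush = true requires a writer extending StreamWriterBase");
    this.writerTemplate = writer;
    return this;
}

public BucketingSink<T> setLocalSyncOnFlush(boolean localSyncOnFlush) {
    Preconditions.checkState(!localSyncOnFlush || writerTemplate instanceof StreamWriterBase,
        "localSyncOnFlush = true requires a writer extending StreamWriterBase");
    this.localSyncOnFlush = localSyncOnFlush;
    return this;
}
```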


> Data loss in BucketingSink when writing to local filesystem
> ---
>
> Key: FLINK-9113
> URL: https://issues.apache.org/jira/browse/FLINK-9113
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> This issue is closely related to FLINK-7737. By default the bucketing sink 
> uses HDFS's {{org.apache.hadoop.fs.FSDataOutputStream#hflush}} for 
> performance reasons. However, this leads to data loss in case of TaskManager 
> failures when writing to a local filesystem 
> ({{org.apache.hadoop.fs.LocalFileSystem}}). We should use {{hsync}} by default 
> in local filesystem cases and make it possible to disable this behavior if 
> needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428287#comment-16428287
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5811
  
Thanks for looking into it @kl0u. I observed the same behavior during 
debugging. I will remove the check for now and open a follow-up issue. If there 
is no better solution, we might need to close the writer on checkpoints for 
local filesystems to prevent data loss in cases where the OS/machine goes 
down.
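
Hypothetically, that fallback could look like this (a sketch only; the helper and its parameters are illustrative and not BucketingSink's actual fields):

```java
import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

final class CloseOnCheckpointSketch {
    // On a checkpoint against a local filesystem, close the in-progress writer so the
    // OS persists the part file; the caller reopens or rolls a new part file afterwards.
    static boolean closeWriterIfLocal(FileSystem fs, Closeable writer, boolean writerOpen)
            throws IOException {
        if (writerOpen && fs instanceof LocalFileSystem) {
            writer.close();   // after close() the data is on disk
            return false;     // writer is no longer open
        }
        return writerOpen;
    }
}
```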




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428266#comment-16428266
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5811
  
Well, it seems that for these tests the `flush` is not actually flushing. 
The files are there and the `validPartLength` is correct (=6, as we just write 
`test1\n`), but the data is not actually on disk. If you call `close()` on the 
in-progress file when snapshotting, then the tests succeed and the data is 
there.

I would recommend just removing the check for now and opening a follow-up 
JIRA that contains the check you will remove and also points to the 
discussion about HDFS not flushing, and then we can see how to proceed.

I think the fact that the end-to-end tests pass points in the direction 
that something is wrong with the FS abstraction.




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425519#comment-16425519
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5811
  
@kl0u do you have an idea why `BucketingSinkTest` is failing? It seems that 
the written data parts are always empty.




[jira] [Commented] (FLINK-9113) Data loss in BucketingSink when writing to local filesystem

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425196#comment-16425196
 ] 

ASF GitHub Bot commented on FLINK-9113:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5811

[FLINK-9113] [connectors] Fix flushing behavior of bucketing sink for local 
filesystems

## What is the purpose of the change

This PR changes the flushing behavior for HDFS' local filesystem 
abstraction. See also FLINK-9113 for more details.
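
For context, a minimal sketch of the two flush calls involved (assuming Hadoop's `FSDataOutputStream`; the path is illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HsyncSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        try (FSDataOutputStream out = fs.create(new Path("/tmp/hsync-sketch/part-0-0"), true)) {
            out.write("test1\n".getBytes("UTF-8"));
            out.hflush(); // flushes client-side buffers; on the local filesystem this does
                          // not guarantee that the bytes have reached the disk
            out.hsync();  // additionally asks the stream to sync its data to disk, which is
                          // what this PR enables by default for local filesystems
        }
    }
}
```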


## Brief change log

- Use `hsync` for local filesystems
- Add a method to disable the new behavior
- Add an additional check for verifying correct valid-length files


## Verifying this change

This fix is difficult to verify, as it requires an OS process that is killed 
before syncing. I added a dedicated local filesystem test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9113

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5811.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5811


commit 543f206f0e9e8415468f5d1092553754a8869fc7
Author: Timo Walther 
Date:   2018-04-04T08:29:57Z

[FLINK-9113] [connectors] Fix flushing behavior of bucketing sink for local 
filesystems







--
This message was sent by Atlassian JIRA
(v7.6.3#76005)