[jira] [Commented] (FLINK-1419) DistributedCache doesn't preserver files for subsequent operations
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300784#comment-14300784 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-72390974

    +1, will merge. Thanks @zentol @tillrohrmann!


> DistributedCache doesn't preserver files for subsequent operations
> ------------------------------------------------------------------
>
>                 Key: FLINK-1419
>                 URL: https://issues.apache.org/jira/browse/FLINK-1419
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.8, 0.9
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>
> When subsequent operations want to access the same files in the DC, it frequently happens that the files are not created for the following operation.
> This is fairly odd, since the DC is supposed to either a) preserve files when another operation kicks in within a certain time window, or b) just recreate the deleted files. Neither happens, and increasing the time window had no effect.
> I'd like to use this issue as a starting point for a more general discussion about the DistributedCache. Currently:
> 1. all files reside in a common job-specific directory
> 2. files are deleted during the job
> One thing that was brought up about Trait 1 is that it basically forbids modification of the files, and concurrent access along with it. Personally I'm not sure if this is a problem; changing it to a task-specific place solved the issue though.
> I'm more concerned about Trait 2. Besides the mentioned issue, the deletion is realized with the scheduler, which adds a lot of complexity to the current code. (It really is a pain to work on...) If we moved the deletion to the end of the job, it could be done as a clean-up step in the TaskManager. With this we could reduce the DC to a cacheFile(String source) method, the delete method in the TM, and throw out everything else.
> Also, the current implementation implies that big files may be copied multiple times. This may be undesired, depending on how big the files are.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
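[Editor's note] The reduced API proposed in the description (a single cacheFile(String source) entry point plus one end-of-job cleanup step in the TaskManager) could look roughly like the sketch below. All names here (SimpleFileCache, releaseJob) are illustrative assumptions, not Flink's actual code, and a plain map stands in for the on-disk directory:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of the simplified DistributedCache: files are
// reference-counted per job and removed only in one end-of-job cleanup.
class SimpleFileCache {
    private final Object lock = new Object();
    // (jobID + "/" + file) -> reference count; guarded by 'lock'
    private final Map<String, Integer> count = new HashMap<>();

    /** Registers 'source' for the job; returns true if this caller should perform the copy. */
    boolean cacheFile(String jobID, String source) {
        synchronized (lock) {
            String key = jobID + "/" + source;
            Integer c = count.get(key);
            count.put(key, c == null ? 1 : c + 1);
            return c == null;
        }
    }

    /** End-of-job cleanup step in the TaskManager: drops all entries of the job under the lock. */
    int releaseJob(String jobID) {
        synchronized (lock) {
            int removed = 0;
            Iterator<String> it = count.keySet().iterator();
            while (it.hasNext()) {
                if (it.next().startsWith(jobID + "/")) {
                    it.remove();   // the on-disk files would be deleted here
                    removed++;
                }
            }
            return removed;
        }
    }
}
```

Under this shape, the scheduler-driven DeleteProcess disappears entirely: nothing is deleted mid-job, so the "file vanished between operations" failure mode cannot occur.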
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300788#comment-14300788 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/339
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14295017#comment-14295017 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/339#discussion_r23680424

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -179,27 +179,40 @@ public Path call() {
     		private String name;
     		private JobID jobID;
    -		private int oldCount;
    +		private String filePath;

     		public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
     			this.name = name;
     			this.jobID = jobID;
    -			this.oldCount = c;
    +			this.filePath = e.filePath;
     		}

    +		@Override
     		public void run() {
    -			synchronized (count) {
    -				if (count.get(new ImmutablePair<JobID, String>(jobID, name)) != oldCount) {
    +			Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name);
    +			synchronized (lock) {
    +				if (!count.containsKey(key)) {
    					return;
    --- End diff --

    Could we invert the if condition? Imho something like `if(count.containsKey(key)){ }` gives a cleaner control flow without too many return statements.
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14295015#comment-14295015 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/339#discussion_r23680315

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -179,27 +179,40 @@ public Path call() {
     		private String name;
     		private JobID jobID;
    -		private int oldCount;
    +		private String filePath;

     		public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
    --- End diff --

    We no longer need c.
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14294991#comment-14294991 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71815201

    updated to include discussed changes
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14295092#comment-14295092 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71827151

    Haste makes waste ;-)
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14293416#comment-14293416 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71638812

    updated to include discussed changes
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292024#comment-14292024 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71488487

    I don't really understand how the static lock solves the mentioned issue. Is there a concurrency problem between creating files on disk and updating the count hash map?

    I think there is a problem between the DeleteProcess and the CopyProcess. The CopyProcess is synchronized on the static lock object and the DeleteProcess is not. Thus, it might be the case that the copy method created the directories for a new file foobar, let's say /tmp/123/foobar, and afterwards the delete process deletes the directory /tmp/123 because it checked the count hash map before the createTmpFile method was called. This problem should still persist with the current changes.
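[Editor's note] The race described in this comment, and the remedy the thread converges on (holding the same lock across both the reference check and the actual deletion), can be sketched as follows. The names are hypothetical and a map stands in for the file system; this is not the actual FileCache code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: copy and delete share one lock, and the lock spans the count check
// AND the removal, so a copy cannot recreate the directory in between.
class LockedDelete {
    private final Object lock = new Object();                 // shared by copy and delete
    private final Map<String, Integer> count = new HashMap<>();
    private final Map<String, String> onDisk = new HashMap<>(); // stands in for /tmp

    void copy(String key) {
        synchronized (lock) {
            Integer c = count.get(key);
            count.put(key, c == null ? 1 : c + 1);
            onDisk.put(key, "/tmp/" + key);                   // "create /tmp/<jobID>/<name>"
        }
    }

    void release(String key) {
        synchronized (lock) {
            Integer c = count.get(key);
            if (c != null && c <= 1) { count.remove(key); }
            else if (c != null)      { count.put(key, c - 1); }
        }
    }

    void delete(String key) {
        synchronized (lock) {                                 // lock spans check AND delete
            if (count.containsKey(key)) {
                return;                                       // still referenced: keep the file
            }
            onDisk.remove(key);                               // "delete the directory"
        }
    }

    boolean exists(String key) {
        synchronized (lock) { return onDisk.containsKey(key); }
    }
}
```

If delete() released the lock between the containsKey check and the removal, a copy() could slip in and its freshly created directory would be deleted, which is exactly the interleaving described above.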
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292011#comment-14292011 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/339#discussion_r23541551

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -72,7 +72,7 @@
     	 * @return copy task
     	 */
     	public FutureTask<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
    -		synchronized (count) {
    +		synchronized (lock) {
    --- End diff --

    How does the static lock solve the problem?
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292036#comment-14292036 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71490079

    oh I see what you mean, maybe extend the synchronized block to include the actual delete stuff. yup, that's a good idea. All I know is I tried it without the change and ran into issues; with the change it ran.
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292038#comment-14292038 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71490264

    Yes, but only the access to the count hash map. The delete action itself is not synchronized.
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292032#comment-14292032 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/339#issuecomment-71489135

    but that is exactly what is changing, both the delete and copy process are synchronized on the same object.
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282393#comment-14282393 ]

Fabian Hueske commented on FLINK-1419:
--------------------------------------

I think it is fine to make files in the DC immutable (read only). An operator that wants to modify files can create a local writable copy.

Files in the DC should only be copied once per TM and stay until the job is finished, IMO. The question is who initiates the copy process. The first task that requires the file? In that case, all other tasks need to recognize that the file is copied by another task and wait until the copying is completed.
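[Editor's note] The "first task copies, all other tasks wait" coordination Fabian describes maps naturally onto a per-file FutureTask: whichever task installs the future first performs the copy, and every later task blocks on the same future until the copy completes. The sketch below uses hypothetical names and fakes the copy by returning a path string; it is an illustration of the idea, not the actual Flink FileCache:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;

// Copy-once-per-TaskManager sketch: one FutureTask per cached file.
class CopyOnce {
    private final Map<String, FutureTask<String>> copies = new ConcurrentHashMap<>();

    String getOrCopy(String key) {
        // The Callable stands in for the actual copy; here it just builds a path.
        FutureTask<String> task = new FutureTask<>(() -> "/tmp/cache/" + key);
        FutureTask<String> winner = copies.putIfAbsent(key, task);
        if (winner == null) {
            task.run();          // this task installed the future first: it copies
            winner = task;
        }
        try {
            return winner.get(); // all other tasks block here until the copy is done
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

putIfAbsent guarantees exactly one winner per key even under concurrent calls, so the file is copied once per TM regardless of how many tasks request it.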