[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/339 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-72390974 +1, will merge. Thanks @zentol & @tillrohrmann!
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71828186 Looks good to me now. Thanks Chesney.
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71827151 Haste makes waste ;-)
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71825712 Well, you sure know how to keep me busy :) You are right about moving it back. Updated.
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71819301

With your latest changes Chesney, namely the incrementing/decrementing logic, I think that it now makes sense again to increment the counters in the createTmpFile method, because otherwise the following could happen:

1. You create a new temp file but do not execute the copy process yet. The respective counter is 1.
2. You delete the same file and the delete process is immediately executed. This would delete the file.
3. Now the copy process is started and has to transfer the file again, even though it was known at the time of deleting the file that it is still needed.

What do you think? Am I missing a point which prevents putting the incrementing logic back into the createTmpFile method?
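Till's point about incrementing at registration time can be illustrated with a minimal reference-counting sketch (all names here are hypothetical; the real FileCache keys its map by (JobID, name) and runs copy/delete as separate tasks). Because the counter is bumped in `register()` before any copy work happens, a concurrent `release()` sees the pending claim instead of finding no entry and deleting the file out from under the pending copy:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: incrementing at registration time (createTmpFile) means a
// concurrent delete can only bring the counter back down; it never observes
// a missing entry while a copy for that file is still pending.
class RefCountSketch {
    private final Object lock = new Object();
    private final Map<String, Integer> count = new HashMap<>();

    // Called when a task registers a cached file; increments BEFORE the
    // (possibly slow) copy runs, so the entry is already "claimed".
    int register(String key) {
        synchronized (lock) {
            int c = count.getOrDefault(key, 0) + 1;
            count.put(key, c);
            return c;
        }
    }

    // Called when a task is done with the file; returns true only when the
    // last reference is released and the file may actually be deleted.
    boolean release(String key) {
        synchronized (lock) {
            Integer c = count.get(key);
            if (c == null) {
                return false; // nothing registered, nothing to delete
            }
            if (c > 1) {
                count.put(key, c - 1);
                return false; // still referenced by a pending/running task
            }
            count.remove(key);
            return true; // counter hit zero: safe to delete the file
        }
    }
}
```

With this ordering, the interleaving from the comment cannot delete a file whose copy is still outstanding, because the registration already counts as a reference.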
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/339#discussion_r23680555

```
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call() {
 	private String name;
 	private JobID jobID;
-	private int oldCount;
+	private String filePath;

 	public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
 		this.name = name;
 		this.jobID = jobID;
-		this.oldCount = c;
+		this.filePath = e.filePath;
 	}

+	@Override
 	public void run() {
-		synchronized (count) {
-			if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+		Pair key = new ImmutablePair(jobID, name);
+		synchronized (lock) {
+			if (!count.containsKey(key)) {
 				return;
 			}
-		}
-		Path tmp = getTempDir(jobID, "");
-		try {
-			if (lfs.exists(tmp)) {
-				lfs.delete(tmp, true);
+			count.put(key, count.get(key) - 1);
+			if (count.get(key) != 0) {
+				return;
+			}
+			Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
+			try {
+				if (lfs.exists(tmp)) {
+					lfs.delete(tmp, true);
+				}
+				count.remove(key);
+				if (count.isEmpty()) { //delete job directory
```

This probably won't work, because there might still be other jobs running on the TaskManager which have not yet deleted all of their files. The problem is the ```Pair```. Either we change count to be something like ```HashMap[JobID, HashMap[String, Integer]]```, or we have to iterate over all entries to see whether there still exists an entry with the respective jobID.
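The nested-map alternative suggested here could look roughly like the following sketch (hypothetical class; a plain String stands in for JobID so the example is self-contained). The point is that emptiness of the inner map answers "may the job directory be deleted?" directly, where the flat Pair-keyed map would need a full scan:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of count as Map<jobId, Map<name, refCount>>: when the inner map for
// a job becomes empty, no file of THAT job is still referenced, so its job
// directory can be removed without touching other jobs' files.
class PerJobCounts {
    private final Map<String, Map<String, Integer>> count = new HashMap<>();

    void increment(String jobId, String name) {
        count.computeIfAbsent(jobId, k -> new HashMap<>())
             .merge(name, 1, Integer::sum);
    }

    // Returns true when the job has no remaining entries, i.e. its whole
    // temp directory may be deleted. A flat Pair-keyed map would require
    // iterating over all entries to answer the same question.
    boolean decrementAndCheckJobEmpty(String jobId, String name) {
        Map<String, Integer> files = count.get(jobId);
        if (files == null) {
            return false;
        }
        Integer c = files.get(name);
        if (c == null) {
            return false;
        }
        if (c > 1) {
            files.put(name, c - 1);
            return false;
        }
        files.remove(name);
        if (files.isEmpty()) {
            count.remove(jobId);
            return true;
        }
        return false;
    }
}
```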
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/339#discussion_r23680449

```
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call() {
 	private String name;
 	private JobID jobID;
-	private int oldCount;
+	private String filePath;

 	public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
 		this.name = name;
 		this.jobID = jobID;
-		this.oldCount = c;
+		this.filePath = e.filePath;
 	}

+	@Override
 	public void run() {
-		synchronized (count) {
-			if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+		Pair key = new ImmutablePair(jobID, name);
+		synchronized (lock) {
+			if (!count.containsKey(key)) {
 				return;
 			}
-		}
+			count.put(key, count.get(key) - 1);
+			if (count.get(key) != 0) {
+				return;
```

Same here with the if condition.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/339#discussion_r23680424

```
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call() {
 	private String name;
 	private JobID jobID;
-	private int oldCount;
+	private String filePath;

 	public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
 		this.name = name;
 		this.jobID = jobID;
-		this.oldCount = c;
+		this.filePath = e.filePath;
 	}

+	@Override
 	public void run() {
-		synchronized (count) {
-			if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+		Pair key = new ImmutablePair(jobID, name);
+		synchronized (lock) {
+			if (!count.containsKey(key)) {
 				return;
```

Could we invert the if condition? Imho something like ```if(count.containsKey(key)){ }``` gives a cleaner control flow without too many return statements.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/339#discussion_r23680315

```
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call() {
 	private String name;
 	private JobID jobID;
-	private int oldCount;
+	private String filePath;

 	public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
```

We no longer need c.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71815201 Updated to include the discussed changes.
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71754470 You're right Chesney. I assume that the faulty DC wasn't noticed because it was probably never really used ;-) Your solution should make the DC work properly. We could even get rid of the second counter by simply decrementing the counter upon deletion. If the counter is 0, then the file can be deleted. Nice illustrations btw.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71672497

Whenever I look more closely at the DC I'm always left wondering how it can work at all. About your first point, I don't think that's enough; there is a more fundamental flaw: we need another counter for delete processes. Consider the following 2 scenarios with 2 tasks distributing the same file. C denotes the creation of a copy process, D the creation of a delete process. # denotes the count variable, O the oldCount variable.

```
1): I II III IV
T1:---CD
T2:---CD---
# 1 22 2
O 2 2

2): I II III IV
T1:---CD---
T2:---CD
# 1 22 2
O 2 2
```

In both scenarios, D at III should not delete the file, but all D's have the very same information. Instead, I propose having 2 counters: one counting the # of copy operations, and one counting the # of delete operations, with the current value (at process creation) stored in the process. When executing, if the stored value is equal to the copy count, files may be deleted, since this means that this delete process was the last one to be started. Let's make another fancy schema to illustrate the point:

```
1): I II III IV
T1:---CD
T2:---CD---
# 1 22 2
O 1 2

2): I II III IV
T1:---CD---
T2:---CD
# 1 22 2
O 1 2
```
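The two-counter rule proposed above can be sketched as a small standalone class (hypothetical names; in the proposal, the snapshot returned by `scheduleDelete` corresponds to the O value each delete process stores at creation time):

```java
// Sketch of the two-counter proposal: `copies` counts scheduled copy
// processes, `deletes` counts scheduled delete processes. Each delete process
// remembers the delete count at its creation; when it later runs, it may only
// remove the file if that recorded value equals the current copy count,
// i.e. no copy was scheduled after this delete was the last process created.
class TwoCounters {
    private int copies;
    private int deletes;

    synchronized void scheduleCopy() {
        copies++;
    }

    // Returns the snapshot a delete process would store at creation time (O).
    synchronized int scheduleDelete() {
        return ++deletes;
    }

    // Called when the delete process actually runs.
    synchronized boolean mayDelete(int snapshotAtCreation) {
        return snapshotAtCreation == copies;
    }
}
```

In the scenarios from the comment, the earlier delete stores O = 1 while two copies were scheduled, so it is blocked; only the last delete (O = 2 = copy count) actually removes the file.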
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71654749 Both good points. I'll address them after lunch!
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71652771

I'm wondering whether the count hash map update should rather happen in the copy process, because otherwise there could be the following interleaving:

1. You register a new temp file "foobar" for task B --> creating a copy task and incrementing the file counter
2. You delete the temp file "foobar" for task A because it is finished --> creating a delete process with the incremented counter
3. You execute the copy process
4. You execute the delete process

Then the file "foobar" does not exist for task B.

Another thing is that the DeleteProcess tries to delete the whole directory below the jobID if one file shall be deleted. I don't know whether this is the right behaviour.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71638812 Updated to include the discussed changes.
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71494058 Yeah, that would probably solve the problem. Race conditions are often very tricky: sometimes little changes alter the process interleaving such that the problem only seems to be fixed.
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71490264 Yes, but only the access to the count hash map. The delete action itself is not synchronized.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71490079 Oh, I see what you mean; maybe extend the synchronized block to include the actual delete logic. Yup, that's a good idea. All I know is that I tried it without the change and ran into issues; with the change it ran.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71489135 But that is exactly what is changing: both the delete and copy process are synchronized on the same object.
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71488487 I don't really understand how the static lock solves the mentioned issue. Is there a concurrency problem between creating files on disk and updating the count hash map? I think there is a problem between the DeleteProcess and the CopyProcess. The CopyProcess is synchronized on the static lock object and the DeleteProcess is not. Thus, it might be the case that the copy method created the directories for a new file "foobar", let's say /tmp/123/foobar, and afterwards the delete process deletes the directory /tmp/123 because it checked the count hash map before the createTmpFile method was called. This problem should still persist with the current changes.
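The race described in this comment goes away only if both processes hold the same lock around both the counter check and the file-system operation. A toy sketch of that discipline (hypothetical names; a Set stands in for the local file system so the example is self-contained):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: the counter update AND the file operation happen under ONE shared
// lock, so a delete can never observe "no references" and then remove a
// directory that a concurrent copy is in the middle of populating.
class SharedLockCache {
    private final Object lock = new Object();
    private final Map<String, Integer> count = new HashMap<>();
    private final Set<String> files = new HashSet<>(); // stand-in for the local FS

    void copy(String name) {
        synchronized (lock) {
            count.merge(name, 1, Integer::sum);
            files.add(name); // file creation inside the same critical section
        }
    }

    void delete(String name) {
        synchronized (lock) {
            Integer c = count.get(name);
            if (c == null) {
                return;
            }
            if (c > 1) {
                count.put(name, c - 1);
                return;
            }
            count.remove(name);
            files.remove(name); // file deletion inside the same critical section
        }
    }

    boolean exists(String name) {
        synchronized (lock) {
            return files.contains(name);
        }
    }
}
```

If the delete call only synchronized the map lookup and performed the removal outside the critical section, the check-then-act gap Till describes would reappear.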
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/339#discussion_r23541551

```
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -72,7 +72,7 @@
 	 * @return copy task
 	 */
 	public FutureTask createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
-		synchronized (count) {
+		synchronized (lock) {
```

How does the static lock solve the problem?
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/339

[FLINK-1419] [runtime] DC properly synchronized

Addresses the issue of files not being preserved in subsequent operations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/incubator-flink dc_cache_fix

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/339.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #339

commit 5c9059d3ce58d8415ce374927dd253579a5fd741
Author: zentol
Date: 2015-01-26T10:07:53Z
[FLINK-1419] [runtime] DC properly synchronized