[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-02-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/339


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-02-01 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-72390974
  
+1, will merge. Thanks, @zentol & @tillrohrmann!




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71828186
  
Looks good to me now. Thanks, Chesnay.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71827151
  
Haste makes waste ;-)




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71825712
  
Well, you sure know how to keep me busy :)

You are right about moving it back. Updated.





[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71819301
  
With your latest changes, Chesnay, namely the incrementing/decrementing logic, I think it now makes sense again to increase the counters in the createTmpFile method, because otherwise the following could happen:

1. You create a new temp file but do not execute the copy process yet. The respective counter is 1.
2. You delete the same file, and the delete process is executed immediately. This would delete the file.
3. Now the copy process starts and has to transfer the file again, even though it was known at the time of deleting the file that it is still needed.

What do you think? Am I missing a point which prevents putting the incrementing logic back into the createTmpFile method?
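The argument above can be sketched as a small reference-counting cache. This is a minimal illustration, not Flink's FileCache: the class and method names (`RefCountingCache`, `register`, `release`) are invented, and a plain String stands in for the (jobID, name) pair. The key property is that `register` increments synchronously at registration time, so a delete that runs before the asynchronous copy still sees a positive count and leaves the file alone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: increment happens at registration, not when the copy runs.
class RefCountingCache {
    private final Map<String, Integer> count = new HashMap<>();
    private final Object lock = new Object();

    /** Called when a task registers a file; increments before any copy runs. */
    void register(String name) {
        synchronized (lock) {
            count.merge(name, 1, Integer::sum);
        }
    }

    /** Called on delete; returns true only when no task needs the file anymore. */
    boolean release(String name) {
        synchronized (lock) {
            Integer c = count.get(name);
            if (c == null) {
                return false; // never registered
            }
            if (c == 1) {
                count.remove(name);
                return true; // safe to delete the file now
            }
            count.put(name, c - 1);
            return false;
        }
    }
}
```

With this ordering, the delete in step 2 above decrements 2 to 1 and returns false, so the file survives until the copy in step 3 has a consumer.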




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/339#discussion_r23680555
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call()  {

private String name;
private JobID jobID;
-   private int oldCount;
+   private String filePath;
 
public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
this.name = name;
this.jobID = jobID;
-   this.oldCount = c;
+   this.filePath = e.filePath;
}
+
@Override
public void run() {
-   synchronized (count) {
-   if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+   Pair key = new ImmutablePair(jobID, name);
+   synchronized (lock) {
+   if (!count.containsKey(key)) {
return;
}
-   }
-   Path tmp = getTempDir(jobID, "");
-   try {
-   if (lfs.exists(tmp)) {
-   lfs.delete(tmp, true);
+   count.put(key, count.get(key) - 1);
+   if (count.get(key) != 0) {
+   return;
+   }
+   Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
+   try {
+   if (lfs.exists(tmp)) {
+   lfs.delete(tmp, true);
+   }
+   count.remove(key);
+   if (count.isEmpty()) { //delete job directory
--- End diff --

This probably won't work, because there might still be other jobs running on the TaskManager which have not yet deleted all of their files. The problem is the ```Pair``` key. Either we change count to be something like ```HashMap<JobID, HashMap<String, Integer>>```, or we have to iterate over all entries to check whether an entry with the respective jobID still exists.
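The first alternative could look roughly like this. A sketch only, with String standing in for JobID and an invented class name (`PerJobRefCounts`); it shows how a two-level map makes the "is the job directory now empty?" check a single inner-map lookup instead of a scan over all (jobID, name) pairs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-level count map: jobId -> (file name -> reference count).
class PerJobRefCounts {
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    void increment(String jobId, String name) {
        counts.computeIfAbsent(jobId, k -> new HashMap<>())
              .merge(name, 1, Integer::sum);
    }

    /** Decrements; returns true when the job has no cached files left at all. */
    boolean decrementAndCheckJobEmpty(String jobId, String name) {
        Map<String, Integer> perJob = counts.get(jobId);
        if (perJob == null) {
            return false;
        }
        Integer c = perJob.get(name);
        if (c == null) {
            return false;
        }
        if (c <= 1) {
            perJob.remove(name); // last reference to this file
        } else {
            perJob.put(name, c - 1);
        }
        if (perJob.isEmpty()) {
            counts.remove(jobId);
            return true; // safe to delete the job directory
        }
        return false;
    }
}
```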




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/339#discussion_r23680449
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call()  {

private String name;
private JobID jobID;
-   private int oldCount;
+   private String filePath;
 
public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
this.name = name;
this.jobID = jobID;
-   this.oldCount = c;
+   this.filePath = e.filePath;
}
+
@Override
public void run() {
-   synchronized (count) {
-   if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+   Pair key = new ImmutablePair(jobID, name);
+   synchronized (lock) {
+   if (!count.containsKey(key)) {
return;
}
-   }
-   Path tmp = getTempDir(jobID, "");
-   try {
-   if (lfs.exists(tmp)) {
-   lfs.delete(tmp, true);
+   count.put(key, count.get(key) - 1);
+   if (count.get(key) != 0) {
+   return;
--- End diff --

Same here with the if condition.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/339#discussion_r23680424
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call()  {

private String name;
private JobID jobID;
-   private int oldCount;
+   private String filePath;
 
public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
this.name = name;
this.jobID = jobID;
-   this.oldCount = c;
+   this.filePath = e.filePath;
}
+
@Override
public void run() {
-   synchronized (count) {
-   if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+   Pair key = new ImmutablePair(jobID, name);
+   synchronized (lock) {
+   if (!count.containsKey(key)) {
return;
--- End diff --

Could we invert the if condition? IMHO something like ```if (count.containsKey(key)) { ... }``` gives a cleaner control flow without too many return statements.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/339#discussion_r23680315
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call()  {

private String name;
private JobID jobID;
-   private int oldCount;
+   private String filePath;
 
public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
--- End diff --

We no longer need c.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71815201
  
Updated to include the discussed changes.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-27 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71754470
  
You're right, Chesnay. I assume the faulty DC wasn't noticed because it was probably never really used ;-)

Your solution should make the DC work properly. We could even get rid of the second counter by simply decrementing the counter upon deletion: if the counter reaches 0, the file can be deleted. Nice illustrations, btw.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-27 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71672497
  
Whenever I look more closely at the DC, I'm always left wondering how it can work at all.

About your first point, I don't think that's enough; there is a more fundamental flaw, and we need another counter for delete processes.

Consider the following two scenarios with two tasks distributing the same file. C denotes the creation of a copy process, D the creation of a delete process, # the count variable, and O the oldCount variable.
```
1):   I   II  III  IV
T1:---CD
T2:---CD---
# 1   22   2
O  2   2

2)I   II  III  IV
T1:---CD---
T2:---CD
# 1   22   2
O  2   2
```

In both scenarios, D at III should not delete the file, but all D's carry the very same information.

Instead, I propose having two counters: one counting the number of copy operations, and one counting the number of delete operations, with the current value (at process creation) stored in the delete process. When a delete process executes and its stored value equals the copy count, the files may be deleted, since this means that this delete process was the last one to be started.

Let's make another fancy schema to illustrate the point:
```
1):   I   II  III  IV
T1:---CD
T2:---CD---
# 1   22   2
O  1   2

2)I   II  III  IV
T1:---CD---
T2:---CD
# 1   22   2
O  1   2
```
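Under the assumptions of the diagrams (C and D events per file, O snapshotted when the delete process is created), the proposal might be sketched as follows. The names (`TwoCounterEntry`, `copyScheduled`, `deleteScheduled`, `mayDelete`) are illustrative, not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical two-counter scheme: a delete process snapshots the delete
// counter when scheduled, and may remove the file only if its snapshot
// equals the copy count, i.e. it was the last delete to be started.
class TwoCounterEntry {
    final AtomicInteger copies = new AtomicInteger();
    final AtomicInteger deletes = new AtomicInteger();

    /** Called when a copy process is scheduled (C in the diagrams). */
    void copyScheduled() {
        copies.incrementAndGet();
    }

    /** Called when a delete process is scheduled (D); returns its snapshot (O). */
    int deleteScheduled() {
        return deletes.incrementAndGet();
    }

    /** Executed later by the delete process carrying its snapshot. */
    boolean mayDelete(int snapshot) {
        return snapshot == copies.get();
    }
}
```

Replaying scenario 1: after both C events, copies is 2; the delete scheduled at III carries snapshot 1 and must not remove the file, while the delete at IV carries snapshot 2 and may.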




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-27 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71654749
  
Both good points. I'll address them after lunch!




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-27 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71652771
  
I'm wondering whether the count hash map update should rather happen in the copy process, because otherwise the following interleaving could occur:

1. You register a new temp file "foobar" for task B --> creating a copy 
task and increment file counter
2. You delete the temp file "foobar" for task A because it is finished --> 
creating a delete process with the incremented counter
3. You execute the copy process
4. You execute the delete process

Then the file "foobar" does not exist for task B.

Another thing is that the DeleteProcess tries to delete the whole directory below the jobID when only a single file shall be deleted. I don't know whether this is the right behaviour.
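The interleaving described above can be replayed in miniature. This is a hypothetical reduction of the oldCount scheme, not the actual FileCache code: a Set stands in for the file system, Runnables stand in for the copy and delete processes, and the class name (`OldCountCache`) is invented. Because the delete process only compares the count against its creation-time snapshot, and nothing changes the count between steps 2 and 4, the delete goes through and removes the file that task B still needs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the snapshot-based delete check being discussed.
class OldCountCache {
    final Map<String, Integer> count = new HashMap<>();
    final Set<String> fs = new HashSet<>(); // files that "exist" on disk

    Runnable scheduleCopy(String name) {
        count.merge(name, 1, Integer::sum);   // counter bumped at registration
        return () -> fs.add(name);            // the actual copy runs later
    }

    Runnable scheduleDelete(String name) {
        int oldCount = count.get(name);       // snapshot at creation time
        return () -> {
            // "nothing changed since I was scheduled" -- but that is exactly
            // the case in the interleaving above, so the file is lost.
            if (count.getOrDefault(name, 0) == oldCount) {
                fs.remove(name);
            }
        };
    }
}
```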




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-27 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71638812
  
Updated to include the discussed changes.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71494058
  
Yeah, that would probably solve the problem.

Race conditions are often very tricky: sometimes small changes alter the process interleaving such that the problem only seems to be fixed.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71490264
  
Yes, but only the access to the count hash map. The delete action itself is 
not synchronized.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71490079
  
Oh, I see what you mean: extend the synchronized block to include the actual delete logic. Yup, that's a good idea. All I know is that I tried it without the change and ran into issues; with the change, it ran.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71489135
  
But that is exactly what is changing: both the delete and the copy process are synchronized on the same object.




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71488487
  
I don't really understand how the static lock solves the mentioned issue. 
Is there a concurrency problem between creating files on disk and updating the 
count hash map?

I think there is a problem between the DeleteProcess and the CopyProcess. 
The CopyProcess is synchronized on the static lock object and the DeleteProcess 
is not. Thus, it might be the case that the copy method created the directories 
for a new file "foobar", let's say /tmp/123/foobar, and afterwards the delete 
process deletes the directory /tmp/123 because it checked the count hash map 
before the createTmpFile method was called.

This problem should still persist with the current changes.
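The window described above, where a delete removes /tmp/123 between the copy's directory creation and its file write, closes if both operations hold the same lock. Below is an illustrative reduction, not the FileCache implementation: a Set stands in for the local file system, and the class name (`GuardedCache`) is invented.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: copy and delete share one lock, so a delete can never interleave
// between "create the job directory" and "write the cached file".
class GuardedCache {
    private final Object lock = new Object();
    private final Set<String> fs = new HashSet<>(); // paths that "exist"

    void copy(String jobDir, String name) {
        synchronized (lock) {
            fs.add(jobDir);                  // mkdir /tmp/<jobID>
            fs.add(jobDir + "/" + name);     // write the cached file
        }
    }

    void deleteJobDir(String jobDir) {
        synchronized (lock) {
            // Runs under the same lock as copy, so it sees either no job
            // directory at all or the directory together with its file.
            fs.removeIf(p -> p.equals(jobDir) || p.startsWith(jobDir + "/"));
        }
    }

    boolean exists(String path) {
        synchronized (lock) {
            return fs.contains(path);
        }
    }
}
```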




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/339#discussion_r23541551
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -72,7 +72,7 @@
 * @return copy task
 */
public FutureTask createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
-   synchronized (count) {
+   synchronized (lock) {
--- End diff --

How does the static lock solve the problem?




[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/339

[FLINK-1419] [runtime] DC properly synchronized

Addresses the issue of files not being preserved in subsequent operations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/incubator-flink dc_cache_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/339.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #339


commit 5c9059d3ce58d8415ce374927dd253579a5fd741
Author: zentol 
Date:   2015-01-26T10:07:53Z

[FLINK-1419] [runtime] DC properly synchronized



