[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131496338
  
I'd like to get this merged soon. This removes multiple constructors for 
Runtime contexts and establishes a clean hierarchy, making any changes to the 
constructors easier. This will be useful for two Jiras on exposing task 
configuration and task attempt number to the Runtime context.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37143936
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

This is to provide access to Task attempt number from Runtime Context. I 
should add a description of the other tickets this resolves.
Is this a good idea though? To fix five issues in one PR? Or should I open 
a separate one and keep this one for just distributed cache?




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131504643
  
I decided to go ahead and implement things which touch the Runtime Context 
constructors with this PR. This now closes five Jiras, namely 2449, 2458, 2488, 
2496 and 2524. Commit messages are descriptive of each Jira.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37143896
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Why is this changed from before?




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131519550
  
Reverting back to make this PR only about the distributed cache.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144152
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Generally we try to keep one PR for one issue; exceptions should only be 
made for closely related issues.

Why did you decide to add these issues to this PR? (I have a hard time 
understanding it, since the commits barely touch the same files.)




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144275
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

I would prefer it if you opened a second PR once this is merged. The issues 
are not really related to each other; the 2nd commit was simply made based on 
the 1st commit. We would end up having two separate discussions in one PR, 
which I think is a bad idea.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144194
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, 
tdd.getAttemptNumber))
--- End diff --

Yes. The addition of distributed cache removes the need for multiple 
constructors for `RuntimeContext`s. Since providing access to runtime 
information needed changing the constructors, I deemed it better to work with 
what would be the only needed constructors after merging this. 
I can revert this commit and open a separate PR for the *other* three 
issues if necessary.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131579342
  
Looks good, merging this!




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570065
  
We are indeed falling behind on merging pull requests right now. Many 
committers are on vacation this month, and for the others, the large number of 
pull requests is hard to keep up with, especially alongside the work on our own 
issues.

Hope this will get better in a week or two.

I'll try to get a look at this very soon...




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570789
  
Aside from the comment above, this looks good. Would merge this, after the 
comment is addressed.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/970




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131570435
  
In the `CollectionExecutor`, can you skip creating the `ExecutorService`? 
You can eagerly resolve the path and then put an already finished future into 
the map.
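
A minimal sketch of that suggestion, not the code that was merged: it assumes the map holds `Future` values keyed by cache entry name, and it uses `java.nio.file.Path` as a stand-in for Flink's own `Path` class. The class name `FinishedFutureSketch`, the helper `finished`, and the entry name `dict` are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FinishedFutureSketch {

    // Hypothetical helper: runs the callable on the calling thread and
    // returns a Future that is already done, so no ExecutorService is needed.
    public static <T> Future<T> finished(Callable<T> work) {
        FutureTask<T> task = new FutureTask<>(work);
        task.run(); // executes synchronously; a thrown exception is stored in the future
        return task;
    }

    public static void main(String[] args) throws Exception {
        // Assumed shape of the map: cache entry name -> future of the resolved path.
        Map<String, Future<Path>> cacheCopyTasks = new HashMap<>();
        cacheCopyTasks.put("dict", finished(() -> Paths.get("data", "dict.txt").toAbsolutePath()));

        Future<Path> entry = cacheCopyTasks.get("dict");
        System.out.println("done: " + entry.isDone());
        System.out.println("path: " + entry.get());
    }
}
```

Because the work runs before the future is stored, callers that later invoke `get()` never block, and no thread pool has to be created or shut down.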




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-131575300
  
Addressed comments. @StephanEwen 




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36640260
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 ---
@@ -501,4 +536,22 @@ public int getSuperstepNumber() {
return (T) previousAggregates.get(name);
}
}
+
+   private static final class DoingNothing implements Callable<Path>{
--- End diff --

It actually does something ;-)




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-10 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-129481422
  
Looks good, in general.

Can you add the test to one of the other iteration test files? This saves 
cluster startup and shutdown costs, making builds faster. Maybe to the 
iteration aggregators, or iteration accumulators.





[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-10 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36680981
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 ---
@@ -501,4 +536,22 @@ public int getSuperstepNumber() {
return (T) previousAggregates.get(name);
}
}
+
+   private static final class DoingNothing implements Callable<Path>{
--- End diff --

Haha. Yes. In an earlier version of the code, it wasn't. :')




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-10 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/970#issuecomment-129648092
  
I've moved the test to an existing `MultipleProgramTestBase`. Should be good 
to merge now. :)




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36512933
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 ---
@@ -501,4 +536,22 @@ public int getSuperstepNumber() {
return (T) previousAggregates.get(name);
}
}
+
+   private static final class DoingNothing implements Callable<Path>{
+       private Path entry;
+
+       public DoingNothing(Path entry){
+           this.entry = entry;
+       }
+
+       @Override
+       public Path call() throws IOException{
+           try{
+               LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
+               return entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
+           } catch (ClassCastException e){
+               throw new RuntimeException("Collection execution must have only local file paths");
--- End diff --

I dislike this error message; there is no apparent relation to the 
distributed cache. 
How about "The DistributedCache only supports local files for Collection 
Environments."?
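
The resolution logic in the diff, combined with the suggested message, can be sketched as follows. This is an illustration under assumptions, not Flink code: it uses `java.net.URI` and `java.nio.file.Path` instead of Flink's `Path` and `LocalFileSystem`, checks the URI scheme instead of catching a `ClassCastException`, and throws `IllegalArgumentException` where the diff throws `RuntimeException`. The names `LocalCachePathSketch` and `resolveLocal` are hypothetical.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LocalCachePathSketch {

    // Hypothetical helper: accept only local files, return absolute paths
    // unchanged, and resolve relative paths against the working directory.
    public static Path resolveLocal(URI entry, Path workingDir) {
        String scheme = entry.getScheme();
        if (scheme != null && !scheme.equals("file")) {
            throw new IllegalArgumentException(
                "The DistributedCache only supports local files for Collection Environments.");
        }
        Path p = Paths.get(entry.getPath());
        return p.isAbsolute() ? p : workingDir.resolve(p).normalize();
    }

    public static void main(String[] args) {
        Path workingDir = Paths.get("/tmp/work");
        System.out.println(resolveLocal(URI.create("file:///data/a.txt"), workingDir));
        System.out.println(resolveLocal(URI.create("data/a.txt"), workingDir));
        try {
            resolveLocal(URI.create("hdfs://namenode/data/a.txt"), workingDir);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Naming the distributed cache in the message tells the user which feature rejected the path, which is the point of the suggested wording.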




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-07 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36513272
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 ---
@@ -501,4 +536,22 @@ public int getSuperstepNumber() {
return (T) previousAggregates.get(name);
}
}
+
+   private static final class DoingNothing implements Callable<Path>{
+       private Path entry;
+
+       public DoingNothing(Path entry){
+           this.entry = entry;
+       }
+
+       @Override
+       public Path call() throws IOException{
+           try{
+               LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem();
+               return entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
+           } catch (ClassCastException e){
+               throw new RuntimeException("Collection execution must have only local file paths");
--- End diff --

Yeah. That would make more sense.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36512186
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
 ---
@@ -37,18 +37,17 @@
    private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();

    private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
-   
-   
+
    public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
                            ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
-       super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
+       this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
+               new HashMap<String, Future<Path>>(), accumulators);
    }
-   
+
    public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
                            ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
        super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
    }
-   
--- End diff --

Here we have a few unnecessary formatting changes that just clutter the 
diff.




[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-01 Thread sachingoel0101
GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/970

[FLINK-2458][FLINK-2449]Access distributed cache entries for 
CollectionExecution and in Iterative tasks.

1. This PR adds support for accessing distributed cache entries when 
running iterations.
2. Since several tests execute in both Cluster and Collection modes, it 
seems logical that a test should not fail in one mode if it passes in the 
other. Distributed cache files are one such case. There is nothing actually 
wrong with trying to access a distributed cache entry when running in a 
collection environment; it just doesn't really make sense to do so. 
This PR takes care of that too.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink iteration_cache_files

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #970


commit c5b33898d1727cd196e7836becc2a30266641eec
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-08-01T13:55:42Z

[FLINK-2458][FLINK-2449]Access distributed cache entries for
CollectionExecution and in Iterative tasks.



