[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205848#comment-15205848
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-199648335
  
Hi @danielblazevski, thanks for the update! The implementation looks good to me. 
(Some minor issues and rebasing will be addressed by me.)

About the docs, I meant we need to add a description, examples, and the meaning of 
the parameters to the documentation on our homepage (`docs/libs/ml/knn.md`).



> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
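
To make the H-BNLJ idea concrete, here is a minimal, hypothetical sketch over the Java DataSet API: cross every test point with every training point, compute the squared distance, and keep the k closest training points per test point. The tuple layouts, class name, and sample data are illustrative assumptions and do not reflect FlinkML's actual kNN API.

{code}
// Hypothetical sketch of exact kNN via a block nested-loop style cross join.
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class ExactKnnSketch {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final int k = 2;

    // (pointId, coordinates)
    DataSet<Tuple2<Integer, double[]>> test = env.fromElements(
        new Tuple2<>(1, new double[]{0.0, 0.0}));
    DataSet<Tuple2<Integer, double[]>> training = env.fromElements(
        new Tuple2<>(10, new double[]{1.0, 1.0}),
        new Tuple2<>(11, new double[]{0.5, 0.2}),
        new Tuple2<>(12, new double[]{5.0, 5.0}));

    // (testId, trainingId, squared Euclidean distance) for every pair
    DataSet<Tuple3<Integer, Integer, Double>> distances = test
        .cross(training)
        .with(new CrossFunction<Tuple2<Integer, double[]>, Tuple2<Integer, double[]>,
            Tuple3<Integer, Integer, Double>>() {
          @Override
          public Tuple3<Integer, Integer, Double> cross(
              Tuple2<Integer, double[]> t, Tuple2<Integer, double[]> r) {
            double d = 0.0;
            for (int i = 0; i < t.f1.length; i++) {
              d += (t.f1[i] - r.f1[i]) * (t.f1[i] - r.f1[i]);
            }
            return new Tuple3<>(t.f0, r.f0, d);
          }
        });

    // keep the k nearest training points per test point
    distances
        .groupBy(0)
        .sortGroup(2, Order.ASCENDING)
        .first(k)
        .print();
  }
}
{code}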



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-03-21 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-199648335
  
Hi @danielblazevski, thanks for the update! The implementation looks good to me. 
(Some minor issues and rebasing will be addressed by me.)

About the docs, I meant we need to add a description, examples, and the meaning of 
the parameters to the documentation on our homepage (`docs/libs/ml/knn.md`).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3579) Improve String concatenation

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205813#comment-15205813
 ] 

ASF GitHub Bot commented on FLINK-3579:
---

Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1821#issuecomment-199642088
  
@fhueske - Not sure how to check the failed test cases. Any chance of a 
review here? Thank you.


> Improve String concatenation
> 
>
> Key: FLINK-3579
> URL: https://issues.apache.org/jira/browse/FLINK-3579
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> Concatenation of a String and non-String does not work properly.
> e.g. {{f0 + 42}} leads to RelBuilder Exception
> ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.





[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)

2016-03-21 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1821#issuecomment-199642088
  
@fhueske - Not sure how to check the failed test cases. Any chance of a 
review here? Thank you.




[GitHub] flink pull request: FLINK-3179 - Adding just a log message

2016-03-21 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1822#issuecomment-199642001
  
I have done the update, @fhueske. Please review. Thanks.




[jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205812#comment-15205812
 ] 

ASF GitHub Bot commented on FLINK-3179:
---

Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1822#issuecomment-199642001
  
I have done the update, @fhueske. Please review. Thanks.


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> -
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.1
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet> words = ...
> DataSet> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen, if the 
> Reducer is the only successor of the Partition operator.
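
Until a combiner is injected automatically, the workaround suggested later in this thread is to add an explicit combiner in front of the partition operator. A hedged sketch, assuming the element type Tuple2<String, Integer> for the example above (the generics were stripped from the quoted snippet):

{code}
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ExplicitCombinerSketch {

  static DataSet<Tuple2<String, Integer>> countWords(DataSet<Tuple2<String, Integer>> words) {
    return words
        .groupBy(0)
        // local pre-aggregation before the data is shipped to the hash partitions
        .combineGroup(new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
          @Override
          public void combine(Iterable<Tuple2<String, Integer>> values,
                              Collector<Tuple2<String, Integer>> out) {
            String key = null;
            int sum = 0;
            for (Tuple2<String, Integer> v : values) {
              key = v.f0;
              sum += v.f1;
            }
            out.collect(new Tuple2<>(key, sum));
          }
        })
        .partitionByHash(0)
        .groupBy(0)
        .sum(1);
  }
}
{code}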





[GitHub] flink pull request: [FLINK-3631] CodeGenerator does not check type...

2016-03-21 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1823#issuecomment-199641069
  
I have done the update, @fhueske. Thanks for the review.




[jira] [Commented] (FLINK-3631) CodeGenerator does not check type compatibility for equality expressions

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205811#comment-15205811
 ] 

ASF GitHub Bot commented on FLINK-3631:
---

Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1823#issuecomment-199641069
  
I have done the update, @fhueske. Thanks for the review.


> CodeGenerator does not check type compatibility for equality expressions
> 
>
> Key: FLINK-3631
> URL: https://issues.apache.org/jira/browse/FLINK-3631
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The following Table API query does not fail but produces an empty result:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
> // must fail. Field 'a is not a string.
> ds.filter( 'a === "nope" ).collect()
> {code}
> The generated flatMap code looks like this:
> {code}
> @Override
> public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws 
> Exception {
>   org.apache.flink.api.table.Row in1 = (org.apache.flink.api.table.Row) _in1;
>   
>   java.lang.String result$17 = (java.lang.String) in1.productElement(2);
>   int result$11 = (java.lang.Integer) in1.productElement(0);
>   long result$14 = (java.lang.Long) in1.productElement(1);
>   java.lang.String result$19 = "nope";
>   
>   boolean result$21 = result$19.equals(result$11);
>   
>   if (result$21) {
> out.setField(0, result$11);
>   out.setField(1, result$14);
>   out.setField(2, result$17);
> c.collect(out);
>   }
> }
> {code}
> I would expect the query to fail due to an Integer/String type conflict.





[GitHub] flink pull request: [hotfix] Update python.md

2016-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1814




[GitHub] flink pull request: [hotfix] Update python.md

2016-03-21 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1814#issuecomment-199640748
  
Merging...




[jira] [Commented] (FLINK-3644) WebRuntimMonitor set java.io.tmpdir does not work for change upload dir.

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205767#comment-15205767
 ] 

ASF GitHub Bot commented on FLINK-3644:
---

GitHub user Astralidea opened a pull request:

https://github.com/apache/flink/pull/1824

[FLINK-3644] WebRuntimMonitor set java.io.tmpdir does not work

To fix this issue, I think we should add a new config option to configure the 
web frontend's temp dir.
I don't know why java.io.tmpdir does not work; its value is always /tmp.
The space of /tmp on my cluster is very limited, and users upload often.
So I have to make this config take effect.
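
A minimal sketch of what the proposed behaviour could look like: prefer an explicit upload directory from the Flink configuration and only fall back to java.io.tmpdir when it is not set. The key name used below is an assumption for illustration, not necessarily the one this PR introduces.

{code}
import org.apache.flink.configuration.Configuration;

public class WebUploadDirSketch {

  // Hypothetical helper: "jobmanager.web.tmpdir" is an assumed key name.
  static String resolveUploadDir(Configuration config) {
    return config.getString(
        "jobmanager.web.tmpdir",                  // assumed config key
        System.getProperty("java.io.tmpdir"));    // current fallback behaviour
  }
}
{code}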

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/QunarBlackOps/flink FLINK-3644-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1824


commit 3e581199beff543c3d35e73ab9669fd3907502ad
Author: xueyan.li 
Date:   2016-03-22T04:20:06Z

add new config for set web monitor tmp dir




> WebRuntimMonitor set java.io.tmpdir does not work for change upload dir.
> 
>
> Key: FLINK-3644
> URL: https://issues.apache.org/jira/browse/FLINK-3644
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.0.0
> Environment: flink-conf.yaml -> java.io.tmpdir: .
> java -server -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled 
> -XX:+CMSClassUnloadingEnabled -XX:+UseParNewGC -XX:+UseCompressedOops 
> -XX:+UseFastEmptyMethods -XX:+UseFastAccessorMethods -XX:+AlwaysPreTouch 
> -Xmx1707m -Dlog4j.configuration=file:log4j-mesos.properties 
> -Djava.io.tmpdir=. -cp 
> flink-dist_2.10-1.0.0.jar:log4j-1.2.17.jar:slf4j-log4j12-1.7.7.jar:flink-python_2.10-1.0.0.jar
> java version "1.8.0_60"
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
> Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
> CentOS release 6.4 (Final)
>Reporter: astralidea
>
> Setting java.io.tmpdir via flink-conf.yaml and -Djava.io.tmpdir=. does not work for me.
> I don't know why. Looking at the code, System.getProperty("java.io.tmpdir")
> should work, but it does not.
> However, in the web UI the JobManager configuration shows that java.io.tmpdir
> is set.





[GitHub] flink pull request: [FLINK-3644] WebRuntimMonitor set java.io.tmpd...

2016-03-21 Thread Astralidea
GitHub user Astralidea opened a pull request:

https://github.com/apache/flink/pull/1824

[FLINK-3644] WebRuntimMonitor set java.io.tmpdir does not work

To fix this issue, I think we should add a new config option to configure the 
web frontend's temp dir.
I don't know why java.io.tmpdir does not work; its value is always /tmp.
The space of /tmp on my cluster is very limited, and users upload often.
So I have to make this config take effect.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/QunarBlackOps/flink FLINK-3644-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1824


commit 3e581199beff543c3d35e73ab9669fd3907502ad
Author: xueyan.li 
Date:   2016-03-22T04:20:06Z

add new config for set web monitor tmp dir






[jira] [Created] (FLINK-3644) WebRuntimMonitor set java.io.tmpdir does not work for change upload dir.

2016-03-21 Thread astralidea (JIRA)
astralidea created FLINK-3644:
-

 Summary: WebRuntimMonitor set java.io.tmpdir does not work for 
change upload dir.
 Key: FLINK-3644
 URL: https://issues.apache.org/jira/browse/FLINK-3644
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.0.0
 Environment: flink-conf.yaml -> java.io.tmpdir: .
java -server -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled 
-XX:+CMSClassUnloadingEnabled -XX:+UseParNewGC -XX:+UseCompressedOops 
-XX:+UseFastEmptyMethods -XX:+UseFastAccessorMethods -XX:+AlwaysPreTouch 
-Xmx1707m -Dlog4j.configuration=file:log4j-mesos.properties -Djava.io.tmpdir=. 
-cp 
flink-dist_2.10-1.0.0.jar:log4j-1.2.17.jar:slf4j-log4j12-1.7.7.jar:flink-python_2.10-1.0.0.jar
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
CentOS release 6.4 (Final)
Reporter: astralidea


Setting java.io.tmpdir via flink-conf.yaml and -Djava.io.tmpdir=. does not work for me.
I don't know why. Looking at the code, System.getProperty("java.io.tmpdir")
should work, but it does not.
However, in the web UI the JobManager configuration shows that java.io.tmpdir is set.





[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()

2016-03-21 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3222:
--
Description: 
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.


The shift amount is greater than 31 bits.

  was:
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.

The shift amount is greater than 31 bits.


> Incorrect shift amount in OperatorCheckpointStats#hashCode()
> 
>
> Key: FLINK-3222
> URL: https://issues.apache.org/jira/browse/FLINK-3222
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
> >>> 32));
> {code}
> subTaskStats.length is an int.
> The shift amount is greater than 31 bits.
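
A hedged sketch of the likely fix: since subTaskStats.length is an int, shifting it right by 32 is effectively a shift by 0 in Java (the shift amount is taken modulo 32), so the XOR cancels to zero and the array length never contributes to the hash. Using the length directly restores the intended behaviour:

{code}
// subTaskStats.length is an int; the long-style "^ (x >>> 32)" folding is not needed.
result = 31 * result + subTaskStats.length;
{code}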





[jira] [Updated] (FLINK-3576) Upgrade Snappy Java to 1.1.2.1

2016-03-21 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3576:
--
Description: 
There was a JVM memory leak reported in 
https://github.com/xerial/snappy-java/issues/131

The above issue has been resolved.
1.1.2.1 was released on Jan 22nd.

We should upgrade to this release.

  was:
There was a JVM memory leak reported in 
https://github.com/xerial/snappy-java/issues/131

The above issue has been resolved.
1.1.2.1 was released on Jan 22nd.


We should upgrade to this release.


> Upgrade Snappy Java to 1.1.2.1
> --
>
> Key: FLINK-3576
> URL: https://issues.apache.org/jira/browse/FLINK-3576
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> There was a JVM memory leak reported in 
> https://github.com/xerial/snappy-java/issues/131
> The above issue has been resolved.
> 1.1.2.1 was released on Jan 22nd.
> We should upgrade to this release.





[jira] [Commented] (FLINK-2997) Support range partition with user customized data distribution.

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205375#comment-15205375
 ] 

ASF GitHub Bot commented on FLINK-2997:
---

Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/1776#discussion_r56916757
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -45,35 +46,48 @@
private final PartitionMethod pMethod;
private final String partitionLocationName;
private final Partitioner customPartitioner;
-   
-   
+   private final DataDistribution distribution;
+
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, String partitionLocationName) {
-   this(input, pMethod, pKeys, null, null, partitionLocationName);
+   this(input, pMethod, pKeys, null, null, null, 
partitionLocationName);
}
-   
+
+   public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, DataDistribution distribution, String partitionLocationName) {
+   this(input, pMethod, pKeys, null, null, distribution, 
partitionLocationName);
+   }
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
String partitionLocationName) {
-   this(input, pMethod, null, null, null, partitionLocationName);
+   this(input, pMethod, null, null, null, null, 
partitionLocationName);
}

public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, String partitionLocationName) {
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, null, partitionLocationName);
}

-   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, 
+   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner,
TypeInformation partitionerTypeInfo, String 
partitionLocationName)
{
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, null, partitionLocationName);
}

-   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner, 
-   TypeInformation partitionerTypeInfo, String 
partitionLocationName)
+   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner,
+   TypeInformation partitionerTypeInfo, 
DataDistribution distribution, String partitionLocationName)
{
super(input, input.getType());

Preconditions.checkNotNull(pMethod);
Preconditions.checkArgument(pKeys != null || pMethod == 
PartitionMethod.REBALANCE, "Partitioning requires keys");
Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM 
|| customPartitioner != null, "Custom partioning requires a partitioner.");
-
+   Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");
+   
+   if (distribution != null) {
+   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
+   for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
+   
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]),
 "The types of key from the distribution and range partitioner are not equal.");
--- End diff --

Code modified and the new commit rebased onto the previous one. (You must have 
stayed up late last night :))


> Support range partition with user customized data distribution.
> ---
>
> Key: FLINK-2997
> URL: https://issues.apache.org/jira/browse/FLINK-2997
> Project: Flink
>  Issue Type: New Feature
>Reporter: Chengxiang Li
>
> This is follow-up work for FLINK-7. Sometimes users have better knowledge of 
> the source data, and they can build a customized data distribution to do range 
> partitioning more efficiently.





[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

2016-03-21 Thread gallenvara
Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/1776#discussion_r56916757
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -45,35 +46,48 @@
private final PartitionMethod pMethod;
private final String partitionLocationName;
private final Partitioner customPartitioner;
-   
-   
+   private final DataDistribution distribution;
+
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, String partitionLocationName) {
-   this(input, pMethod, pKeys, null, null, partitionLocationName);
+   this(input, pMethod, pKeys, null, null, null, 
partitionLocationName);
}
-   
+
+   public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, DataDistribution distribution, String partitionLocationName) {
+   this(input, pMethod, pKeys, null, null, distribution, 
partitionLocationName);
+   }
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
String partitionLocationName) {
-   this(input, pMethod, null, null, null, partitionLocationName);
+   this(input, pMethod, null, null, null, null, 
partitionLocationName);
}

public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, String partitionLocationName) {
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, null, partitionLocationName);
}

-   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, 
+   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner,
TypeInformation partitionerTypeInfo, String 
partitionLocationName)
{
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, null, partitionLocationName);
}

-   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner, 
-   TypeInformation partitionerTypeInfo, String 
partitionLocationName)
+   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner,
+   TypeInformation partitionerTypeInfo, 
DataDistribution distribution, String partitionLocationName)
{
super(input, input.getType());

Preconditions.checkNotNull(pMethod);
Preconditions.checkArgument(pKeys != null || pMethod == 
PartitionMethod.REBALANCE, "Partitioning requires keys");
Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM 
|| customPartitioner != null, "Custom partioning requires a partitioner.");
-
+   Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");
+   
+   if (distribution != null) {
+   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
+   for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
+   
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]),
 "The types of key from the distribution and range partitioner are not equal.");
--- End diff --

Code modified and the new commit rebased onto the previous one. (You must have 
stayed up late last night :))




[jira] [Commented] (FLINK-2997) Support range partition with user customized data distribution.

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205273#comment-15205273
 ] 

ASF GitHub Bot commented on FLINK-2997:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1776#discussion_r56907132
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -45,35 +46,48 @@
private final PartitionMethod pMethod;
private final String partitionLocationName;
private final Partitioner customPartitioner;
-   
-   
+   private final DataDistribution distribution;
+
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, String partitionLocationName) {
-   this(input, pMethod, pKeys, null, null, partitionLocationName);
+   this(input, pMethod, pKeys, null, null, null, 
partitionLocationName);
}
-   
+
+   public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, DataDistribution distribution, String partitionLocationName) {
+   this(input, pMethod, pKeys, null, null, distribution, 
partitionLocationName);
+   }
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
String partitionLocationName) {
-   this(input, pMethod, null, null, null, partitionLocationName);
+   this(input, pMethod, null, null, null, null, 
partitionLocationName);
}

public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, String partitionLocationName) {
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, null, partitionLocationName);
}

-   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, 
+   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner,
TypeInformation partitionerTypeInfo, String 
partitionLocationName)
{
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, null, partitionLocationName);
}

-   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner, 
-   TypeInformation partitionerTypeInfo, String 
partitionLocationName)
+   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner,
+   TypeInformation partitionerTypeInfo, 
DataDistribution distribution, String partitionLocationName)
{
super(input, input.getType());

Preconditions.checkNotNull(pMethod);
Preconditions.checkArgument(pKeys != null || pMethod == 
PartitionMethod.REBALANCE, "Partitioning requires keys");
Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM 
|| customPartitioner != null, "Custom partioning requires a partitioner.");
-
+   Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");
+   
+   if (distribution != null) {
+   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
+   for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
+   
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]),
 "The types of key from the distribution and range partitioner are not equal.");
--- End diff --

can you fetch the key type arrays only once and not for every key?
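
A sketch of the suggested change, dropping into the constructor shown in the diff: fetch both key type arrays once, outside the loop, instead of calling the getters for every key field (assuming the getters return TypeInformation arrays, as the diff indicates).

{code}
if (distribution != null) {
    Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(),
        "The number of key fields in the distribution and range partitioner should be the same.");

    // fetch the key type arrays once instead of per iteration
    TypeInformation<?>[] distributionKeyTypes = distribution.getKeyTypes();
    TypeInformation<?>[] partitionKeyTypes = pKeys.getKeyFieldTypes();

    for (int i = 0; i < partitionKeyTypes.length; i++) {
        Preconditions.checkArgument(distributionKeyTypes[i].equals(partitionKeyTypes[i]),
            "The types of key from the distribution and range partitioner are not equal.");
    }
}
{code}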


> Support range partition with user customized data distribution.
> ---
>
> Key: FLINK-2997
> URL: https://issues.apache.org/jira/browse/FLINK-2997
> Project: Flink
>  Issue Type: New Feature
>Reporter: Chengxiang Li
>
> This is follow-up work for FLINK-7. Sometimes users have better knowledge of 
> the source data, and they can build a customized data distribution to do range 
> partitioning more efficiently.





[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

2016-03-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1776#discussion_r56907132
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -45,35 +46,48 @@
private final PartitionMethod pMethod;
private final String partitionLocationName;
private final Partitioner customPartitioner;
-   
-   
+   private final DataDistribution distribution;
+
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, String partitionLocationName) {
-   this(input, pMethod, pKeys, null, null, partitionLocationName);
+   this(input, pMethod, pKeys, null, null, null, 
partitionLocationName);
}
-   
+
+   public PartitionOperator(DataSet input, PartitionMethod pMethod, 
Keys pKeys, DataDistribution distribution, String partitionLocationName) {
+   this(input, pMethod, pKeys, null, null, distribution, 
partitionLocationName);
+   }
+
public PartitionOperator(DataSet input, PartitionMethod pMethod, 
String partitionLocationName) {
-   this(input, pMethod, null, null, null, partitionLocationName);
+   this(input, pMethod, null, null, null, null, 
partitionLocationName);
}

public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, String partitionLocationName) {
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, null, partitionLocationName);
}

-   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner, 
+   public  PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customPartitioner,
TypeInformation partitionerTypeInfo, String 
partitionLocationName)
{
-   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, partitionLocationName);
+   this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, null, partitionLocationName);
}

-   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner, 
-   TypeInformation partitionerTypeInfo, String 
partitionLocationName)
+   private  PartitionOperator(DataSet input, PartitionMethod 
pMethod, Keys pKeys, Partitioner customPartitioner,
+   TypeInformation partitionerTypeInfo, 
DataDistribution distribution, String partitionLocationName)
{
super(input, input.getType());

Preconditions.checkNotNull(pMethod);
Preconditions.checkArgument(pKeys != null || pMethod == 
PartitionMethod.REBALANCE, "Partitioning requires keys");
Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM 
|| customPartitioner != null, "Custom partioning requires a partitioner.");
-
+   Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");
+   
+   if (distribution != null) {
+   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
+   for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
+   
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]),
 "The types of key from the distribution and range partitioner are not equal.");
--- End diff --

can you fetch the key type arrays only once and not for every key?




[GitHub] flink pull request: [FLINK-3631] CodeGenerator does not check type...

2016-03-21 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1823#issuecomment-199506835
  
Thanks for the PR @ramkrish86!
Looks good. Just one minor thing: can you add a one-line comment to each 
test that explains why the program should fail, as in the tests 
`testNonWorkingSubstring1()` and `testNonWorkingSubstring2()`? Thanks!




[jira] [Commented] (FLINK-3631) CodeGenerator does not check type compatibility for equality expressions

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205250#comment-15205250
 ] 

ASF GitHub Bot commented on FLINK-3631:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1823#issuecomment-199506835
  
Thanks for the PR @ramkrish86!
Looks good. Just one minor thing: can you add a one-line comment to each 
test that explains why the program should fail, as in the tests 
`testNonWorkingSubstring1()` and `testNonWorkingSubstring2()`? Thanks!


> CodeGenerator does not check type compatibility for equality expressions
> 
>
> Key: FLINK-3631
> URL: https://issues.apache.org/jira/browse/FLINK-3631
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The following Table API query does not fail but produces an empty result:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
> // must fail. Field 'a is not a string.
> ds.filter( 'a === "nope" ).collect()
> {code}
> The generated flatMap code looks like this:
> {code}
> @Override
> public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws 
> Exception {
>   org.apache.flink.api.table.Row in1 = (org.apache.flink.api.table.Row) _in1;
>   
>   java.lang.String result$17 = (java.lang.String) in1.productElement(2);
>   int result$11 = (java.lang.Integer) in1.productElement(0);
>   long result$14 = (java.lang.Long) in1.productElement(1);
>   java.lang.String result$19 = "nope";
>   
>   boolean result$21 = result$19.equals(result$11);
>   
>   if (result$21) {
> out.setField(0, result$11);
>   out.setField(1, result$14);
>   out.setField(2, result$17);
> c.collect(out);
>   }
> }
> {code}
> I would expect the query to fail due to an Integer/String type conflict.





[GitHub] flink pull request: FLINK-3179 - Adding just a log message

2016-03-21 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1822#issuecomment-199503791
  
Thanks for the PR! Just had a suggestion for the wording of the log 
message. Looks good otherwise.




[jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205237#comment-15205237
 ] 

ASF GitHub Bot commented on FLINK-3179:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1822#issuecomment-199503791
  
Thanks for the PR! Just had a suggestion for the wording of the log 
message. Looks good otherwise.


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> -
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.1
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet> words = ...
> DataSet> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen, if the 
> Reducer is the only successor of the Partition operator.





[jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205236#comment-15205236
 ] 

ASF GitHub Bot commented on FLINK-3179:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1822#discussion_r56904841
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 ---
@@ -61,6 +65,9 @@ public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
(node.getBroadcastConnections() != null && 
!node.getBroadcastConnections().isEmpty()))
{
+   if(in.getSource().getOptimizerNode() instanceof 
PartitionNode) {
+   LOG.warn("Consider adding an explicit 
CombinerFunction with groupCombine in front of the partition operator");
--- End diff --

Can you extend the error message as follows?

> Cannot automatically inject combiner for ReduceFunction. Please add an 
explicit combiner with combineGroup() in front of the partition operator. 


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> -
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.1
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet> words = ...
> DataSet> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen, if the 
> Reducer is the only successor of the Partition operator.





[GitHub] flink pull request: FLINK-3179 - Adding just a log message

2016-03-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1822#discussion_r56904780
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 ---
@@ -90,6 +94,9 @@ public DriverStrategy getStrategy() {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
+   if(in.getSource().getOptimizerNode() instanceof 
PartitionNode) {
+   LOG.warn("Consider adding an explicit 
CombinerFunction with groupCombine in front of the partition operator");
--- End diff --

Can you extend the error message as follows?

> Cannot automatically inject combiner for GroupReduceFunction. Please add 
an explicit combiner with combineGroup() in front of the partition operator. 
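
Applied to the quoted diff, the suggested wording would look roughly like this (a sketch, not the merged code):

{code}
if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
    LOG.warn("Cannot automatically inject combiner for GroupReduceFunction. " +
        "Please add an explicit combiner with combineGroup() in front of the partition operator.");
}
{code}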





[GitHub] flink pull request: FLINK-3179 - Adding just a log message

2016-03-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1822#discussion_r56904841
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 ---
@@ -61,6 +65,9 @@ public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
(node.getBroadcastConnections() != null && 
!node.getBroadcastConnections().isEmpty()))
{
+   if(in.getSource().getOptimizerNode() instanceof 
PartitionNode) {
+   LOG.warn("Consider adding an explicit 
CombinerFunction with groupCombine in front of the partition operator");
--- End diff --

Can you extend the error message as follows?

> Cannot automatically inject combiner for ReduceFunction. Please add an 
explicit combiner with combineGroup() in front of the partition operator. 




[jira] [Commented] (FLINK-3179) Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205235#comment-15205235
 ] 

ASF GitHub Bot commented on FLINK-3179:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1822#discussion_r56904780
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 ---
@@ -90,6 +94,9 @@ public DriverStrategy getStrategy() {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
+   if(in.getSource().getOptimizerNode() instanceof 
PartitionNode) {
+   LOG.warn("Consider adding an explicit 
CombinerFunction with groupCombine in front of the partition operator");
--- End diff --

Can you extend the error message as follows?

> Cannot automatically inject combiner for GroupReduceFunction. Please add 
an explicit combiner with combineGroup() in front of the partition operator. 



> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> -
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.1
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet> words = ...
> DataSet> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen, if the 
> Reducer is the only successor of the Partition operator.





[jira] [Commented] (FLINK-1579) Create a Flink History Server

2016-03-21 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205180#comment-15205180
 ] 

Eron Wright  commented on FLINK-1579:
-

Something to consider before taking a dependency on the YARN timeline server: 
Flink on Mesos will face a similar situation.


> Create a Flink History Server
> -
>
> Key: FLINK-1579
> URL: https://issues.apache.org/jira/browse/FLINK-1579
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 0.9
>Reporter: Robert Metzger
>
> Right now it's not possible to analyze the job results for jobs that ran on 
> YARN, because we'll lose the information once the JobManager has stopped.
> Therefore, I propose to implement a "Flink History Server" which serves the 
> results from these jobs.
> I haven't started thinking about the implementation, but I suspect it 
> involves some JSON files stored in HDFS :)





[jira] [Assigned] (FLINK-3587) Bump Calcite version to 1.7.0

2016-03-21 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-3587:


Assignee: Fabian Hueske

> Bump Calcite version to 1.7.0
> -
>
> Key: FLINK-3587
> URL: https://issues.apache.org/jira/browse/FLINK-3587
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Vasia Kalavri
>Assignee: Fabian Hueske
>
> We currently depend on Calcite 1.5.0. The latest stable release is 1.6.0, but 
> I propose we bump the version to 1.7.0-SNAPSHOT to benefit from latest 
> features. If we do that, we can also get rid of the custom 
> {{FlinkJoinUnionTransposeRule}}.





[jira] [Updated] (FLINK-3587) Bump Calcite version to 1.7.0

2016-03-21 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri updated FLINK-3587:
-
Summary: Bump Calcite version to 1.7.0  (was: Bump Calcite version to 
1.7.0-SNAPSHOT)

> Bump Calcite version to 1.7.0
> -
>
> Key: FLINK-3587
> URL: https://issues.apache.org/jira/browse/FLINK-3587
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Vasia Kalavri
>
> We currently depend on Calcite 1.5.0. The latest stable release is 1.6.0, but 
> I propose we bump the version to 1.7.0-SNAPSHOT to benefit from latest 
> features. If we do that, we can also get rid of the custom 
> {{FlinkJoinUnionTransposeRule}}.





[jira] [Commented] (FLINK-3587) Bump Calcite version to 1.7.0-SNAPSHOT

2016-03-21 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205032#comment-15205032
 ] 

Vasia Kalavri commented on FLINK-3587:
--

Yes, I saw it too!

> Bump Calcite version to 1.7.0-SNAPSHOT
> --
>
> Key: FLINK-3587
> URL: https://issues.apache.org/jira/browse/FLINK-3587
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Vasia Kalavri
>
> We currently depend on Calcite 1.5.0. The latest stable release is 1.6.0, but 
> I propose we bump the version to 1.7.0-SNAPSHOT to benefit from latest 
> features. If we do that, we can also get rid of the custom 
> {{FlinkJoinUnionTransposeRule}}.





[jira] [Commented] (FLINK-3587) Bump Calcite version to 1.7.0-SNAPSHOT

2016-03-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205029#comment-15205029
 ] 

Fabian Hueske commented on FLINK-3587:
--

The vote to release Apache Calcite 1.7 just passed. We can update the 
dependency once the Maven artifacts are published.

> Bump Calcite version to 1.7.0-SNAPSHOT
> --
>
> Key: FLINK-3587
> URL: https://issues.apache.org/jira/browse/FLINK-3587
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Vasia Kalavri
>
> We currently depend on Calcite 1.5.0. The latest stable release is 1.6.0, but 
> I propose we bump the version to 1.7.0-SNAPSHOT to benefit from latest 
> features. If we do that, we can also get rid of the custom 
> {{FlinkJoinUnionTransposeRule}}.





[GitHub] flink pull request: FLINK-3115: Update ElasticSearch connector to ...

2016-03-21 Thread smarthi
Github user smarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1792#discussion_r56886134
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml 
---
@@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-elasticsearch2_2.10
+   flink-connector-elasticsearch2
+
+   jar
+
+   
+   
+   2.2.1
+   
+
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   provided
+   
+
+
+org.elasticsearch
+elasticsearch
+${elasticsearch.version}
+
+
+   
+   com.fasterxml.jackson.core
+   jackson-core
+   2.7.2
--- End diff --

Elasticsearch 2.2.1 needs Jackson 2.6.2 or greater; Flink's parent POM is 
presently at 2.4.2.




[jira] [Commented] (FLINK-3115) Update Elasticsearch connector to 2.X

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204984#comment-15204984
 ] 

ASF GitHub Bot commented on FLINK-3115:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1792#discussion_r56886134
  
--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml 
---
@@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-elasticsearch2_2.10
+   flink-connector-elasticsearch2
+
+   jar
+
+   
+   
+   2.2.1
+   
+
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   provided
+   
+
+
+org.elasticsearch
+elasticsearch
+${elasticsearch.version}
+
+
+   
+   com.fasterxml.jackson.core
+   jackson-core
+   2.7.2
--- End diff --

Elasticsearch 2.2.1 needs Jackson 2.6.2 or greater; Flink's parent POM is 
presently at 2.4.2.


> Update Elasticsearch connector to 2.X
> -
>
> Key: FLINK-3115
> URL: https://issues.apache.org/jira/browse/FLINK-3115
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 0.10.0, 1.0.0, 0.10.1
>Reporter: Maximilian Michels
>Assignee: Suneel Marthi
> Fix For: 1.0.1
>
>
> The Elasticsearch connector is not up to date anymore. In version 2.X the API 
> changed. The code needs to be adapted. Probably it makes sense to have a new 
> class {{ElasticsearchSink2}}.





[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204869#comment-15204869
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56875016
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56875016
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! decorateMessage(
-AcknowledgeRegistration(
-  instanceID,
-  libraryCacheManager.getBlobServerPort)
-  )
+  taskManager ! decorateMessage(
+

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204823#comment-15204823
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56872406
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

Reverted the comment.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56872406
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

Reverted the comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56870497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * A standalone implementation of the resource manager. Used when the 
system is started in
+ * standalone mode (via scripts), rather than via a resource framework 
like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends 
FlinkResourceManager {
+   
+
+   public StandaloneResourceManager(Configuration flinkConfig, 
LeaderRetrievalService leaderRetriever) {
+   super(0, flinkConfig, leaderRetriever);
+   }
+
+   // 

+   //  Framework specific behavior
+   // 

+
+
+   @Override
+   protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
+   super.newJobManagerLeaderAvailable(leaderAddress, 
leaderSessionID);
+   }
+
+   @Override
+   protected void initialize() throws Exception {
+   // nothing to initialize
+   }
+
+   @Override
+   protected void leaderUpdated() {
+   // nothing to update
+   }
+
+   @Override
+   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+   }
+
+   @Override
+   protected void fatalError(String message, Throwable error) {
+   log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+   LOG.error("Shutting down process");
--- End diff --

Yep, will change that. There is the Actor logger and the RM logger. No need 
to have both.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204799#comment-15204799
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56870497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * A standalone implementation of the resource manager. Used when the 
system is started in
+ * standalone mode (via scripts), rather than via a resource framework 
like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends 
FlinkResourceManager {
+   
+
+   public StandaloneResourceManager(Configuration flinkConfig, 
LeaderRetrievalService leaderRetriever) {
+   super(0, flinkConfig, leaderRetriever);
+   }
+
+   // 

+   //  Framework specific behavior
+   // 

+
+
+   @Override
+   protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
+   super.newJobManagerLeaderAvailable(leaderAddress, 
leaderSessionID);
+   }
+
+   @Override
+   protected void initialize() throws Exception {
+   // nothing to initialize
+   }
+
+   @Override
+   protected void leaderUpdated() {
+   // nothing to update
+   }
+
+   @Override
+   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+   }
+
+   @Override
+   protected void fatalError(String message, Throwable error) {
+   log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+   LOG.error("Shutting down process");
--- End diff --

Yep, will change that. There is the Actor logger and the RM logger. No need 
to have both.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204787#comment-15204787
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56869299
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -29,22 +29,22 @@ object Messages {
   case object Acknowledge
 
   /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them 
clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param reason The reason for disconnecting, to be displayed in log 
and error messages.
-   */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The Acknowledge case object instance.
+*/
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 
   /**
-   * Accessor for the case object instance, to simplify Java 
interoperability.
-   *
-   * @return The Acknowledge case object instance.
-   */
-  def getAcknowledge(): Acknowledge.type = Acknowledge
+* Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
+*
+* The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
+* loss more quickly.
+*
+* The JobManager may send this message to its TaskManagers to let them 
clean up their
+* tasks that depend on the JobManager and go into a clean state.
+*
+* @param reason The reason for disconnecting, to be displayed in log 
and error messages.
+*/
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
--- End diff --

You're right, there is no need to change the order here. I'll revert the 
changes.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3207) Add a Pregel iteration abstraction to Gelly

2016-03-21 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri resolved FLINK-3207.
--
   Resolution: Implemented
Fix Version/s: 1.1.0

> Add a Pregel iteration abstraction to Gelly
> ---
>
> Key: FLINK-3207
> URL: https://issues.apache.org/jira/browse/FLINK-3207
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
> Fix For: 1.1.0
>
>
> This issue proposes to add a Pregel/Giraph-like iteration abstraction to 
> Gelly that will only expose one UDF to the user, {{compute()}}. {{compute()}} 
> will have access to both the vertex state and the incoming messages, and will 
> be able to produce messages and update the vertex value.
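
To make the contract concrete, here is a minimal Scala sketch of a compute()
UDF under this abstraction, using single-source shortest paths as the example.
The class and method names (ComputeFunction, MessageIterator, setNewVertexValue,
sendMessageTo, getEdges) follow the vertex-centric API added by this issue; the
choice of a configurable source vertex is just for illustration:

```scala
import java.lang.{Double => JDouble, Long => JLong}

import org.apache.flink.graph.Vertex
import org.apache.flink.graph.pregel.{ComputeFunction, MessageIterator}

// Single-source shortest paths expressed with the single compute() UDF:
// read the vertex value and incoming messages, possibly update the value,
// and send new messages along the out-edges.
class SSSPComputeFunction(srcId: Long) extends ComputeFunction[JLong, JDouble, JDouble, JDouble] {

  override def compute(vertex: Vertex[JLong, JDouble], messages: MessageIterator[JDouble]): Unit = {
    // start from 0 at the source, +inf elsewhere, then fold in the incoming distances
    var minDistance = if (vertex.getId == srcId) 0.0 else Double.PositiveInfinity
    while (messages.hasNext) {
      val candidate = messages.next().doubleValue()
      if (candidate < minDistance) {
        minDistance = candidate
      }
    }

    if (minDistance < vertex.getValue.doubleValue()) {
      // update the vertex state ...
      setNewVertexValue(Double.box(minDistance))

      // ... and propagate the improved distance to the neighbours
      val edges = getEdges.iterator()
      while (edges.hasNext) {
        val edge = edges.next()
        sendMessageTo(edge.getTarget, Double.box(minDistance + edge.getValue.doubleValue()))
      }
    }
  }
}
```

The function would then be plugged into a graph with something like
graph.runVertexCentricIteration(new SSSPComputeFunction(1L), combiner, maxIterations),
where the combiner may be left out (null) if no message combining is wanted.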



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3207) Add a Pregel iteration abstraction to Gelly

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204785#comment-15204785
 ] 

ASF GitHub Bot commented on FLINK-3207:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1575


> Add a Pregel iteration abstraction to Gelly
> ---
>
> Key: FLINK-3207
> URL: https://issues.apache.org/jira/browse/FLINK-3207
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> This issue proposes to add a Pregel/Giraph-like iteration abstraction to 
> Gelly that will only expose one UDF to the user, {{compute()}}. {{compute()}} 
> will have access to both the vertex state and the incoming messages, and will 
> be able to produce messages and update the vertex value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56869299
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -29,22 +29,22 @@ object Messages {
   case object Acknowledge
 
   /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them 
clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param reason The reason for disconnecting, to be displayed in log 
and error messages.
-   */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The Acknowledge case object instance.
+*/
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 
   /**
-   * Accessor for the case object instance, to simplify Java 
interoperability.
-   *
-   * @return The Acknowledge case object instance.
-   */
-  def getAcknowledge(): Acknowledge.type = Acknowledge
+* Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
+*
+* The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
+* loss more quickly.
+*
+* The JobManager may send this message to its TaskManagers to let them 
clean up their
+* tasks that depend on the JobManager and go into a clean state.
+*
+* @param reason The reason for disconnecting, to be displayed in log 
and error messages.
+*/
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
--- End diff --

You're right, there is no need to change the order here. I'll revert the 
changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3207] [gelly] adds the vertex-centric i...

2016-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1575


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204782#comment-15204782
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56869070
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

You're right. There should be an upper bound for the number of retries. If 
there was never a RM, then it behaves the same.
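
For illustration, a small self-contained Scala/Akka sketch of the bounded-retry
idea: the message and actor names below are stand-ins for the real StopCluster
handling in the JobManager, and the limit of 10 attempts is an arbitrary choice.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

// Stand-ins for the real Flink messages, just for this sketch.
case class StopCluster(reason: String)
case class StopClusterRetry(original: StopCluster, attempt: Int)

class RetryingJobManagerStub extends Actor {
  import context.dispatcher

  private val maxRetries = 10
  // in the real JobManager this is an Option[ActorRef] pointing to the RM
  private var resourceManagerConnected = false

  override def receive: Receive = {
    case msg: StopCluster            => attemptStop(msg, attempt = 0)
    case StopClusterRetry(msg, n)    => attemptStop(msg, n)
    case "resourceManagerRegistered" => resourceManagerConnected = true
  }

  private def attemptStop(msg: StopCluster, attempt: Int): Unit = {
    if (resourceManagerConnected) {
      println(s"RM connected, stopping cluster: ${msg.reason}")
      context.stop(self)
    } else if (attempt < maxRetries) {
      // RM not there (yet): retry later, but keep count of the attempts
      context.system.scheduler.scheduleOnce(2.seconds, self, StopClusterRetry(msg, attempt + 1))
    } else {
      // upper bound reached, e.g. because no RM was ever started
      println(s"Giving up after $attempt attempts, shutting down anyway.")
      context.stop(self)
    }
  }
}

object BoundedRetryExample extends App {
  val system = ActorSystem("bounded-retry-example")
  system.actorOf(Props[RetryingJobManagerStub], "jobmanager-stub") ! StopCluster("test run finished")
}
```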


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56869070
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

You're right. There should be an upper bound for the number of retries. If 
there was never a RM, then it behaves the same.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204711#comment-15204711
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56863532
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

For me it is the other way around. Maybe change this to 
>Akka URL to the JobManager to ask for the JobManager actor

?


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56863532
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

For me it is the other way around. Maybe change this to 
>Akka URL to the JobManager to ask for the JobManager actor

?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204706#comment-15204706
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56863204
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56863204
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! decorateMessage(
-AcknowledgeRegistration(
-  instanceID,
-  libraryCacheManager.getBlobServerPort)
-  )
+  taskManager ! decorateMessage(
+

[jira] [Assigned] (FLINK-3640) Add support for SQL queries in DataSet programs

2016-03-21 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri reassigned FLINK-3640:


Assignee: Vasia Kalavri

> Add support for SQL queries in DataSet programs
> ---
>
> Key: FLINK-3640
> URL: https://issues.apache.org/jira/browse/FLINK-3640
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> This issue covers the task of supporting SQL queries embedded in DataSet 
> programs. In this mode, the input and output of a SQL query is a Table. For 
> this issue, we need to make the following additions to the Table API:
> - add a {{tEnv.sql(query: String): Table}} method for converting a query 
> result into a Table
> - integrate Calcite's SQL parser into the batch Table API translation process.
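
A short Scala sketch of what this could look like from a user's perspective.
The environment setup and the registerDataSet/toDataSet helpers are assumptions
about the surrounding Table API; the essential piece is the proposed
tEnv.sql(query: String): Table entry point:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}

object SqlOnDataSetSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // a plain DataSet of (name, amount) records
    val orders = env.fromElements(("alice", 10), ("bob", 25), ("alice", 7))

    // register the DataSet as a table so the query can refer to it
    tEnv.registerDataSet("Orders", orders, 'name, 'amount)

    // the proposed entry point: SQL in, Table out
    val result = tEnv.sql("SELECT name, SUM(amount) AS total FROM Orders GROUP BY name")

    // back to a DataSet for further processing or output
    result.toDataSet[Row].print()
  }
}
```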



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204672#comment-15204672
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is the successful response to a heartbeat. 
+ */
+public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, 
java.io.Serializable {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204671#comment-15204671
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860326
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java
 ---
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution;
+package org.apache.flink.runtime.clusterframework.standalone;
 
-public interface ExecutionObserver {
+/**
+ * The startup mode for the standalone setup.
+ */
+public enum StandaloneMode {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is the successful response to a heartbeat. 
+ */
+public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, 
java.io.Serializable {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860326
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java
 ---
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution;
+package org.apache.flink.runtime.clusterframework.standalone;
 
-public interface ExecutionObserver {
+/**
+ * The startup mode for the standalone setup.
+ */
+public enum StandaloneMode {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204663#comment-15204663
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860123
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
--- End diff --

You're right that the message can be lost. Apart from the leader election 
service, there is currently no mechanism to detect a lost connection from the 
RM to the JM.

The above code is part of a special case where the RM doesn't reply to a TM 
registration and the JM decides to disconnect the RM. I suppose we should keep 
retrying to send the TriggerRegistrationAtJobManager until we receive an RM 
registration.
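
A compact Scala/Akka sketch of that retry-until-registered idea, kept separate
from the JobManager code; the message names are stand-ins for
TriggerRegistrationAtJobManager and RegisterResourceManager, and the 5-second
interval is arbitrary:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import scala.concurrent.duration._

// Stand-in messages for this sketch only.
case object TriggerRegistration
case object ResourceManagerRegistered

class ReRegistrationReminder(resourceManager: ActorRef) extends Actor {
  import context.dispatcher

  // keep re-sending the registration trigger to the disconnected RM ...
  private val reminder: Cancellable =
    context.system.scheduler.schedule(0.seconds, 5.seconds, resourceManager, TriggerRegistration)

  override def receive: Receive = {
    // ... until the RM actually registers again
    case ResourceManagerRegistered => reminder.cancel()
  }

  override def postStop(): Unit = reminder.cancel()
}

object ReminderExample extends App {
  val system = ActorSystem("reminder-example")
  val rmStub = system.actorOf(Props(new Actor {
    override def receive: Receive = { case m => println(s"RM received: $m") }
  }), "rm-stub")
  system.actorOf(Props(new ReRegistrationReminder(rmStub)), "jobmanager-reminder")
}
```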


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56860123
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
--- End diff --

You're right that the message can be lost. Apart from the leader election 
service, there is currently no mechanism to detect a lost connection from the 
RM to the JM.

The above code is part of a special case where the RM doesn't reply to a TM 
registration and the JM decides to disconnect the RM. I suppose we should keep 
retrying to send the TriggerRegistrationAtJobManager until we receive an RM 
registration.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204629#comment-15204629
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56859466
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Heartbeat message sent by a standalone TaskManager to the resource 
master.
+ */
+public class HeartbeatMessage implements RequiresLeaderSessionID, 
Serializable {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56859173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+
+/**
+ * Class for Resource Ids assigned at the FlinkResourceManager.
+ */
+public class ResourceID implements Serializable {
--- End diff --

In standalone mode, this is just a random String, but in YARN mode it is the 
container id assigned by the framework. Thus, not random.
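
Roughly, the two cases look like this (assuming a plain String-based
constructor; the concrete id values are only examples):

```scala
import java.util.UUID

import org.apache.flink.runtime.clusterframework.types.ResourceID

object ResourceIdExamples {
  // standalone mode: no external framework, so the id is simply generated
  val standaloneId = new ResourceID(UUID.randomUUID().toString)

  // YARN mode: reuse the container id handed out by the framework so the RM
  // can correlate TaskManagers with their containers (the value is an example)
  val yarnId = new ResourceID("container_1458550718473_0001_01_000002")
}
```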


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204625#comment-15204625
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56859173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+
+/**
+ * Class for Resource Ids assigned at the FlinkResourceManager.
+ */
+public class ResourceID implements Serializable {
--- End diff --

In standalone mode, this is just a random String, but in YARN mode it is the 
container id assigned by the framework. Thus, not random.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56859466
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Heartbeat message sent by a standalone TaskManager to the resource 
master.
+ */
+public class HeartbeatMessage implements RequiresLeaderSessionID, 
Serializable {
--- End diff --

You're looking at an old commit. I already removed this class but didn't 
want to force-push.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856717
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -29,22 +29,22 @@ object Messages {
   case object Acknowledge
 
   /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them 
clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param reason The reason for disconnecting, to be displayed in log 
and error messages.
-   */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The Acknowledge case object instance.
+*/
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 
   /**
-   * Accessor for the case object instance, to simplify Java 
interoperability.
-   *
-   * @return The Acknowledge case object instance.
-   */
-  def getAcknowledge(): Acknowledge.type = Acknowledge
+* Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
+*
+* The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
+* loss more quickly.
+*
+* The JobManager may send this message to its TaskManagers to let them 
clean up their
+* tasks that depend on the JobManager and go into a clean state.
+*
+* @param reason The reason for disconnecting, to be displayed in log 
and error messages.
+*/
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
--- End diff --

I don't see the reason for these changes here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204586#comment-15204586
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856463
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

What if there was never a RM started? Would that make a difference?


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204596#comment-15204596
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56857119
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -87,7 +91,7 @@ object RegistrationMessages {
 extends RegistrationMessage
 
   /**
-   * Denotes the unsuccessful registration of a task manager at the job 
manager. This is the
+   * Denotes the unsuccessful registration of a task manager at the 
JobManager. This is the
--- End diff --

This change and the change in line 74 don't seem to be consistent.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204594#comment-15204594
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856958
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

I don't understand the comment here. Admittedly, before it was not much 
better.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56857119
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -87,7 +91,7 @@ object RegistrationMessages {
 extends RegistrationMessage
 
   /**
-   * Denotes the unsuccessful registration of a task manager at the job 
manager. This is the
+   * Denotes the unsuccessful registration of a task manager at the 
JobManager. This is the
--- End diff --

This change and the change in line 74 don't seem to be consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856958
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
 ---
@@ -36,7 +38,7 @@ object RegistrationMessages {
   /**
* Triggers the TaskManager to attempt a registration at the JobManager.
*
-   * @param jobManagerURL Akka URL to the JobManager
+   * @param jobManagerURL Akka URL to the JobManager to ask for the 
JobManager
--- End diff --

I don't understand the comment here. Admittedly, before it was not much 
better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204591#comment-15204591
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856717
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -29,22 +29,22 @@ object Messages {
   case object Acknowledge
 
   /**
-   * Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
-   *
-   * The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
-   * loss more quickly.
-   *
-   * The JobManager may send this message to its TaskManagers to let them 
clean up their
-   * tasks that depend on the JobManager and go into a clean state.
-   *
-   * @param reason The reason for disconnecting, to be displayed in log 
and error messages.
-   */
-  case class Disconnect(reason: String) extends RequiresLeaderSessionID
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The Acknowledge case object instance.
+*/
+  def getAcknowledge(): Acknowledge.type = Acknowledge
 
   /**
-   * Accessor for the case object instance, to simplify Java 
interoperability.
-   *
-   * @return The Acknowledge case object instance.
-   */
-  def getAcknowledge(): Acknowledge.type = Acknowledge
+* Signals that the receiver (JobManager/TaskManager) shall disconnect 
the sender.
+*
+* The TaskManager may send this on shutdown to let the JobManager 
realize the TaskManager
+* loss more quickly.
+*
+* The JobManager may send this message to its TaskManagers to let them 
clean up their
+* tasks that depend on the JobManager and go into a clean state.
+*
+* @param reason The reason for disconnecting, to be displayed in log 
and error messages.
+*/
+  case class Disconnect(reason: String) extends RequiresLeaderSessionID
--- End diff --

I don't see the reason for these changes here.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56856463
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

What if there was never a RM started? Would that make a difference?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204584#comment-15204584
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855904
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

What if we never establish a connection to the RM again for some 
reason? Wouldn't that mean that we will never shut down the JM?


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855904
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
+
+  // trigger shutdown
+  shutdown()
+
+case None =>
+  // retry
+  context.system.scheduler.scheduleOnce(
+2 seconds, self, decorateMessage(msg)
+  )(context.dispatcher)
--- End diff --

What if we never establish a connection to the RM again for some 
reason? Wouldn't that mean that we will never shut down the JM?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204579#comment-15204579
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855460
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204578#comment-15204578
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855273
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
--- End diff --

What happens if this message never reaches the ResourceManager? Is there a 
means by which the RM can detect that it lost the connection to the JM?


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855460
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
+  // slow or unreachable resource manager, register anyway and 
let the rm reconnect
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  self ! decorateMessage(new DisconnectResourceManager(rm))
+  }(context.dispatcher)
+
+case None =>
+  log.info("Task Manager Registration but not connected to 
ResourceManager")
+  // ResourceManager not yet available
+  // sending task manager information later upon ResourceManager 
registration
+  self ! decorateMessage(new 
RegisterResourceSuccessful(taskManager, msg))
+  }
+
+case msg: RegisterResourceSuccessful =>
+
+  val originalMsg = msg.getRegistrationMessage
+  val taskManager = msg.getTaskManager
+
+  // ResourceManager knows about the resource, now let's try to 
register TaskManager
   if (instanceManager.isRegistered(taskManager)) {
 val instanceID = 
instanceManager.getRegisteredInstance(taskManager).getId
 
-// IMPORTANT: Send the response to the "sender", which is not the
-//TaskManager actor, but the ask future!
-sender() ! decorateMessage(
+taskManager ! decorateMessage(
   AlreadyRegistered(
 instanceID,
-libraryCacheManager.getBlobServerPort)
-)
-  }
-  else {
+libraryCacheManager.getBlobServerPort))
+  } else {
 try {
   val instanceID = instanceManager.registerTaskManager(
 taskManager,
-connectionInfo,
-hardwareInformation,
-numberOfSlots,
+originalMsg.resourceId,
+originalMsg.connectionInfo,
+originalMsg.resources,
+originalMsg.numberOfSlots,
 leaderSessionID.orNull)
 
-  // IMPORTANT: Send the response to the "sender", which is not the
-  //TaskManager actor, but the ask future!
-  sender() ! decorateMessage(
-AcknowledgeRegistration(
-  instanceID,
-  libraryCacheManager.getBlobServerPort)
-  )
+  taskManager ! decorateMessage(
+

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56855273
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,125 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
--- End diff --

What happens if this message never reaches the ResourceManager? Is there a 
means by which the RM can detect that it lost the connection to the JM?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204567#comment-15204567
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854489
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java
 ---
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution;
+package org.apache.flink.runtime.clusterframework.standalone;
 
-public interface ExecutionObserver {
+/**
+ * The startup mode for the standalone setup,
+ */
+public enum StandaloneMode {
--- End diff --

Class is not used


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854489
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneMode.java
 ---
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution;
+package org.apache.flink.runtime.clusterframework.standalone;
 
-public interface ExecutionObserver {
+/**
+ * The startup mode for the standalone setup,
+ */
+public enum StandaloneMode {
--- End diff --

Class is not used


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854368
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Heartbeat message sent by a standalone TaskManager to the resource master.
+ */
+public class HeartbeatMessage implements RequiresLeaderSessionID, 
Serializable {
--- End diff --

Class is not used


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3399) Count with timeout trigger

2016-03-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204560#comment-15204560
 ] 

Aljoscha Krettek commented on FLINK-3399:
-

Hi [~shikhar] I opened FLINK-3643 as an umbrella issue for keeping track of 
several things regarding triggers. Composite triggers are also a part of this. 
Please have a look if you want to get involved.

> Count with timeout trigger
> --
>
> Key: FLINK-3399
> URL: https://issues.apache.org/jira/browse/FLINK-3399
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shikhar Bhushan
>Priority: Minor
>
> I created an implementation of a trigger that I'd like to contribute, 
> https://gist.github.com/shikhar/2cb9f1b792be31b7c16e
> An example application: if a sink function operates more efficiently when it 
> writes in a batched fashion, the windowing mechanism plus this trigger can be 
> used. The count puts an upper bound on the batch size (and thus gives better 
> control over memory usage), and the timeout ensures timeliness of the outputs.
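
For illustration, a minimal sketch of how such a trigger would plug into the keyed windowing API; it uses only the stock CountTrigger and PurgingTrigger classes, so it shows the count half only (the timeout half is what the linked gist adds):

{code}
// Minimal sketch of the windowing side, using only the stock CountTrigger and
// PurgingTrigger classes; a count-with-timeout trigger would additionally fire
// when a processing-time timer expires.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object BatchingTriggerSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val records: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2), ("a", 3))

    records
      .keyBy(0)
      .window(GlobalWindows.create())
      // fire-and-purge every 100 elements, i.e. the upper bound on batch size
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](100)))
      .sum(1)
      .print()

    env.execute("count trigger sketch")
  }
}
{code}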



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3399) Count with timeout trigger

2016-03-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-3399.
-
Resolution: Fixed

This will be subsumed by the work in FLINK-3643

> Count with timeout trigger
> --
>
> Key: FLINK-3399
> URL: https://issues.apache.org/jira/browse/FLINK-3399
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shikhar Bhushan
>Priority: Minor
>
> I created an implementation of a trigger that I'd like to contribute, 
> https://gist.github.com/shikhar/2cb9f1b792be31b7c16e
> An example application: if a sink function operates more efficiently when it 
> writes in a batched fashion, the windowing mechanism plus this trigger can be 
> used. The count puts an upper bound on the batch size (and thus gives better 
> control over memory usage), and the timeout ensures timeliness of the outputs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204564#comment-15204564
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is the successful response to a heartbeat. 
+ */
+public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, 
java.io.Serializable {
--- End diff --

Class is not used


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204566#comment-15204566
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854368
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatMessage.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Heartbeat message sent by a standalone TaskManager to the resource master.
+ */
+public class HeartbeatMessage implements RequiresLeaderSessionID, 
Serializable {
--- End diff --

Class is not used


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56854344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/HeartbeatAcknowledgement.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+/**
+ * This message is the successful response to a heartbeat. 
+ */
+public class HeartbeatAcknowledgement implements RequiresLeaderSessionID, 
java.io.Serializable {
--- End diff --

Class is not used


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3643) Improve Window Triggers

2016-03-21 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-3643:
---

 Summary: Improve Window Triggers
 Key: FLINK-3643
 URL: https://issues.apache.org/jira/browse/FLINK-3643
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.0.0
Reporter: Aljoscha Krettek


I think there are several shortcomings in the current window trigger system and 
I started a document to keep track of them: 
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing

The document is a work in progress and I encourage everyone to read it and make 
suggestions.

We'll use this issue to keep track of any sub-issues that we open for the parts 
we want to improve.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204555#comment-15204555
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56853621
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
--- End diff --

Maybe rename `get` to `getInstance`.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3631] CodeGenerator does not check type...

2016-03-21 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/1823

[FLINK-3631] CodeGenerator does not check type compatibility for equa…

Adds a check in CodeGenerator.scala that validates that the left and right 
operands of a comparison are type-compatible, so that all comparison 
expressions (=, !=, >, <) now work correctly.
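
For illustration, a hedged sketch of the kind of guard described above; it is not the actual CodeGenerator.scala change, and the exception type is only chosen to keep the sketch self-contained:

{code}
// Hedged sketch of the kind of guard described above, not the actual change in
// CodeGenerator.scala: refuse to generate a comparison when the operand types
// differ instead of silently emitting an always-false equals() call. A real
// implementation might additionally accept compatible numeric types.
import org.apache.flink.api.common.typeinfo.TypeInformation

object OperandCheckSketch {
  def requireCompatibleOperands(
      operator: String,
      left: TypeInformation[_],
      right: TypeInformation[_]): Unit = {
    if (left != right) {
      // IllegalArgumentException keeps the sketch self-contained; the real code
      // would use the Table API's own code generation exception
      throw new IllegalArgumentException(
        s"Incompatible operand types for '$operator': $left and $right")
    }
  }
}
{code}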

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1823.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1823


commit d107f4825d40998b9e40a5270b8df48dd48b8eb2
Author: ramkrishna 
Date:   2016-03-21T16:33:42Z

[FLINK-3631] CodeGenerator does not check type compatibility for equality
expressions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56853621
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
 context.unwatch(taskManager)
   }
 
+case msg: StopCluster =>
+
+  log.info(s"Stopping JobManager with final application status 
${msg.finalStatus()} " +
+s"and diagnostics: ${msg.message()}")
+
+  val respondTo = sender()
+
+  // stop all task managers
+  instanceManager.getAllRegisteredInstances.asScala foreach {
+instance =>
+  instance.getActorGateway.tell(msg)
+  }
+
+  // send resource manager the ok
+  currentResourceManager match {
+case Some(rm) =>
+
+  // inform rm
+  rm ! decorateMessage(msg)
+
+  respondTo ! decorateMessage(StopClusterSuccessful.get())
--- End diff --

Maybe rename `get` to `getInstance`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3631) CodeGenerator does not check type compatibility for equality expressions

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204556#comment-15204556
 ] 

ASF GitHub Bot commented on FLINK-3631:
---

GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/1823

[FLINK-3631] CodeGenerator does not check type compatibility for equa…

Adds a check in CodeGenerator.scala that validates that the left and right 
operands of a comparison are type-compatible, so that all comparison 
expressions (=, !=, >, <) now work correctly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1823.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1823


commit d107f4825d40998b9e40a5270b8df48dd48b8eb2
Author: ramkrishna 
Date:   2016-03-21T16:33:42Z

[FLINK-3631] CodeGenerator does not check type compatibility for equality
expressions




> CodeGenerator does not check type compatibility for equality expressions
> 
>
> Key: FLINK-3631
> URL: https://issues.apache.org/jira/browse/FLINK-3631
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The following Table API query does not fail but produces an empty result:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
> // must fail. Field 'a is not a string.
> ds.filter( 'a === "nope" ).collect()
> {code}
> The generated flatMap code looks like this:
> {code}
> @Override
> public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws 
> Exception {
>   org.apache.flink.api.table.Row in1 = (org.apache.flink.api.table.Row) _in1;
>   
>   java.lang.String result$17 = (java.lang.String) in1.productElement(2);
>   int result$11 = (java.lang.Integer) in1.productElement(0);
>   long result$14 = (java.lang.Long) in1.productElement(1);
>   java.lang.String result$19 = "nope";
>   
>   boolean result$21 = result$19.equals(result$11);
>   
>   if (result$21) {
> out.setField(0, result$11);
>   out.setField(1, result$14);
>   out.setField(2, result$17);
> c.collect(out);
>   }
> }
> {code}
> I would expect the query to fail due to an Integer/String type conflict.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-03-21 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204553#comment-15204553
 ] 

Maximilian Michels commented on FLINK-2821:
---

It is now possible to separate the bind address from the public address: 
http://doc.akka.io/docs/akka/2.4.0/additional/faq.html#Why_are_replies_not_received_from_a_remote_actor_

I have a fix, but it requires bumping our Akka version from {{2.3.7}} to 
{{2.4.x}}. Now the big downer: Akka 2.4.x is NOT compatible with Scala 2.10, 
which is our default Scala version. Unless we upgrade to Scala 2.11, we won't 
be able to use this feature.

In the meantime, we could explicitly offer this feature only for the Scala 2.11 
release and document that accordingly.
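
For illustration, the relevant Akka >= 2.4 settings from the linked FAQ entry, as a hedged sketch; how Flink would eventually populate these values is an assumption here:

{code}
// Hedged illustration of the Akka >= 2.4 settings from the linked FAQ entry
// (bind address vs. advertised address); how Flink would eventually populate
// these values is an assumption of this sketch.
import com.typesafe.config.ConfigFactory

object BindAddressSketch {
  val remoteConfig = ConfigFactory.parseString(
    """
      |akka.remote.netty.tcp {
      |  hostname      = "jobmanager.public.example.org"  // address written into actor URLs
      |  port          = 6123
      |  bind-hostname = "0.0.0.0"                        // address the server socket binds to
      |  bind-port     = 6123
      |}
    """.stripMargin)
}
{code}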

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to be exactly matching.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway, because it's affecting our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204550#comment-15204550
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56853405
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
--- End diff --

Formatting regression


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56853405
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -886,10 +957,42 @@ class JobManager(
   if (instanceManager.isRegistered(taskManager)) {
 log.info(s"Task manager ${taskManager.path} wants to disconnect, 
because $msg.")
 
-instanceManager.unregisterTaskManager(taskManager, false)
+  instanceManager.unregisterTaskManager(taskManager, false)
--- End diff --

Formatting regression


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-03-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-2821:
-

Assignee: Maximilian Michels

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to be exactly matching.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway, because it's affecting our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1818#issuecomment-199363606
  
Due to the tight coupling of the ExecutionConfig and multiple Flink components 
(e.g. PojoSerializer), the automatic serialization and manual deserialization 
of user code objects via the UserCodeValue class caused problems. In order to 
minimize the impact of the changes, I changed the serialization strategy to an 
explicit one. One has to call `ExecutionConfig.serializeUserCode` to store the 
user code objects in a SerializedValue object and to null the corresponding 
member fields. If that is not done, it is assumed that the objects will be 
deserialized using a user code class loader. On the receiving side one has to 
call `ExecutionConfig.deserializeConfig`, providing a class loader which knows 
the user code classes.
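
For illustration, a hedged, self-contained sketch of the explicit serialize/deserialize pattern described above; the holder class is hypothetical and not the ExecutionConfig API itself, only InstantiationUtil is an existing Flink utility:

{code}
// Hedged, self-contained sketch of the explicit pattern described above; the
// holder class is hypothetical (it is not the ExecutionConfig API), only
// InstantiationUtil is an existing Flink utility.
import org.apache.flink.util.InstantiationUtil

final class UserCodeHolderSketch(initial: AnyRef) extends Serializable {

  @transient private var userObject: AnyRef = initial
  private var serialized: Array[Byte] = _

  // called explicitly on the client before the object is shipped through Akka
  def serializeUserCode(): Unit = {
    serialized = InstantiationUtil.serializeObject(userObject)
    userObject = null
  }

  // called on the receiving side with a class loader that knows the user classes
  def deserializeUserCode(userCodeClassLoader: ClassLoader): AnyRef = {
    if (userObject == null && serialized != null) {
      userObject = InstantiationUtil.deserializeObject[AnyRef](serialized, userCodeClassLoader)
    }
    userObject
  }
}
{code}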


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3633) Job submission silently fails when using user code types

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204529#comment-15204529
 ] 

ASF GitHub Bot commented on FLINK-3633:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1818#issuecomment-199363606
  
Due to the tight coupling between the ExecutionConfig and multiple Flink 
components (e.g. PojoSerializer), the automatic serialization and manual 
deserialization of user code objects via the UserCodeValue class caused 
problems. In order to minimize the impact of the changes, I changed the 
serialization strategy to an explicit one. One has to call 
`ExecutionConfig.serializeUserCode` to store the user code objects in a 
SerializedValue object and to null the corresponding member fields. If that 
is not done, then it is assumed that the object is deserialized using a 
user code class loader. On the receiving side one has to call 
`ExecutionConfig.deserializeConfig`, providing a class loader which knows 
the user code classes.


> Job submission silently fails when using user code types
> 
>
> Key: FLINK-3633
> URL: https://issues.apache.org/jira/browse/FLINK-3633
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.1.0
>
>
> With the changes introduced by FLINK-3327, it is no longer possible to run 
> remote Flink jobs which work on user code types. The reason is that now the 
> {{ExecutionConfig}} is directly stored in the {{JobGraph}} which is sent as 
> an Akka message to the {{JobManager}}. By default, user code types are 
> automatically detected and registered in the {{ExecutionConfig}}. When 
> deserializing a {{JobGraph}} whose {{ExecutionConfig}} contains user code 
> classes, the user code class loader is consequently required. However, Akka 
> does not have access to it and uses the system class loader. This causes 
> Akka to silently discard the {{SubmitJob}} message, which cannot be 
> deserialized because of a {{ClassNotFoundException}}.
> I propose to not send the {{ExecutionConfig}} explicitly with the 
> {{JobGraph}} and, thus, to partially revert the changes to before FLINK-3327. 
> Before, the {{ExecutionConfig}} was serialized into the job configuration and 
> deserialized on the {{TaskManager}} using the proper user code class loader.
> In order to reproduce the problem you can submit the following job to a 
> remote cluster.
> {code}
> public class Job {
> 
>   public static class CustomType {
>     private final int value;
> 
>     public CustomType(int value) {
>       this.value = value;
>     }
> 
>     @Override
>     public String toString() {
>       return "CustomType(" + value + ")";
>     }
>   }
> 
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env =
>       ExecutionEnvironment.createRemoteEnvironment(Address, Port, PathToJar);
>     env.getConfig().disableAutoTypeRegistration();
> 
>     DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);
> 
>     DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
>       @Override
>       public CustomType map(Integer integer) throws Exception {
>         return new CustomType(integer);
>       }
>     });
> 
>     customTypes.print();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204506#comment-15204506
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56847510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
 ---
@@ -21,7 +21,7 @@
 /**
  * Interface for message decorators
  */
-public interface MessageDecorator {
+public interface MessageDecorator extends java.io.Serializable {
--- End diff --

Good catch :-)


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56847510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java
 ---
@@ -21,7 +21,7 @@
 /**
  * Interface for message decorators
  */
-public interface MessageDecorator {
+public interface MessageDecorator extends java.io.Serializable {
--- End diff --

Good catch :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204491#comment-15204491
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56845833
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+
+/**
+ * Class for Resource Ids assigned at the FlinkResourceManager.
+ */
+public class ResourceID implements Serializable {
--- End diff --

Why don't we use `AbstractID` here?
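
For illustration, the suggestion would roughly amount to the following 
(a sketch only, not the code of this PR), reusing `AbstractID` for random ID 
generation, equality and hashing:

```java
// Hypothetical sketch of the suggested alternative: derive ResourceID from
// AbstractID instead of re-implementing a Serializable ID type.
import org.apache.flink.util.AbstractID;

public class ResourceID extends AbstractID {

    private static final long serialVersionUID = 1L;

    /** Generates a new random resource ID. */
    public ResourceID() {
        super();
    }

    /** Re-creates a resource ID from its two long components. */
    public ResourceID(long lowerPart, long upperPart) {
        super(lowerPart, upperPart);
    }
}
```

That would also match the pattern used by the other runtime ID types 
(e.g. `JobID`), which extend `AbstractID`.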


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56845833
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+import java.io.Serializable;
+
+/**
+ * Class for Resource Ids assigned at the FlinkResourceManager.
+ */
+public class ResourceID implements Serializable {
--- End diff --

Why don't we use `AbstractID` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204476#comment-15204476
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r56844658
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/standalone/StandaloneResourceManager.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.standalone;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * A standalone implementation of the resource manager. Used when the 
system is started in
+ * standalone mode (via scripts), rather than via a resource framework 
like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends 
FlinkResourceManager<ResourceID> {
+   
+
+   public StandaloneResourceManager(Configuration flinkConfig, 
LeaderRetrievalService leaderRetriever) {
+   super(0, flinkConfig, leaderRetriever);
+   }
+
+   // ------------------------------------------------------------------------
+   //  Framework specific behavior
+   // ------------------------------------------------------------------------
+
+
+   @Override
+   protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
+   super.newJobManagerLeaderAvailable(leaderAddress, 
leaderSessionID);
+   }
+
+   @Override
+   protected void initialize() throws Exception {
+   // nothing to initialize
+   }
+
+   @Override
+   protected void leaderUpdated() {
+   // nothing to update
+   }
+
+   @Override
+   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+   }
+
+   @Override
+   protected void fatalError(String message, Throwable error) {
+   log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+   LOG.error("Shutting down process");
--- End diff --

Why do we use two different logger variables here?
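
For what it's worth, consolidating onto a single logger would look roughly 
like this (a sketch under the assumption that the class uses SLF4J, as the 
rest of the runtime does; the class name is just for the example):

```java
// Sketch only: one SLF4J logger per class instead of mixing `log` and `LOG`.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceManagerLoggingSketch {

    private static final Logger LOG =
        LoggerFactory.getLogger(ResourceManagerLoggingSketch.class);

    protected void fatalError(String message, Throwable error) {
        LOG.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
        LOG.error("Shutting down process");
        // ... trigger the actual shutdown here
    }
}
```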


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

