[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-09-07 Thread chobeat
Github user chobeat commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r7389
  
--- Diff: docs/apis/cli.md ---
@@ -187,6 +187,8 @@ Action "run" compiles and runs a program.
   java.net.URLClassLoader}.
  -d,--detachedIf present, runs the job in 
detached
   mode
+--configDir The configuration directory with 
which
--- End diff --

This is a space, I checked. Do you want it to be aligned to the beginning 
of the other lines, or is it okay as it is?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469960#comment-15469960
 ] 

ASF GitHub Bot commented on FLINK-3042:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r7227
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -122,14 +123,25 @@
public abstract Class getTypeClass();
 
/**
-* Returns the generic parameters of this type.
+* Optional method for giving Flink's type extraction system 
information about the mapping
+* of a generic type parameter to the type information of a subtype. 
This information is necessary
+* in cases where type information should be deduced from an input type.
 *
-* @return The list of generic parameters. This list can be empty.
+* For instance, a method for a {@link Tuple2} would look like this:
+* 
+* Map m = new HashMap();
+* m.put("T0", this.getTypeAt(0));
+* m.put("T1", this.getTypeAt(1));
+* return m;
+* 
+*
+* @return map of inferred subtypes; it must not contain all generic 
parameters as key;
--- End diff --

You are right, "it doesn't have to contain..." would be better. What I 
wanted to say is: It is also ok to just supply partial type information for 
generic parameters (so returning an empty map is always acceptable).


> Define a way to let types create their own TypeInformation
> --
>
> Key: FLINK-3042
> URL: https://issues.apache.org/jira/browse/FLINK-3042
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Timo Walther
> Fix For: 1.0.0
>
>
> Currently, introducing new Types that should have specific TypeInformation 
> requires
>   - Either integration with the TypeExtractor
>   - Or manually constructing the TypeInformation (potentially at every place) 
> and using type hints everywhere.
> I propose to add a way to allow classes to create their own TypeInformation 
> (like a static method "createTypeInfo()").
> To support generic nested types (like Optional / Either), the type extractor 
> would provide a Map of what generic variables map to what types (deduced from 
> the input). The class can use that to create the correct nested 
> TypeInformation (possibly by calling the TypeExtractor again, passing the Map 
> of generic bindings).
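The proposed mechanism can be sketched without any Flink dependencies. Everything below (`TypeInfo`, `MyOption`, the binding map keyed by type-variable name) is a hypothetical stand-in for illustration, not Flink's actual API; it only shows the shape of a static `createTypeInfo()` that consumes a map of deduced generic bindings, which (per the review thread) may be partial.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed "createTypeInfo" hook; all names here
// (TypeInfo, MyOption, ...) are illustrative stand-ins, not Flink's real API.
public class CreateTypeInfoSketch {
    public static void main(String[] args) {
        Map<String, TypeInfo> bindings = new HashMap<>();
        bindings.put("T", new IntTypeInfo()); // deduced from the input by the extractor
        System.out.println(MyOption.createTypeInfo(bindings)); // Option<Integer>
    }
}

interface TypeInfo {}

class IntTypeInfo implements TypeInfo {
    @Override public String toString() { return "Integer"; }
}

class OptionTypeInfo implements TypeInfo {
    private final TypeInfo elementType;
    OptionTypeInfo(TypeInfo elementType) { this.elementType = elementType; }
    @Override public String toString() { return "Option<" + elementType + ">"; }
}

// A generic type that creates its own type information, as the issue proposes:
// the extractor passes a map from generic variable names to deduced types.
class MyOption<T> {
    static TypeInfo createTypeInfo(Map<String, TypeInfo> genericBindings) {
        TypeInfo elem = genericBindings.get("T");
        if (elem == null) {
            // The binding map may be partial; a real implementation could
            // fall back to a generic serializer here instead of failing.
            throw new IllegalStateException("type variable T was not deduced");
        }
        return new OptionTypeInfo(elem);
    }
}
```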



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469963#comment-15469963
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user chobeat commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r7389
  
--- Diff: docs/apis/cli.md ---
@@ -187,6 +187,8 @@ Action "run" compiles and runs a program.
   java.net.URLClassLoader}.
  -d,--detachedIf present, runs the job in 
detached
   mode
+--configDir The configuration directory with 
which
--- End diff --

This is a space, I checked. Do you want it to be aligned to the beginning 
of the other lines, or is it okay as it is?


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> experience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.





[jira] [Commented] (FLINK-4547) when call connect method in AkkaRpcService using same address and same rpc gateway class, the returned gateways are equal with respect to equals and hashCode

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470051#comment-15470051
 ] 

ASF GitHub Bot commented on FLINK-4547:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77782622
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -189,7 +191,49 @@ public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
}
 
-   // 

+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null) {
+   return false;
+   }
+
+   if(Proxy.isProxyClass(o.getClass())) {
+   return o.equals(this);
+   }
--- End diff --

Alright, I understand that now :-).


> when call connect method in AkkaRpcService using same address and same rpc 
> gateway class, the returned gateways are equal with respect to equals and 
> hashCode
> -
>
> Key: FLINK-4547
> URL: https://issues.apache.org/jira/browse/FLINK-4547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: zhangjing
>Assignee: zhangjing
>
> Currently, every call to the connect method of the AkkaRpcService class with 
> the same address and the same RPC gateway class returns a completely new 
> gateway object whose equals and hashCode results differ from the others. 
> It would be reasonable for gateways obtained with the same address and the 
> same gateway class to be equal (equals returns true and the hashCode values 
> match).
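The requested behavior can be sketched with plain JDK dynamic proxies: `java.lang.reflect.Proxy` routes `equals` and `hashCode` calls through the invocation handler, so equality can be based on the remote address. The interface and handler names below are illustrative, not Flink's actual `AkkaInvocationHandler`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Sketch of address-based proxy equality, using only the JDK.
public class ProxyEqualitySketch {
    static RpcGateway connect(String address) {
        return (RpcGateway) Proxy.newProxyInstance(
                RpcGateway.class.getClassLoader(),
                new Class<?>[]{RpcGateway.class},
                new AddressBasedHandler(address));
    }

    public static void main(String[] args) {
        RpcGateway g1 = connect("akka://flink/user/taskmanager_0");
        RpcGateway g2 = connect("akka://flink/user/taskmanager_0");
        System.out.println(g1.equals(g2));                  // true
        System.out.println(g1.hashCode() == g2.hashCode()); // true
    }
}

interface RpcGateway {
    String getAddress();
}

// Handler that bases equals/hashCode on the remote address, so two proxies
// obtained for the same address compare equal.
class AddressBasedHandler implements InvocationHandler {
    private final String address;

    AddressBasedHandler(String address) { this.address = address; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "getAddress":
                return address;
            case "hashCode":
                return address.hashCode();
            case "equals":
                Object other = args[0];
                return other instanceof RpcGateway
                        && address.equals(((RpcGateway) other).getAddress());
            default:
                throw new UnsupportedOperationException(method.getName());
        }
    }
}
```

Note that `g1.equals(g2)` works even though both objects are proxies, because the nested `getAddress()` call on the other proxy is itself dispatched through its handler.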





[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...

2016-09-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77782622
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -189,7 +191,49 @@ public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
}
 
-   // 

+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null) {
+   return false;
+   }
+
+   if(Proxy.isProxyClass(o.getClass())) {
+   return o.equals(this);
+   }
--- End diff --

Alright, I understand that now :-).




[GitHub] flink issue #2452: [Flink-4450] add a new module "flink-apache-storm" to sup...

2016-09-07 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2452
  
@liuyuzhong Let's wait a couple more days for the community to respond. You 
don't have to close the PR.




[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470412#comment-15470412
 ] 

Simone Robutti commented on FLINK-4565:
---

Should the resulting API look like `ds1.join(ds2).where('a in 'b)` or 
`ds1.in(ds2).something('a,'b)`?

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Assigned] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true

2016-09-07 Thread Kirill Morozov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirill Morozov reassigned FLINK-4506:
-

Assignee: Kirill Morozov

> CsvOutputFormat defaults allowNullValues to false, even though doc and 
> declaration says true
> 
>
> Key: FLINK-4506
> URL: https://issues.apache.org/jira/browse/FLINK-4506
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Documentation
>Reporter: Michael Wong
>Assignee: Kirill Morozov
>Priority: Minor
>
> In the constructor, it has this
> {code}
> this.allowNullValues = false;
> {code}
> But in the setAllowNullValues() method, the doc says the allowNullValues is 
> true by default. Also, in the declaration of allowNullValues, the value is 
> set to true. It probably makes the most sense to change the constructor.
> {code}
>   /**
>* Configures the format to either allow null values (writing an empty 
> field),
>* or to throw an exception when encountering a null field.
>* 
>* by default, null values are allowed.
>*
>* @param allowNulls Flag to indicate whether the output format should 
> accept null values.
>*/
>   public void setAllowNullValues(boolean allowNulls) {
>   this.allowNullValues = allowNulls;
>   }
> {code}
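The mismatch and the proposed fix can be shown in a minimal sketch (names are abbreviated from the real `CsvOutputFormat`, and the surrounding Flink class is not reproduced): since the field declaration and the setter's javadoc both say the default is `true`, changing the constructor means simply dropping its assignment.

```java
// Minimal sketch of the reported mismatch; illustrative names only.
public class AllowNullValuesDemo {
    public static void main(String[] args) {
        // After the fix, the declared default is what a fresh instance reports:
        System.out.println(new CsvOutputFormatSketch().isAllowNullValues()); // true
    }
}

class CsvOutputFormatSketch {
    // The field declaration (and the setter's javadoc) say the default is true...
    private boolean allowNullValues = true;

    CsvOutputFormatSketch() {
        // ...but the buggy constructor overwrote it:
        //     this.allowNullValues = false;
        // The proposed fix drops that assignment so declaration, javadoc,
        // and runtime behavior agree.
    }

    /** Configures whether null fields are written as empty fields. */
    public void setAllowNullValues(boolean allowNulls) {
        this.allowNullValues = allowNulls;
    }

    public boolean isAllowNullValues() {
        return allowNullValues;
    }
}
```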





[GitHub] flink pull request #2477: [FLINK-4506] CsvOutputFormat defaults allowNullVal...

2016-09-07 Thread kirill-morozov-epam
GitHub user kirill-morozov-epam opened a pull request:

https://github.com/apache/flink/pull/2477

[FLINK-4506] CsvOutputFormat defaults allowNullValues to false, even …



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kirill-morozov-epam/flink FLINK-4506

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2477








[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470437#comment-15470437
 ] 

Fabian Hueske commented on FLINK-4565:
--

I was thinking rather of something like this: {{ds1.where('a in d2)}}. This 
would be the first time that a {{Table}} is included in an expression, so I am 
not sure how easy it is to put together and whether there are side effects to 
consider.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470472#comment-15470472
 ] 

Simone Robutti commented on FLINK-4565:
---

That's exactly why I'm asking: I had no idea what to use as the right operand. 

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470536#comment-15470536
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77814628
  
--- Diff: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A MeterView provides a rate of events per second over a given time 
period. The events are counted by a {@link Counter}.
+ * A history of measurements is maintained from which the rate of events 
is calculated on demand.
+ */
+public class MeterView implements Meter, View {
--- End diff --

I think it would be good to add some more javadocs to this class, at least 
for the `timeSpanInSeconds` argument of the ctor.
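As a rough illustration of the class under review (a sketch, not Flink's actual `MeterView`; the 5-second update interval is an assumption), a meter view can snapshot the counter at a fixed interval into a ring buffer and derive the per-second rate from the newest and oldest snapshots; `timeSpanInSeconds` controls how far apart those snapshots are:

```java
// Illustrative MeterView-style rate metric. A background updater is assumed
// to call update() every UPDATE_INTERVAL_SECONDS.
public class MeterDemo {
    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        MeterViewSketch meter = new MeterViewSketch(counter, 60);
        // Simulate 50 events per 5-second update over a full 60s window:
        for (int i = 0; i < 13; i++) {
            for (int j = 0; j < 50; j++) { counter.inc(); }
            meter.update();
        }
        System.out.println(meter.getRate()); // 10.0 events per second
    }
}

class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

class MeterViewSketch {
    private static final int UPDATE_INTERVAL_SECONDS = 5; // assumed sampling period

    private final SimpleCounter counter;
    private final long[] values; // ring buffer of counter snapshots
    private final int timeSpanInSeconds;
    private int idx;
    private double currentRate;

    /**
     * @param counter           the counter whose events are rated
     * @param timeSpanInSeconds window over which the per-second rate is
     *                          computed; this is the ctor argument the
     *                          review asks to have documented
     */
    MeterViewSketch(SimpleCounter counter, int timeSpanInSeconds) {
        this.counter = counter;
        this.timeSpanInSeconds = timeSpanInSeconds;
        this.values = new long[timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    /** Snapshot the counter; normally driven by a shared timer thread. */
    void update() {
        idx = (idx + 1) % values.length;
        values[idx] = counter.getCount();
        long oldest = values[(idx + 1) % values.length]; // span seconds ago
        currentRate = ((double) (values[idx] - oldest)) / timeSpanInSeconds;
    }

    double getRate() { return currentRate; }
}
```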


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView

2016-09-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77821528
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

I think the metric needs to be documented in the list of "System metrics".




[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-07 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470419#comment-15470419
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

I am able to handle the cases for iterative jobs, but the LargeRecordHandlers 
are much trickier. I am checking that part and will get back on it.

Currently, the design is that the MemoryAllocator is passed on to the Sorters, 
and the allocator holds pre-created memory segments. If the memory allocator is 
created by iterative tasks, we ensure that such segments are not released 
directly to the memory manager but are retained until the iterative tasks 
receive the termination signal.
For normal batch tasks, the memory allocators are not kept for further 
iterations, so we close them out.
The sorters create read buffers, write buffers, and large buffers, all of which 
are sized statically. Inside the large record handler, however, the number of 
records needed for keys and records is decided dynamically.

Will get back on this. Any suggestions/feedback here?


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
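The pooling behavior the issue contrasts with GC-managed release can be sketched as follows (illustrative only; Flink's MemoryManager hands out MemorySegments, not raw byte arrays). With pooling, released segments return to a free list and are reused, so iterative supersteps do not churn the garbage collector:

```java
import java.util.ArrayDeque;

// Sketch of segment pooling. Without it, every superstep of an iterative job
// allocates fresh arrays and drops the old ones on the GC, which is the
// pressure this issue describes.
public class PoolDemo {
    public static void main(String[] args) {
        SegmentPool pool = new SegmentPool(32 * 1024);
        byte[] first = pool.allocate();
        pool.release(first);              // superstep ends, segment returned
        byte[] second = pool.allocate();  // next superstep reuses it
        System.out.println(first == second); // true: recycled, not garbage
    }
}

class SegmentPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private final int segmentSize;

    SegmentPool(int segmentSize) { this.segmentSize = segmentSize; }

    byte[] allocate() {
        byte[] segment = free.poll();
        return segment != null ? segment : new byte[segmentSize];
    }

    void release(byte[] segment) {
        free.push(segment);
    }
}
```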





[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView

2016-09-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77814628
  
--- Diff: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A MeterView provides a rate of events per second over a given time 
period. The events are counted by a {@link Counter}.
+ * A history of measurements is maintained from which the rate of events 
is calculated on demand.
+ */
+public class MeterView implements Meter, View {
--- End diff --

I think it would be good to add some more javadocs to this class, at least 
for the `timeSpanInSeconds` argument of the ctor.




[GitHub] flink issue #2443: [FLINK-3950] Implement MeterView

2016-09-07 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2443
  
Overall a good change.
Once my two comments are addressed, +1 to merge.




[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470566#comment-15470566
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2443
  
Overall a good change.
Once my two comments are addressed, +1 to merge.


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[jira] [Commented] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470488#comment-15470488
 ] 

ASF GitHub Bot commented on FLINK-4506:
---

GitHub user kirill-morozov-epam opened a pull request:

https://github.com/apache/flink/pull/2477

[FLINK-4506] CsvOutputFormat defaults allowNullValues to false, even …



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kirill-morozov-epam/flink FLINK-4506

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2477






> CsvOutputFormat defaults allowNullValues to false, even though doc and 
> declaration says true
> 
>
> Key: FLINK-4506
> URL: https://issues.apache.org/jira/browse/FLINK-4506
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Documentation
>Reporter: Michael Wong
>Assignee: Kirill Morozov
>Priority: Minor
>
> In the constructor, it has this
> {code}
> this.allowNullValues = false;
> {code}
> But in the setAllowNullValues() method, the doc says the allowNullValues is 
> true by default. Also, in the declaration of allowNullValues, the value is 
> set to true. It probably makes the most sense to change the constructor.
> {code}
>   /**
>* Configures the format to either allow null values (writing an empty 
> field),
>* or to throw an exception when encountering a null field.
>* 
>* by default, null values are allowed.
>*
>* @param allowNulls Flag to indicate whether the output format should 
> accept null values.
>*/
>   public void setAllowNullValues(boolean allowNulls) {
>   this.allowNullValues = allowNulls;
>   }
> {code}





[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView

2016-09-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77815865
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

I think the metric name should include the unit, in this case perMinute.




[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470554#comment-15470554
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77815865
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

I think the metric name should include the unit, in this case perMinute.


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[jira] [Commented] (FLINK-4589) Fix Merging of Covering Window in MergingWindowSet

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470397#comment-15470397
 ] 

ASF GitHub Bot commented on FLINK-4589:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2476

[FLINK-4589] Fix Merging of Covering Window in MergingWindowSet

This also adds two new test cases for that problem.

R: @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink fix-merging-set

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2476


commit 2eec6ba7ab6acf55ead2ea395a33043c87d1c911
Author: Aljoscha Krettek 
Date:   2016-09-07T11:51:53Z

[FLINK-4589] Fix Merging of Covering Window in MergingWindowSet

This also adds two new test cases for that problem.




> Fix Merging of Covering Window in MergingWindowSet
> --
>
> Key: FLINK-4589
> URL: https://issues.apache.org/jira/browse/FLINK-4589
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.0.4, 1.2.0, 1.1.3
>
>
> Right now, when a new window gets merged that covers all of the existing 
> windows, {{MergingWindowSet}} does not correctly set the state window.





[GitHub] flink pull request #2476: [FLINK-4589] Fix Merging of Covering Window in Mer...

2016-09-07 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2476

[FLINK-4589] Fix Merging of Covering Window in MergingWindowSet

This also adds two new test cases for that problem.

R: @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink fix-merging-set

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2476


commit 2eec6ba7ab6acf55ead2ea395a33043c87d1c911
Author: Aljoscha Krettek 
Date:   2016-09-07T11:51:53Z

[FLINK-4589] Fix Merging of Covering Window in MergingWindowSet

This also adds two new test cases for that problem.






[jira] [Created] (FLINK-4591) Select star does not work with grouping

2016-09-07 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4591:
---

 Summary: Select star does not work with grouping
 Key: FLINK-4591
 URL: https://issues.apache.org/jira/browse/FLINK-4591
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


It would be consistent if this also worked:

{{table.groupBy('*).select('*)}}

Currently, the star only works in a plain select without grouping.





[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470653#comment-15470653
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77821528
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, 
StreamConfig config, Output")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

I think the metric needs to be documented in the list of "System metrics".
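The meter in the diff derives a per-second rate from a monotonically increasing counter by sampling it once per second into a ring buffer. A minimal, self-contained sketch of that idea (illustrative only, not Flink's actual MeterView implementation):

```java
// Sketch of a meter that derives a per-second rate from a monotonically
// increasing counter, sampled once per second into a ring buffer.
// Mimics the idea behind a MeterView; not Flink code.
class SimpleMeterView {
    private final long[] samples;   // ring buffer of counter snapshots
    private final int spanSeconds;  // time span the rate is averaged over
    private int idx;

    SimpleMeterView(int spanSeconds) {
        this.spanSeconds = spanSeconds;
        this.samples = new long[spanSeconds + 1];
    }

    // called once per second (e.g. by an updater thread) with the current count
    void update(long currentCount) {
        idx = (idx + 1) % samples.length;
        samples[idx] = currentCount;
    }

    // events per second over the sampled span
    double getRate() {
        long oldest = samples[(idx + 1) % samples.length];
        return (samples[idx] - oldest) / (double) spanSeconds;
    }
}

public class MeterDemo {
    public static void main(String[] args) {
        SimpleMeterView meter = new SimpleMeterView(60);
        long count = 0;
        for (int second = 0; second < 61; second++) {
            count += 10;          // simulate 10 events per second
            meter.update(count);
        }
        System.out.println(meter.getRate()); // prints 10.0
    }
}
```

The ring buffer keeps one extra slot so that the oldest sample is exactly `spanSeconds` updates behind the newest one.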


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77787126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

The class has some docs but, as you can see from my initial question, its 
purpose was not clear to me.

Yes, I actually thought about marking `leaderSessionID` `volatile`. 

Given the interface of this class every component which has a reference to 
this registry is allowed to change the leader session ID. This can be 
problematic because components other than the `ResourceManager` should only be 
allowed to retrieve the leader session ID.

I'm actually wondering whether it is not necessary to notify the components 
about a new leader session ID. For example, the `SlotManager` should probably 
free its registered slots when it loses the leadership. Wouldn't these calls be 
suitable to transmit the current leader session ID? 
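One way to address the read-only concern raised here is to split the registry into a read-only retriever interface and a writable implementation that only the owning component holds. A hypothetical sketch (the names are illustrative, not actual Flink classes), using an `AtomicReference` for the visibility guarantees `volatile` would give:

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Read-only view handed out to components that may only observe the leader ID.
interface LeaderIdRetriever {
    UUID getLeaderSessionId();
}

// Writable registry held only by the owner (e.g. the ResourceManager).
class WritableLeaderIdRegistry implements LeaderIdRetriever {
    // AtomicReference gives the same visibility guarantees as a volatile field
    private final AtomicReference<UUID> leaderSessionId = new AtomicReference<>();

    public void setLeaderSessionId(UUID newId) {
        leaderSessionId.set(newId);
    }

    @Override
    public UUID getLeaderSessionId() {
        return leaderSessionId.get();
    }
}

public class RegistryDemo {
    public static void main(String[] args) {
        WritableLeaderIdRegistry registry = new WritableLeaderIdRegistry();
        LeaderIdRetriever readOnlyView = registry; // hand this to other components
        UUID id = UUID.randomUUID();
        registry.setLeaderSessionId(id);
        System.out.println(id.equals(readOnlyView.getLeaderSessionId())); // prints true
    }
}
```

Components that receive only the `LeaderIdRetriever` view cannot change the leader session ID, which is the restriction discussed above.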




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470114#comment-15470114
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77787126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

The class has some docs but, as you can see from my initial question, its 
purpose was not clear to me.

Yes, I actually thought about marking `leaderSessionID` `volatile`. 

Given the interface of this class every component which has a reference to 
this registry is allowed to change the leader session ID. This can be 
problematic because components other than the `ResourceManager` should only be 
allowed to retrieve the leader session ID.

I'm actually wondering whether it is not necessary to notify the components 
about a new leader session ID. For example, the `SlotManager` should probably 
free its registered slots when it loses the leadership. Wouldn't these calls be 
suitable to transmit the current leader session ID? 


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[GitHub] flink issue #2452: [Flink-4450] add a new module "flink-apache-storm" to sup...

2016-09-07 Thread liuyuzhong
Github user liuyuzhong commented on the issue:

https://github.com/apache/flink/pull/2452
  
@mxm OK.




[jira] [Created] (FLINK-4588) Fix Merging of Covering Window in MergingWindowSet

2016-09-07 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4588:
---

 Summary: Fix Merging of Covering Window in MergingWindowSet
 Key: FLINK-4588
 URL: https://issues.apache.org/jira/browse/FLINK-4588
 Project: Flink
  Issue Type: Bug
  Components: Windowing Operators
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.0.4, 1.2.0, 1.1.3


Right now, when a new window gets merged that covers all of the existing windows, 
{{MergingWindowSet}} does not correctly set the state window.
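To make the scenario concrete: adding a window that covers all existing windows must collapse them into a single merged window. A plain-Java sketch of interval merging (unrelated to Flink's actual MergingWindowSet code; a single pass suffices here because the covering window intersects every existing window directly):

```java
import java.util.ArrayList;
import java.util.List;

// Half-open interval [start, end), standing in for a time window.
class Interval {
    final long start, end;
    Interval(long start, long end) { this.start = start; this.end = end; }
    boolean intersects(Interval o) { return start < o.end && o.start < end; }
    Interval cover(Interval o) {
        return new Interval(Math.min(start, o.start), Math.max(end, o.end));
    }
    @Override public String toString() { return "[" + start + "," + end + ")"; }
}

public class MergeDemo {
    // merge newWindow into the existing windows, returning the resulting set
    static List<Interval> addAndMerge(List<Interval> existing, Interval newWindow) {
        Interval merged = newWindow;
        List<Interval> result = new ArrayList<>();
        for (Interval w : existing) {
            if (merged.intersects(w)) {
                merged = merged.cover(w); // the covering window absorbs w
            } else {
                result.add(w);
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        List<Interval> windows = new ArrayList<>();
        windows.add(new Interval(1, 3));
        windows.add(new Interval(5, 7));
        // [0, 10) covers both existing windows, so all three must merge into one
        System.out.println(addAndMerge(windows, new Interval(0, 10))); // prints [[0,10)]
    }
}
```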





[jira] [Updated] (FLINK-4579) Add StateBackendFactory for RocksDB Backend

2016-09-07 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-4579:
-
Issue Type: Improvement  (was: Bug)

> Add StateBackendFactory for RocksDB Backend
> ---
>
> Key: FLINK-4579
> URL: https://issues.apache.org/jira/browse/FLINK-4579
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>
> Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}}, 
> which means that users cannot select the RocksDB backend through the Flink 
> configuration.
> If we add a factory for RocksDB, we should also think about adding the RocksDB 
> backend to the standard distribution lib; otherwise it is only usable if 
> users manually place the RocksDB jars in the Flink lib folder.
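What the issue asks for amounts to configuration-driven factory selection. A hypothetical sketch of the mechanism (class names and the "state.backend" key mirror the discussion, but this is illustrative, not Flink's actual API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins for the real backend classes.
interface StateBackend {}
interface StateBackendFactory { StateBackend createFromConfig(Map<String, String> config); }

class FsStateBackend implements StateBackend {}
class RocksDBStateBackend implements StateBackend {}

public class BackendSelection {
    private static final Map<String, StateBackendFactory> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put("filesystem", config -> new FsStateBackend());
        // the missing piece the issue asks for: a factory entry for RocksDB
        FACTORIES.put("rocksdb", config -> new RocksDBStateBackend());
    }

    // pick the backend named in the configuration, defaulting to the filesystem one
    static StateBackend fromConfig(Map<String, String> config) {
        String name = config.getOrDefault("state.backend", "filesystem");
        StateBackendFactory factory = FACTORIES.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown state backend: " + name);
        }
        return factory.createFromConfig(config);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("state.backend", "rocksdb");
        System.out.println(fromConfig(config).getClass().getSimpleName()); // prints RocksDBStateBackend
    }
}
```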





[jira] [Created] (FLINK-4589) Fix Merging of Covering Window in MergingWindowSet

2016-09-07 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4589:
---

 Summary: Fix Merging of Covering Window in MergingWindowSet
 Key: FLINK-4589
 URL: https://issues.apache.org/jira/browse/FLINK-4589
 Project: Flink
  Issue Type: Bug
  Components: Windowing Operators
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.0.4, 1.2.0, 1.1.3


Right now, when a new window gets merged that covers all of the existing windows, 
{{MergingWindowSet}} does not correctly set the state window.





[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...

2016-09-07 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r7227
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 ---
@@ -122,14 +123,25 @@
public abstract Class<T> getTypeClass();
 
/**
-* Returns the generic parameters of this type.
+* Optional method for giving Flink's type extraction system 
information about the mapping
+* of a generic type parameter to the type information of a subtype. 
This information is necessary
+* in cases where type information should be deduced from an input type.
 *
-* @return The list of generic parameters. This list can be empty.
+* For instance, a method for a {@link Tuple2} would look like this:
+* 
+* Map<String, TypeInformation<?>> m = new HashMap<>();
+* m.put("T0", this.getTypeAt(0));
+* m.put("T1", this.getTypeAt(1));
+* return m;
+* 
+*
+* @return map of inferred subtypes; it must not contain all generic 
parameters as key;
--- End diff --

You are right, "it doesn't have to contain..." would be better. What I 
wanted to say is: It is also ok to just supply partial type information for 
generic parameters (so returning an empty map is always acceptable).




[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2455
  
@tillrohrmann @StephanEwen, thanks for your review. I changed the code 
based on your comments in two ways:
1. Changed the JIRA and the PR subject line to better reflect the actual 
changes.
2. Modified the test case which connects to an invalid address in 
AkkaRpcServiceTest.




[jira] [Commented] (FLINK-4547) when call connect method in AkkaRpcService using same address and same rpc gateway class, the returned gateways are equal with respect to equals and hashCode

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469997#comment-15469997
 ] 

ASF GitHub Bot commented on FLINK-4547:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2455
  
@tillrohrmann @StephanEwen, thanks for your review. I changed the code 
based on your comments in two ways:
1. Changed the JIRA and the PR subject line to better reflect the actual 
changes.
2. Modified the test case which connects to an invalid address in 
AkkaRpcServiceTest.


> when call connect method in AkkaRpcService using same address and same rpc 
> gateway class, the returned gateways are equal with respect to equals and 
> hashCode
> -
>
> Key: FLINK-4547
> URL: https://issues.apache.org/jira/browse/FLINK-4547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: zhangjing
>Assignee: zhangjing
>
> Currently, every call to the connect method in the AkkaRpcService class with the same 
> address and the same RPC gateway class returns a new gateway object: the gateways are 
> not equal to each other and their hash codes differ.
> It may be reasonable to return consistent results (equals returns true and the hash 
> codes match) when using the same address and the same gateway class.
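The behaviour the issue asks for can be achieved by caching gateways under a composite (address, gateway class) key, so repeated connect calls return the same instance. An illustrative sketch, not Flink's actual AkkaRpcService (the proxy creation is a stand-in for real RPC dispatch):

```java
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Cache keyed by (address, gateway class); connect() returns the cached
// gateway so that repeated calls yield objects equal by identity.
class GatewayCache {
    private static final class Key {
        final String address;
        final Class<?> gatewayClass;
        Key(String address, Class<?> gatewayClass) {
            this.address = address;
            this.gatewayClass = gatewayClass;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return address.equals(k.address) && gatewayClass.equals(k.gatewayClass);
        }
        @Override public int hashCode() {
            return Objects.hash(address, gatewayClass);
        }
    }

    private final Map<Key, Object> gateways = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <G> G connect(String address, Class<G> gatewayClass) {
        return (G) gateways.computeIfAbsent(
                new Key(address, gatewayClass),
                k -> createGateway(gatewayClass));
    }

    private Object createGateway(Class<?> gatewayClass) {
        // stand-in for the real proxy/gateway creation over the network
        return Proxy.newProxyInstance(
                gatewayClass.getClassLoader(),
                new Class<?>[]{gatewayClass},
                (proxy, method, methodArgs) -> null);
    }
}

public class GatewayCacheDemo {
    public static void main(String[] args) {
        GatewayCache cache = new GatewayCache();
        Runnable g1 = cache.connect("akka://flink/user/resourcemanager", Runnable.class);
        Runnable g2 = cache.connect("akka://flink/user/resourcemanager", Runnable.class);
        System.out.println(g1 == g2); // prints true: same cached gateway
    }
}
```

Because the second lookup hits the cache, equals and hashCode trivially agree for gateways obtained with the same address and class.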





[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470065#comment-15470065
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77783497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

What exactly do you mean? The class is thread-safe and documented (though 
documentation can be improved). There is no need for locking. Do you mean 
marking the leaderSessionID `volatile`? It should be fine if leader changes 
propagate lazily.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77783497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

What exactly do you mean? The class is thread-safe and documented (though 
documentation can be improved). There is no need for locking. Do you mean 
marking the leaderSessionID `volatile`? It should be fine if leader changes 
propagate lazily.




[jira] [Closed] (FLINK-4376) Implement job master skeleton for managing single job

2016-09-07 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-4376.
-
Resolution: Duplicate

I think this is a duplicate of https://issues.apache.org/jira/browse/FLINK-4408

> Implement job master skeleton for managing single job 
> --
>
> Key: FLINK-4376
> URL: https://issues.apache.org/jira/browse/FLINK-4376
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Wenlong Lyu
>Assignee: Kurt Young
>
> This JIRA aims to set up a skeleton of the new JobMaster and translate the Scala 
> code to Java, making the necessary changes so that the JobMaster manages only one job. 
> This will include:
> 1. Blob-related logic
> 2. SubmittedJobGraphStore
> 3. checkpoint & savepoint
> 4. metrics registry





[jira] [Updated] (FLINK-4406) Implement job master registration at resource manager

2016-09-07 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-4406:
--
Assignee: (was: Kurt Young)

> Implement job master registration at resource manager
> -
>
> Key: FLINK-4406
> URL: https://issues.apache.org/jira/browse/FLINK-4406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Wenlong Lyu
>
> The Job Master needs to register with the Resource Manager when starting, then watch 
> for leadership changes of the RM and trigger re-registration.





[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...

2016-09-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77827675
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type 
information factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type 
system. The factory is
+ * called during the type extraction phase if the corresponding type has 
been annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be 
chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+   public TypeInfoFactory() {
+   // default constructor
--- End diff --

What is the reason for the empty no-arg constructor?
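The "closest factory wins while traversing upwards" rule from the javadoc can be illustrated with a small, self-contained registry that walks the superclass chain. This mimics the described behaviour; it is not Flink code, and the names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a factory producing type information (here just a string).
interface InfoFactory { String createTypeInfo(); }

class FactoryRegistry {
    private final Map<Class<?>, InfoFactory> factories = new HashMap<>();

    void register(Class<?> type, InfoFactory factory) {
        factories.put(type, factory);
    }

    // pick the factory registered closest to the concrete class,
    // traversing upwards through the superclass hierarchy
    InfoFactory closestFactory(Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            InfoFactory f = factories.get(c);
            if (f != null) return f;
        }
        return null;
    }
}

public class FactoryDemo {
    static class Base {}
    static class Sub extends Base {}

    public static void main(String[] args) {
        FactoryRegistry reg = new FactoryRegistry();
        reg.register(Base.class, () -> "BaseInfo");
        reg.register(Sub.class, () -> "SubInfo");
        // the factory on Sub is closer than the one on Base
        System.out.println(reg.closestFactory(Sub.class).createTypeInfo()); // prints SubInfo

        FactoryRegistry reg2 = new FactoryRegistry();
        reg2.register(Base.class, () -> "BaseInfo");
        // no factory on Sub itself, so the one inherited from Base is used
        System.out.println(reg2.closestFactory(Sub.class).createTypeInfo()); // prints BaseInfo
    }
}
```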




[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView

2016-09-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77825932
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

this just served as an example...




[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470728#comment-15470728
 ] 

ASF GitHub Bot commented on FLINK-3042:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77827675
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type 
information factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type 
system. The factory is
+ * called during the type extraction phase if the corresponding type has 
been annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be 
chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+   public TypeInfoFactory() {
+   // default constructor
--- End diff --

What is the reason for the empty no-arg constructor?


> Define a way to let types create their own TypeInformation
> --
>
> Key: FLINK-3042
> URL: https://issues.apache.org/jira/browse/FLINK-3042
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Timo Walther
> Fix For: 1.0.0
>
>
> Currently, introducing new Types that should have specific TypeInformation 
> requires
>   - Either integration with the TypeExtractor
>   - Or manually constructing the TypeInformation (potentially at every place) 
> and using type hints everywhere.
> I propose to add a way to allow classes to create their own TypeInformation 
> (like a static method "createTypeInfo()").
> To support generic nested types (like Optional / Either), the type extractor 
> would provide a Map of what generic variables map to what types (deduced from 
> the input). The class can use that to create the correct nested 
> TypeInformation (possibly by calling the TypeExtractor again, passing the Map 
> of generic bindings).





[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470881#comment-15470881
 ] 

ASF GitHub Bot commented on FLINK-3042:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77842627
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -792,12 +832,40 @@ else if (t instanceof Class) {
 
return null;
}
-   
+
+   @SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation<?> info = null;
-
+
+   // use a factory to find corresponding type information to type variable
+   final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
+   final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType);
+   if (factory != null) {
+   // the type that defines the factory is last in factory 
hierarchy
+   final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
+   // defining type has generics, the factory need to be 
asked for a mapping of subtypes to type information
+   if (factoryDefiningType instanceof ParameterizedType) {
--- End diff --

A TypeInformation is created here only with factories of parameterized 
types?


> Define a way to let types create their own TypeInformation
> --
>
> Key: FLINK-3042
> URL: https://issues.apache.org/jira/browse/FLINK-3042
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Timo Walther
> Fix For: 1.0.0
>
>
> Currently, introducing new Types that should have specific TypeInformation 
> requires
>   - Either integration with the TypeExtractor
>   - Or manually constructing the TypeInformation (potentially at every place) 
> and using type hints everywhere.
> I propose to add a way to allow classes to create their own TypeInformation 
> (like a static method "createTypeInfo()").
> To support generic nested types (like Optional / Either), the type extractor 
> would provide a Map of what generic variables map to what types (deduced from 
> the input). The class can use that to create the correct nested 
> TypeInformation (possibly by calling the TypeExtractor again, passing the Map 
> of generic bindings).





[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...

2016-09-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2337#discussion_r77842627
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -792,12 +832,40 @@ else if (t instanceof Class) {
 
return null;
}
-   
+
+   @SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation<?> info = null;
-
+
+   // use a factory to find corresponding type information to type variable
+   final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
+   final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType);
+   if (factory != null) {
+   // the type that defines the factory is last in factory 
hierarchy
+   final Type factoryDefiningType = 
factoryHierarchy.get(factoryHierarchy.size() - 1);
+   // defining type has generics, the factory need to be 
asked for a mapping of subtypes to type information
+   if (factoryDefiningType instanceof ParameterizedType) {
--- End diff --

A TypeInformation is created here only with factories of parameterized 
types?




[jira] [Created] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint

2016-09-07 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4592:
---

 Summary: Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
 Key: FLINK-4592
 URL: https://issues.apache.org/jira/browse/FLINK-4592
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther


It seems that the test is still non-deterministic.

{code}
org.apache.flink.api.table.expressions.ScalarFunctionsTest
testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest)
  Time elapsed: 0.083 sec  <<< FAILURE!
org.junit.ComparisonFailure: Wrong result for: 
AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET "ISO-8859-1" 
COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') expected:<[tru]e> but 
was:<[fals]e>
at org.junit.Assert.assertEquals(Assert.java:115)
at 
org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126)
at 
org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123)
at 
scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87)
at 
org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
{code}
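One plausible source of the non-determinism (an assumption, not confirmed in the issue): the length of a formatted timestamp depends on how many trailing zeros the fractional second happens to carry, so a fixed length check like {{>= 22}} passes or fails depending on the current millisecond value. java.sql.Timestamp.toString() demonstrates the effect:

```java
import java.sql.Timestamp;

// Timestamp.toString() trims trailing zeros of the fractional part
// (keeping at least one digit), so the string length varies with the
// current millisecond value -- which makes length-based assertions flaky.
public class TimestampLength {
    public static void main(String[] args) {
        Timestamp wholeSecond = new Timestamp(1473250000000L); // millis end in 000 -> fraction ".0"
        Timestamp withMillis  = new Timestamp(1473250000123L); // fraction ".123"
        System.out.println(wholeSecond.toString().length()); // prints 21
        System.out.println(withMillis.toString().length());  // prints 23
    }
}
```

A more robust test would parse the value or compare against a tolerant pattern instead of asserting a minimum string length.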





[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470705#comment-15470705
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77825932
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

this just served as an example...


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView

2016-09-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77829590
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
String operatorName = containingTask.getName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

With the two suggested changes, we can make the example something useful ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470749#comment-15470749
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2443#discussion_r77829590
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -105,7 +106,9 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
String operatorName = containingTask.getName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
-   this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   Counter c = this.metrics.counter("numRecordsOut");
+   this.output = new CountingOutput(output, c);
+   this.metrics.meter("numRecordsOutRate", new MeterView(c, 60));
--- End diff --

With the two suggested changes, we can make the example something useful ;)


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

2016-09-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2440#discussion_r77856665
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+   public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+   private KeyGroupRangeAssignment() {
+   throw new AssertionError();
+   }
+
+   /**
+* Assigns the given key to a parallel operator index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @param parallelism the current parallelism of the operator
+* @return the index of the parallel operator to which the given key 
should be routed.
+*/
+   public static int assignKeyToParallelOperator(Object key, int 
maxParallelism, int parallelism) {
+   return computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, assignToKeyGroup(key, maxParallelism));
+   }
+
+   /**
+* Assigns the given key to a key-group index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @return the key-group to which the given key is assigned
+*/
+   public static final int assignToKeyGroup(Object key, int 
maxParallelism) {
+   return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+   }
+
+   /**
+* Computes the range of key-groups that are assigned to a given 
operator under the given parallelism and maximum
+* parallelism.
+*
+* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
+* to go beyond this boundary, this method must perform arithmetic on 
long values.
+*
+* @param maxParallelism Maximal parallelism that the job was initially 
created with.
+* @param parallelismThe current parallelism under which the job 
runs. Must be <= maxParallelism.
+* @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
+* @return
+*/
+   public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+   int maxParallelism,
+   int parallelism,
+   int operatorIndex) {
+   Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
+   Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
+   Preconditions.checkArgument(maxParallelism <= (1 << 15), 
"Maximum parallelism must be smaller than 2^15.");
+
+   int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
+   int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
--- End diff --

Can't we simplify this expression to
```
int start = (operatorIndex * maxParallelism) / parallelism;
int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
```




[jira] [Resolved] (FLINK-4585) Fix broken links in index.md

2016-09-07 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4585.
---
Resolution: Fixed

Resolved in http://git-wip-us.apache.org/repos/asf/flink-web/commit/bb6d820f

> Fix broken links in index.md
> 
>
> Key: FLINK-4585
> URL: https://issues.apache.org/jira/browse/FLINK-4585
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> The following links are broken
> DataSet API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
> Table API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html
> Gelly
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html
> The following links show "Page 'X' Has Moved to" for 1-2 sec  and then 
> redirect to another page
> DataStream API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html
> programming guide
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
> redirects-to DataSet API: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
> probably it should be "Basic API Concepts" 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html
> or Quick Start - 
> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html
> CEP
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html
> ML
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html





[GitHub] flink pull request #2478: [FLINK-4595] Close FileOutputStream in ParameterTo...

2016-09-07 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2478

[FLINK-4595] Close FileOutputStream in ParameterTool

https://issues.apache.org/jira/browse/FLINK-4595

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4595

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2478


commit 46f68d5fc621324368ad31fbba52bdf13abfae48
Author: Alexander Pivovarov 
Date:   2016-09-07T21:11:06Z

[FLINK-4595] Close FileOutputStream in ParameterTool






[jira] [Commented] (FLINK-4595) Close FileOutputStream in ParameterTool

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471823#comment-15471823
 ] 

ASF GitHub Bot commented on FLINK-4595:
---

GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2478

[FLINK-4595] Close FileOutputStream in ParameterTool

https://issues.apache.org/jira/browse/FLINK-4595

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4595

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2478


commit 46f68d5fc621324368ad31fbba52bdf13abfae48
Author: Alexander Pivovarov 
Date:   2016-09-07T21:11:06Z

[FLINK-4595] Close FileOutputStream in ParameterTool




> Close FileOutputStream in ParameterTool
> ---
>
> Key: FLINK-4595
> URL: https://issues.apache.org/jira/browse/FLINK-4595
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> ParameterTool and ParameterToolTest do not close FileOutputStream
> {code}
> defaultProps.store(new FileOutputStream(file), "Default file created by 
> Flink's ParameterUtil.createPropertiesFile()");
> {code}
> {code}
> props.store(new FileOutputStream(propertiesFile), "Test properties");
> {code}





[jira] [Created] (FLINK-4595) Close FileOutputStream in ParameterTool

2016-09-07 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4595:
--

 Summary: Close FileOutputStream in ParameterTool
 Key: FLINK-4595
 URL: https://issues.apache.org/jira/browse/FLINK-4595
 Project: Flink
  Issue Type: Bug
  Components: Java API
Affects Versions: 1.1.2
Reporter: Alexander Pivovarov
Priority: Trivial


ParameterTool and ParameterToolTest do not close FileOutputStream
{code}
defaultProps.store(new FileOutputStream(file), "Default file created by Flink's 
ParameterUtil.createPropertiesFile()");
{code}

{code}
props.store(new FileOutputStream(propertiesFile), "Test properties");
{code}
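
A minimal sketch of the fix (a hypothetical helper, not the actual patch): wrapping the stream in try-with-resources guarantees it is closed on both the normal and the exceptional path of `Properties.store(...)`:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;

// Sketch of the fix: try-with-resources closes the FileOutputStream
// even when Properties.store(...) throws. Names are illustrative,
// this is not the actual Flink patch.
public class PropertiesFileWriter {
    public static void write(Properties props, File file, String comment) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file)) {
            props.store(out, comment);
        } // out.close() runs here, whether store() succeeded or threw
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.setProperty("input", "/tmp/data");
        File file = File.createTempFile("parameter-tool-test", ".properties");
        write(props, file, "Test properties");
        System.out.println(file.length() > 0); // true
    }
}
```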





[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-07 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Attachment: FLINK4586Test.scala

Scala unit test

> NumberSequenceIterator and Accumulator threading issue
> --
>
> Key: FLINK-4586
> URL: https://issues.apache.org/jira/browse/FLINK-4586
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.1.2
>Reporter: Johannes
>Priority: Minor
> Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in 
> combination with an AverageAccumulator.
> It seems like the individual accumulators are reinitialized and overwrite 
> parts of intermediate solutions.
> The following Scala snippet exemplifies the problem.
> Instead of printing the correct average of {{50.5}}, the result is 
> something completely different, like {{8.08}}, depending on the number of 
> cores used.
> If the parallelism is set to {{1}} the result is correct, which indicates a 
> likely threading problem.
> The problem occurs with both the Java and Scala APIs.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>   var a : AverageAccumulator = _
>   override def map(value: Long): Long = {
> a.add(value)
> value
>   }
>   override def open(parameters: Configuration): Unit = {
> a = new AverageAccumulator
> getRuntimeContext.addAccumulator("test", a)
>   }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}





[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue

2016-09-07 Thread Johannes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes updated FLINK-4586:

Attachment: (was: FLINK4586Test.scala)

> NumberSequenceIterator and Accumulator threading issue
> --
>
> Key: FLINK-4586
> URL: https://issues.apache.org/jira/browse/FLINK-4586
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.1.2
>Reporter: Johannes
>Priority: Minor
>
> There is a strange problem when using the NumberSequenceIterator in 
> combination with an AverageAccumulator.
> It seems like the individual accumulators are reinitialized and overwrite 
> parts of intermediate solutions.
> The following Scala snippet exemplifies the problem.
> Instead of printing the correct average of {{50.5}}, the result is 
> something completely different, like {{8.08}}, depending on the number of 
> cores used.
> If the parallelism is set to {{1}} the result is correct, which indicates a 
> likely threading problem.
> The problem occurs with both the Java and Scala APIs.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>   var a : AverageAccumulator = _
>   override def map(value: Long): Long = {
> a.add(value)
> value
>   }
>   override def open(parameters: Configuration): Unit = {
> a = new AverageAccumulator
> getRuntimeContext.addAccumulator("test", a)
>   }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}





[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471287#comment-15471287
 ] 

ASF GitHub Bot commented on FLINK-3755:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2440#discussion_r77870173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+   public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+   private KeyGroupRangeAssignment() {
+   throw new AssertionError();
+   }
+
+   /**
+* Assigns the given key to a parallel operator index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @param parallelism the current parallelism of the operator
+* @return the index of the parallel operator to which the given key 
should be routed.
+*/
+   public static int assignKeyToParallelOperator(Object key, int 
maxParallelism, int parallelism) {
+   return computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, assignToKeyGroup(key, maxParallelism));
+   }
+
+   /**
+* Assigns the given key to a key-group index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @return the key-group to which the given key is assigned
+*/
+   public static final int assignToKeyGroup(Object key, int 
maxParallelism) {
+   return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+   }
+
+   /**
+* Computes the range of key-groups that are assigned to a given 
operator under the given parallelism and maximum
+* parallelism.
+*
+* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
+* to go beyond this boundary, this method must perform arithmetic on 
long values.
+*
+* @param maxParallelism Maximal parallelism that the job was initially 
created with.
+* @param parallelismThe current parallelism under which the job 
runs. Must be <= maxParallelism.
+* @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
+* @return
+*/
+   public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+   int maxParallelism,
+   int parallelism,
+   int operatorIndex) {
+   Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
+   Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
+   Preconditions.checkArgument(maxParallelism <= (1 << 15), 
"Maximum parallelism must be smaller than 2^15.");
+
+   int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
+   int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
--- End diff --

I don't think this gives us the correct inverse of 
computeOperatorIndexForKeyGroup(...). Our test 
CheckpointCoordinatorTest::testCreateKeyGroupPartitions() generates 
counter-examples.
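
One way to see such a counter-example is to evaluate both pairs of formulas quoted in the diff for a single configuration. The check below is hypothetical (not a Flink test); maxParallelism = 4 with parallelism = 3 is one instance where the proposed simplification diverges:

```java
// Compares the PR's key-group range formulas with the proposed simplification.
// Not Flink code; it only evaluates both integer-division expressions.
public class KeyGroupRangeCheck {
    static int startOriginal(int maxParallelism, int parallelism, int operatorIndex) {
        return operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    }
    static int endOriginal(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }
    static int startSimplified(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism) / parallelism;
    }
    static int endSimplified(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
    }

    public static void main(String[] args) {
        int mp = 4, p = 3;
        for (int op = 0; op < p; op++) {
            System.out.printf("op=%d original=[%d,%d] simplified=[%d,%d]%n",
                op, startOriginal(mp, p, op), endOriginal(mp, p, op),
                startSimplified(mp, p, op), endSimplified(mp, p, op));
        }
        // For op=0 the original formulas yield [0,1] but the simplified ones
        // yield [0,0], so the two assignments disagree.
    }
}
```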


> Introduce key groups for key-value state to support dynamic scaling
> ---
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.1.0
>Reporter: 

[jira] [Created] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG

2016-09-07 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4590:
-

 Summary: Some Table API tests are failing when debug lvl is set to 
DEBUG
 Key: FLINK-4590
 URL: https://issues.apache.org/jira/browse/FLINK-4590
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.2.0
Reporter: Robert Metzger


For debugging another issue, I've set the log level on travis to DEBUG.
After that, the Table API tests started failing

{code}

Failed tests: 
  SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
occurred while applying rule DataSetScanRule
  SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
occurred while applying rule DataSetScanRule
  SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
occurred while applying rule DataSetScanRule
  SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
occurred while applying rule DataSetScanRule
  SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
applying rule DataSetScanRule
  SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
applying rule DataSetScanRule
{code}

Probably Calcite is executing additional assertions depending on the debug 
level.





[jira] [Created] (FLINK-4596) Class not found exception when RESTART_STRATEGY is configured with fully qualified class name in the yaml

2016-09-07 Thread Nagarjun Guraja (JIRA)
Nagarjun Guraja created FLINK-4596:
--

 Summary: Class not found exception when RESTART_STRATEGY is 
configured with fully qualified class name in the yaml
 Key: FLINK-4596
 URL: https://issues.apache.org/jira/browse/FLINK-4596
 Project: Flink
  Issue Type: Bug
Reporter: Nagarjun Guraja


CAUSE: createRestartStrategyFactory converts the configured strategy name 
to lowercase and then looks up the class using the lowercased string.

FIX: Do not lowercase the strategy config value, or lowercase it only for the 
switch-case comparison.
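
The failure mode is easy to reproduce outside Flink, because `Class.forName` is case-sensitive. A small illustration (not Flink code) using a JDK class name:

```java
// Illustration only (not Flink code): Class.forName is case-sensitive,
// so lowercasing a fully qualified class name makes the lookup fail.
public class CaseSensitiveLookup {
    public static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canLoad("java.lang.String"));               // true
        System.out.println(canLoad("java.lang.String".toLowerCase())); // false: "java.lang.string"
    }
}
```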





[jira] [Commented] (FLINK-4537) ResourceManager registration with JobManager

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472525#comment-15472525
 ] 

ASF GitHub Bot commented on FLINK-4537:
---

GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2479

[FLINK-4537] [cluster management] ResourceManager registration with 
JobManager

This pull request implements ResourceManager registration with 
JobManager, which includes:
1. Check whether the input resourceManagerLeaderId is the same as the current 
leadershipSessionId of the ResourceManager. If not, two or more 
ResourceManagers may exist at the same time, and the current ResourceManager is not 
the proper RM, so it rejects or ignores the registration.
2. Check whether a valid JobMaster exists at the given address by 
connecting to it. Reject registrations from invalid addresses (hidden 
in the connect logic).
3. Keep JobID and JobMasterGateway mapping relationships.
4. Start a JobMasterLeaderListener at the given JobID to listen to the 
leadership of the specified JobMaster.
5. Send registration successful ack to the jobMaster.

The main differences are 6 points:
1. Add getJobMasterLeaderRetriever method to get job master leader 
retriever in HighAvailabilityServices, NonHaServices, A inner class in 
TaskExecutor, TestingHighAvailabilityServices.
2. Change registerJobMaster method logic of ResourceManager based on the 
above step
3. Change the input parameters of registerJobMaster method in 
ResourceManager and ResourceManagerGateway class to be consistent with 
registerTaskExecutor, from jobMasterRegistration to resourceManagerLeaderId + 
jobMasterAddress  + jobID
4. Change the result type of registerJobMaster method in ResourceManager 
and ResourceManagerGateway class to be consistent with RetryingRegistration, 
from org.apache.flink.runtime.resourcemanager.RegistrationResponse to 
org.apache.flink.runtime.registration.RegistrationResponse
5. Add a LeaderRetrievalListener in ResourceManager to listen to leadership 
of jobMaster
6. Add a test class for registerJobMaster method in ResourceManager

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2479


commit fa66ac8ae86745dc9daf1fb07c6c96be4f336c90
Author: beyond1920 
Date:   2016-09-01T07:27:20Z

rsourceManager registration with JobManager

commit f5e54a21e4a864b5ac5f2f548b6d3dea3edcb619
Author: beyond1920 
Date:   2016-09-07T09:53:44Z

Add JobMasterLeaderRetriverListener at ResourceManager




> ResourceManager registration with JobManager
> 
>
> Key: FLINK-4537
> URL: https://issues.apache.org/jira/browse/FLINK-4537
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: zhangjing
>
> The ResourceManager keeps track of all JobManagers which execute jobs. When 
> a new JobManager registers, its leadership status is checked through the 
> HighAvailabilityServices. It will then be registered at the ResourceManager 
> using the {{JobID}} provided with the initial registration message.
> The ResourceManager should use the JobID and LeaderSessionID (notified by 
> HighAvailabilityServices) to identify a session to the JobMaster.
> When JobManager's register at ResourceManager, it takes the following 2 input 
> parameters :
> 1. resourceManagerLeaderId:  the fencing token for the ResourceManager leader 
> which is kept by JobMaster who send the registration
> 2. JobMasterRegistration: contain address, JobID
> The ResourceManager needs to process the registration event based on the following 
> steps:
> 1. Check whether the input resourceManagerLeaderId is the same as the current 
> leadershipSessionId of the ResourceManager. If not, two or more 
> ResourceManagers may exist at the same time, and the current ResourceManager is 
> not the proper RM, so it rejects or ignores the registration.
> 2. Check whether a valid JobMaster exists at the given address by connecting 
> to the address. Reject registrations from invalid addresses (hidden in the 
> connect logic).
> 3. Keep JobID and JobMasterGateway mapping relationships.
> 4. Start a JobMasterLeaderListener at the given JobID to listen to the 
> leadership of the specified JobMaster.
> 5. Send registration successful ack to the jobMaster.





[GitHub] flink pull request #2479: [FLINK-4537] [cluster management] ResourceManager ...

2016-09-07 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2479

[FLINK-4537] [cluster management] ResourceManager registration with 
JobManager

This pull request implements ResourceManager registration with 
JobManager, which includes:
1. Check whether the input resourceManagerLeaderId is the same as the current 
leadershipSessionId of the ResourceManager. If not, two or more 
ResourceManagers may exist at the same time, and the current ResourceManager is not 
the proper RM, so it rejects or ignores the registration.
2. Check whether a valid JobMaster exists at the given address by 
connecting to it. Reject registrations from invalid addresses (hidden 
in the connect logic).
3. Keep JobID and JobMasterGateway mapping relationships.
4. Start a JobMasterLeaderListener at the given JobID to listen to the 
leadership of the specified JobMaster.
5. Send registration successful ack to the jobMaster.

The main differences are 6 points:
1. Add getJobMasterLeaderRetriever method to get job master leader 
retriever in HighAvailabilityServices, NonHaServices, A inner class in 
TaskExecutor, TestingHighAvailabilityServices.
2. Change registerJobMaster method logic of ResourceManager based on the 
above step
3. Change the input parameters of registerJobMaster method in 
ResourceManager and ResourceManagerGateway class to be consistent with 
registerTaskExecutor, from jobMasterRegistration to resourceManagerLeaderId + 
jobMasterAddress  + jobID
4. Change the result type of registerJobMaster method in ResourceManager 
and ResourceManagerGateway class to be consistent with RetryingRegistration, 
from org.apache.flink.runtime.resourcemanager.RegistrationResponse to 
org.apache.flink.runtime.registration.RegistrationResponse
5. Add a LeaderRetrievalListener in ResourceManager to listen to leadership 
of jobMaster
6. Add a test class for registerJobMaster method in ResourceManager

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2479


commit fa66ac8ae86745dc9daf1fb07c6c96be4f336c90
Author: beyond1920 
Date:   2016-09-01T07:27:20Z

rsourceManager registration with JobManager

commit f5e54a21e4a864b5ac5f2f548b6d3dea3edcb619
Author: beyond1920 
Date:   2016-09-07T09:53:44Z

Add JobMasterLeaderRetriverListener at ResourceManager






[jira] [Assigned] (FLINK-4551) Heartbeat Manager integration with JobMaster

2016-09-07 Thread zhangjing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing reassigned FLINK-4551:


Assignee: zhangjing

> Heartbeat Manager integration with JobMaster
> 
>
> Key: FLINK-4551
> URL: https://issues.apache.org/jira/browse/FLINK-4551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4427) Implement container releasing logic (Standalone / Yarn / Mesos)

2016-09-07 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-4427:
--
Summary: Implement container releasing logic (Standalone / Yarn / Mesos)  
(was: Add slot / Implement container releasing logic (Standalone / Yarn / 
Mesos))

> Implement container releasing logic (Standalone / Yarn / Mesos)
> ---
>
> Key: FLINK-4427
> URL: https://issues.apache.org/jira/browse/FLINK-4427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>
> Currently we only have allocation logic for SlotManager / ResourceManager.
> For some batch jobs, slots that have already finished can be released, which
> should trigger container release in the different cluster modes.





[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example

2016-09-07 Thread Olga Golovneva (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472586#comment-15472586
 ] 

Olga Golovneva commented on FLINK-1526:
---

Hi,
I was wondering what the current state of this JIRA is. I am really interested
in implementing Boruvka's algorithm, and I just wanted to check whether the
for-loop iteration issue has been fixed by now.


> Add Minimum Spanning Tree library method and example
> 
>
> Key: FLINK-1526
> URL: https://issues.apache.org/jira/browse/FLINK-1526
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Vasia Kalavri
>
> This issue proposes the addition of a library method and an example for 
> distributed minimum spanning tree in Gelly.
> The DMST algorithm is very interesting because it is quite different from 
> PageRank-like iterative graph algorithms. It consists of distinct phases 
> inside the same iteration and requires a mechanism to detect convergence of 
> one phase to proceed to the next one. Current implementations in 
> vertex-centric models are quite long (>1000 lines) and hard to understand.
> You can find a description of the algorithm [here | 
> http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | 
> http://www.vldb.org/pvldb/vol7/p1047-han.pdf].





[jira] [Comment Edited] (FLINK-1707) Add an Affinity Propagation Library Method

2016-09-07 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425389#comment-15425389
 ] 

Josep Rubió edited comment on FLINK-1707 at 9/8/16 3:49 AM:


Hi [~vkalavri],

I changed the implementation, now: 

* The convergence condition changes to local modifications of the vertices that
decide to be exemplars, instead of no modifications in messages. This should
remain the same for a certain number of steps, which is defined in the constructor
* The values hash map is no longer needed
* If a damping factor different to 0 is provided to the AP constructor, the 
oldValues HashMap will be populated and messages will be damped
* If a damping factor of 0 is provided to the constructor the oldValues HashMap 
will not be created

I will update the document

Thanks!


was (Author: joseprupi):
Hi [~vkalavri],

I changed the implementation now having these members for E vertices:

private HashMap weights;
private HashMap oldValues;
private long exemplar;
private int convergenceFactor;
private int convergenceFactorCounter;

where before it was:

private HashMap weights;
private HashMap values;
private HashMap oldValues;
private long exemplar;

So:

* The values hash map is no longer needed
* If a damping factor different to 0 is provided to the AP constructor, the 
oldValues HashMap will be populated and the algorithm will work the same way it 
was before. 
* If a damping factor of 0 is provided to the constructor:
-- The convergence condition changes to local modifications to the 
vertices that decide to be exemplars instead of no modifications in messages. 
This should remain the same for certain number of steps that are defined in the 
constructor
-- There is no damping factor applied to messages
-- The oldValues HashMap is not used

Now I need to change the initialization of the graph, I will post the question 
to the dev group and see if someone can help me on doing it the best way. Once 
I finish it I will clean and comment the code, update the document and submit a 
new pull request.

Thanks!
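As a side note, the damping discussed in this thread is commonly applied by blending each new message with the previous one. Below is a minimal standalone sketch assuming the usual Affinity Propagation damping formula; it is illustrative only and not the actual code of this PR (the method name `damp` is hypothetical):

```java
public class DampingSketch {
    // Standard AP-style damping: blend the new message with the old one.
    // With dampingFactor == 0 the new message passes through unchanged,
    // which matches the "oldValues map not needed" case described above.
    static double damp(double oldMessage, double newMessage, double dampingFactor) {
        return dampingFactor * oldMessage + (1 - dampingFactor) * newMessage;
    }

    public static void main(String[] args) {
        System.out.println(damp(1.0, 0.0, 0.5)); // 0.5
        System.out.println(damp(1.0, 0.0, 0.0)); // 0.0 (damping disabled)
    }
}
```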

> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing





[jira] [Commented] (FLINK-4591) Select star does not work with grouping

2016-09-07 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472401#comment-15472401
 ] 

Jark Wu commented on FLINK-4591:


It seems that {{GROUP BY *}} is not allowed in SQL. Maybe we can throw a 
better exception to explain this when a user uses the star in groupBy. Currently, 
the exception is "cannot resolve [*] given input [a, b, c]", which is not clear.

> Select star does not work with grouping
> ---
>
> Key: FLINK-4591
> URL: https://issues.apache.org/jira/browse/FLINK-4591
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> It would be consistent if this would also work:
> {{table.groupBy('*).select("*)}}
> Currently, the star only works in a plain select without grouping.





[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77943543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

There is a log field in RpcEndpoint which is protected; why not use that 
instead?




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472686#comment-15472686
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77943543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

There is a log field in RpcEndpoint which is protected; why not use that 
instead?


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[jira] [Commented] (FLINK-4408) Submit Job and setup ExecutionGraph

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472703#comment-15472703
 ] 

ASF GitHub Bot commented on FLINK-4408:
---

GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2480

[FLINK-4408][JobManager] Introduce JobMasterRunner and implement job 
submission & setting up the ExecutionGraph

Introduce JobMasterRunner to deal with job-level leader election and make the 
underlying job manager react properly. This runner also takes care of 
determining whether the job should be submitted in recovery fashion.

This PR also implements the job submission skeleton and sets up the 
ExecutionGraph, but the interactions with the client have been marked as TODO 
since they may rely on something like JobClientGateway. I'd like to take care 
of that in a separate PR later.

The main procedure of managing the lifecycle of a job is:

* Once we receive a job submission request from the user, we create a 
JobMasterRunner to deal with it. 
* The JobMasterRunner will first create a leader election service to contend 
for the leadership of this job; once leadership is granted, it will try to do 
the real submission work. 
* Any error occurring during the submission phase will cause the job to be 
rejected and dropped.
* Once the job is accepted, there are two levels of job retry:
  1. Restarting upon execution failure, which is taken care of by the 
RestartStrategy and happens mainly __inside__ the ExecutionGraph
  2. Loss of leadership, which is handled by the JobMasterRunner: the old 
ExecutionGraph will be suspended and then disposed. Retrying should re-submit 
the job to the JobMaster with isRecovery set to true. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-4408

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2480


commit c5afeb1718f14b47739039b7f4695a791e2f1d20
Author: Kurt Young 
Date:   2016-09-08T04:00:13Z

[FLINK-4408][JobManager] Introduce JobMasterRunner and implement job 
submission & setting up the ExecutionGraph




> Submit Job and setup ExecutionGraph
> ---
>
> Key: FLINK-4408
> URL: https://issues.apache.org/jira/browse/FLINK-4408
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Xiaogang Shi
>Assignee: Kurt Young
>
> Once granted the leadership, JM will start to execute the job.
> Most code remains the same except that 
> (1) In the old implementation, where JM manages the execution of multiple 
> jobs, JM has to load all submitted JobGraphs from SubmittedJobGraphStore and 
> recover them. Now that the components creating JM will be responsible for the 
> recovery of JobGraphs, JM will be created with the submitted/recovered 
> JobGraph, without the need to load the JobGraph.
> (2) JM should not rely on Akka to listen on the updates of JobStatus and 
> Execution.





[GitHub] flink pull request #2480: [FLINK-4408][JobManager] Introduce JobMasterRunner...

2016-09-07 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2480

[FLINK-4408][JobManager] Introduce JobMasterRunner and implement job 
submission & setting up the ExecutionGraph

Introduce JobMasterRunner to deal with job-level leader election and make the 
underlying job manager react properly. This runner also takes care of 
determining whether the job should be submitted in recovery fashion.

This PR also implements the job submission skeleton and sets up the 
ExecutionGraph, but the interactions with the client have been marked as TODO 
since they may rely on something like JobClientGateway. I'd like to take care 
of that in a separate PR later.

The main procedure of managing the lifecycle of a job is:

* Once we receive a job submission request from the user, we create a 
JobMasterRunner to deal with it. 
* The JobMasterRunner will first create a leader election service to contend 
for the leadership of this job; once leadership is granted, it will try to do 
the real submission work. 
* Any error occurring during the submission phase will cause the job to be 
rejected and dropped.
* Once the job is accepted, there are two levels of job retry:
  1. Restarting upon execution failure, which is taken care of by the 
RestartStrategy and happens mainly __inside__ the ExecutionGraph
  2. Loss of leadership, which is handled by the JobMasterRunner: the old 
ExecutionGraph will be suspended and then disposed. Retrying should re-submit 
the job to the JobMaster with isRecovery set to true. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-4408

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2480


commit c5afeb1718f14b47739039b7f4695a791e2f1d20
Author: Kurt Young 
Date:   2016-09-08T04:00:13Z

[FLINK-4408][JobManager] Introduce JobMasterRunner and implement job 
submission & setting up the ExecutionGraph






[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471057#comment-15471057
 ] 

ASF GitHub Bot commented on FLINK-3755:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2440#discussion_r77856665
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+   public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+   private KeyGroupRangeAssignment() {
+   throw new AssertionError();
+   }
+
+   /**
+* Assigns the given key to a parallel operator index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @param parallelism the current parallelism of the operator
+* @return the index of the parallel operator to which the given key 
should be routed.
+*/
+   public static int assignKeyToParallelOperator(Object key, int 
maxParallelism, int parallelism) {
+   return computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, assignToKeyGroup(key, maxParallelism));
+   }
+
+   /**
+* Assigns the given key to a key-group index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @return the key-group to which the given key is assigned
+*/
+   public static final int assignToKeyGroup(Object key, int 
maxParallelism) {
+   return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+   }
+
+   /**
+* Computes the range of key-groups that are assigned to a given 
operator under the given parallelism and maximum
+* parallelism.
+*
+* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
+* to go beyond this boundary, this method must perform arithmetic on 
long values.
+*
+* @param maxParallelism Maximal parallelism that the job was initially 
created with.
+* @param parallelismThe current parallelism under which the job 
runs. Must be <= maxParallelism.
+* @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
+* @return
+*/
+   public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+   int maxParallelism,
+   int parallelism,
+   int operatorIndex) {
+   Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
+   Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
+   Preconditions.checkArgument(maxParallelism <= (1 << 15), 
"Maximum parallelism must be smaller than 2^15.");
+
+   int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
+   int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
--- End diff --

Can't we simplify this expression to
```
int start = (operatorIndex * maxParallelism) / parallelism;
int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
```
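For what it's worth, the original formulas and the simplified ones do not always produce the same ranges; a quick enumeration shows where they diverge. This is a standalone sketch (class and method names are illustrative, not Flink code):

```java
public class KeyGroupRangeComparison {
    // Counts (parallelism, operatorIndex) pairs where the original start/end
    // formulas and the simplified ones suggested above produce different ranges.
    static int countDifferences(int maxParallelism) {
        int diffs = 0;
        for (int p = 1; p <= maxParallelism; p++) {
            for (int idx = 0; idx < p; idx++) {
                int startOrig = idx == 0 ? 0 : ((idx * maxParallelism - 1) / p) + 1;
                int endOrig = ((idx + 1) * maxParallelism - 1) / p;
                int startSimple = (idx * maxParallelism) / p;
                int endSimple = ((idx + 1) * maxParallelism) / p - 1;
                if (startOrig != startSimple || endOrig != endSimple) {
                    diffs++;
                }
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        // Both variants partition [0, maxParallelism), but the boundaries
        // differ for some inputs (e.g. maxParallelism = 128, parallelism = 3),
        // so swapping one in silently changes the assignment.
        System.out.println(countDifferences(128) > 0); // true
    }
}
```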


> Introduce key groups for key-value state to support dynamic scaling
> ---
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.1.0
>Reporter: Till 

[GitHub] flink issue #2440: [FLINK-3755] Introduce key groups for key-value state to ...

2016-09-07 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2440
  
I don't think that this is giving us the correct inverse for 
```computeOperatorIndexForKeyGroup(...)```. Our test 
```CheckpointCoordinatorTest::testCreateKeyGroupPartitions()``` generates 
counter-examples.




[jira] [Commented] (FLINK-4572) Convert to negative in LongValueToIntValue

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471566#comment-15471566
 ] 

ASF GitHub Bot commented on FLINK-4572:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2469
  
Do you have a second use case in mind for adding this function to 
`MathUtils`? My thought would be to keep this separate to avoid confusion 
between signed and unsigned downcasts.


> Convert to negative in LongValueToIntValue
> --
>
> Key: FLINK-4572
> URL: https://issues.apache.org/jira/browse/FLINK-4572
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The Gelly drivers expect that scale 32 edges, represented by the lower 32 
> bits of {{long}} values, can be converted to {{int}} values. Values between 
> 2^31 and 2^32 - 1 should be converted to negative integers, which is not 
> supported by {{MathUtils.checkedDownCast}}.
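The signed downcast described here can be sketched as follows: in Java, a plain cast already reinterprets the lower 32 bits of a long as a signed int, so values in [2^31, 2^32 - 1] come out negative. This snippet is illustrative only (the helper name `lowBitsToInt` is hypothetical, not Flink's LongValueToIntValue):

```java
public class SignedDownCastSketch {
    // Illustrative helper: take the lower 32 bits of a long and reinterpret
    // them as a signed int. Values in [2^31, 2^32 - 1] map to negative ints,
    // which MathUtils.checkedDownCast would reject.
    static int lowBitsToInt(long value) {
        return (int) value;
    }

    public static void main(String[] args) {
        System.out.println(lowBitsToInt(1L << 31));       // -2147483648
        System.out.println(lowBitsToInt((1L << 32) - 1)); // -1
        System.out.println(lowBitsToInt(42L));            // 42
    }
}
```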





[GitHub] flink issue #2469: [FLINK-4572] [gelly] Convert to negative in LongValueToIn...

2016-09-07 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2469
  
Do you have a second use case in mind for adding this function to 
`MathUtils`? My thought would be to keep this separate to avoid confusion 
between signed and unsigned downcasts.




[jira] [Created] (FLINK-4594) Validate lower bound in MathUtils.checkedDownCast

2016-09-07 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4594:
-

 Summary: Validate lower bound in MathUtils.checkedDownCast
 Key: FLINK-4594
 URL: https://issues.apache.org/jira/browse/FLINK-4594
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial


{{MathUtils.checkedDownCast}} only compares against the upper bound 
{{Integer.MAX_VALUE}}, which has worked with current usage. 

Rather than adding a second comparison we can replace

{noformat}
if (value > Integer.MAX_VALUE) {
{noformat}

with a cast and check

{noformat}
if ((int)value != value) { ...
{noformat}
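A minimal sketch of the proposed cast-and-check variant (the class name is hypothetical and the real MathUtils.checkedDownCast may differ in its exception type and message):

```java
public class CheckedDownCastSketch {
    // Hypothetical variant of MathUtils.checkedDownCast that validates both
    // bounds with a single cast-and-compare, as proposed above: if the value
    // survives a round-trip through int, it fits in the int range.
    static int checkedDownCast(long value) {
        if ((int) value != value) {
            throw new IllegalArgumentException(
                "Cannot downcast long value " + value + " to int.");
        }
        return (int) value;
    }

    public static void main(String[] args) {
        System.out.println(checkedDownCast(42L)); // 42
        try {
            // Below Integer.MIN_VALUE: caught by the round-trip check,
            // which a plain upper-bound comparison would miss.
            checkedDownCast((long) Integer.MIN_VALUE - 1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```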





[GitHub] flink issue #2469: [FLINK-4572] [gelly] Convert to negative in LongValueToIn...

2016-09-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2469
  
Looks good. Does it make sense to extend the 
`MathUtils.checkedDownCast(...)` function, or to add a 
`MathUtils.checkedSignedDownCast(...)` function?




[jira] [Commented] (FLINK-4572) Convert to negative in LongValueToIntValue

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471484#comment-15471484
 ] 

ASF GitHub Bot commented on FLINK-4572:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2469
  
Looks good. Does it make sense to extend the 
`MathUtils.checkedDownCast(...)` function, or to add a 
`MathUtils.checkedSignedDownCast(...)` function?


> Convert to negative in LongValueToIntValue
> --
>
> Key: FLINK-4572
> URL: https://issues.apache.org/jira/browse/FLINK-4572
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The Gelly drivers expect that scale 32 edges, represented by the lower 32 
> bits of {{long}} values, can be converted to {{int}} values. Values between 
> 2^31 and 2^32 - 1 should be converted to negative integers, which is not 
> supported by {{MathUtils.checkedDownCast}}.





[jira] [Updated] (FLINK-4593) Fix PageRank algorithm example

2016-09-07 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov updated FLINK-4593:
---
Flags: Patch

> Fix PageRank algorithm example
> --
>
> Key: FLINK-4593
> URL: https://issues.apache.org/jira/browse/FLINK-4593
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> This page https://flink.apache.org/features.html shows the code which 
> implements PageRank algorithm (Batch Processing Applications).
> I noticed a couple of bugs in the code.
> Page class has pageId field
> Adjacency has just id
> but in the code I see
> {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code}
> {code}Page(page.id, 0.15 / numPages){code}
> Also, the code is not formatted (missing spaces):
> {code}Page(n, 0.85*page.rank/adj.neighbors.length){code}





[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

2016-09-07 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2440#discussion_r77870173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+   public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+   private KeyGroupRangeAssignment() {
+   throw new AssertionError();
+   }
+
+   /**
+* Assigns the given key to a parallel operator index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @param parallelism the current parallelism of the operator
+* @return the index of the parallel operator to which the given key 
should be routed.
+*/
+   public static int assignKeyToParallelOperator(Object key, int 
maxParallelism, int parallelism) {
+   return computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, assignToKeyGroup(key, maxParallelism));
+   }
+
+   /**
+* Assigns the given key to a key-group index.
+*
+* @param key the key to assign
+* @param maxParallelism the maximum supported parallelism, aka the 
number of key-groups.
+* @return the key-group to which the given key is assigned
+*/
+   public static final int assignToKeyGroup(Object key, int 
maxParallelism) {
+   return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+   }
+
+   /**
+* Computes the range of key-groups that are assigned to a given 
operator under the given parallelism and maximum
+* parallelism.
+*
+* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid 
rounding problems in this method. If we ever want
+* to go beyond this boundary, this method must perform arithmetic on 
long values.
+*
+* @param maxParallelism Maximal parallelism that the job was initially 
created with.
+* @param parallelismThe current parallelism under which the job 
runs. Must be <= maxParallelism.
+* @param operatorIndex  Id of a key-group. 0 <= keyGroupID < 
maxParallelism.
+* @return
+*/
+   public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+   int maxParallelism,
+   int parallelism,
+   int operatorIndex) {
+   Preconditions.checkArgument(parallelism > 0, "Parallelism must 
not be smaller than zero.");
+   Preconditions.checkArgument(maxParallelism >= parallelism, 
"Maximum parallelism must not be smaller than parallelism.");
+   Preconditions.checkArgument(maxParallelism <= (1 << 15), 
"Maximum parallelism must be smaller than 2^15.");
+
+   int start = operatorIndex == 0 ? 0 : ((operatorIndex * 
maxParallelism - 1) / parallelism) + 1;
+   int end = ((operatorIndex + 1) * maxParallelism - 1) / 
parallelism;
--- End diff --

I don't think that this is giving us the correct inverse for 
computeOperatorIndexForKeyGroup(...). Our test 
CheckpointCoordinatorTest::testCreateKeyGroupPartitions() generates 
counter-examples.




[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471283#comment-15471283
 ] 

ASF GitHub Bot commented on FLINK-3755:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2440
  
I don't think that this is giving us the correct inverse for 
```computeOperatorIndexForKeyGroup(...)```. Our test 
```CheckpointCoordinatorTest::testCreateKeyGroupPartitions()``` generates 
counter-examples.


> Introduce key groups for key-value state to support dynamic scaling
> ---
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> In order to support dynamic scaling, it is necessary to sub-partition the 
> key-value states of each operator. This sub-partitioning, which produces a 
> set of key groups, allows to easily scale in and out Flink jobs by simply 
> reassigning the different key groups to the new set of sub tasks. The idea of 
> key groups is described in this design document [1]. 
> [1] 
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing





[jira] [Updated] (FLINK-4585) Fix broken links in index.md

2016-09-07 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov updated FLINK-4585:
---
Flags: Patch

> Fix broken links in index.md
> 
>
> Key: FLINK-4585
> URL: https://issues.apache.org/jira/browse/FLINK-4585
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> The following links are broken
> DataSet API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
> Table API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html
> Gelly
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html
> The following links show "Page 'X' Has Moved to" for 1-2 sec  and then 
> redirect to another page
> DataStream API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html
> programming guide
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
> redirects-to DataSet API: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
> probably it should be "Basic API Concepts" 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html
> or Quick Start - 
> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html
> CEP
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html
> ML
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html





[jira] [Created] (FLINK-4593) Fix PageRank algorithm example

2016-09-07 Thread Alexander Pivovarov (JIRA)
Alexander Pivovarov created FLINK-4593:
--

 Summary: Fix PageRank algorithm example
 Key: FLINK-4593
 URL: https://issues.apache.org/jira/browse/FLINK-4593
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Alexander Pivovarov
Priority: Minor


This page https://flink.apache.org/features.html shows the code which 
implements the PageRank algorithm (Batch Processing Applications).

I noticed a couple of bugs in the code:
the Page class has a pageId field,
while Adjacency has just id,

but in the code I see
{code}pages.join(adjacency).where("pageId").equalTo("pageId"){code}

Also {code}Page(page.id, 0.15 / numPages){code}
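
The mismatch can be illustrated with a plain-Java stand-in for the join (the Flink DataSet API is not assumed to be on the classpath here); the class and field names come from the issue description, and in Flink terms the fix would be `where("pageId").equalTo("id")` rather than `"pageId"` on both sides:

```java
import java.util.*;

// Minimal sketch of the join-key fix described in the issue: Page keys on
// pageId while Adjacency keys on id, so the join must pair "pageId" with "id".
// Plain-Java stand-in for the Flink DataSet join; names come from the issue.
public class PageRankJoinSketch {
    record Page(long pageId, double rank) {}
    record Adjacency(long id, long[] neighbors) {}

    // Equivalent of where("pageId").equalTo("id") in Flink terms.
    static Map<Long, long[]> joinOnKeys(List<Page> pages, List<Adjacency> adjacency) {
        Map<Long, long[]> byId = new HashMap<>();
        for (Adjacency a : adjacency) byId.put(a.id(), a.neighbors());
        Map<Long, long[]> joined = new LinkedHashMap<>();
        for (Page p : pages) {
            long[] n = byId.get(p.pageId());   // match Page.pageId against Adjacency.id
            if (n != null) joined.put(p.pageId(), n);
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Page> pages = List.of(new Page(1, 0.25), new Page(2, 0.25));
        List<Adjacency> adj = List.of(new Adjacency(1, new long[]{2}), new Adjacency(2, new long[]{1}));
        System.out.println(joinOnKeys(pages, adj).keySet()); // every page finds its adjacency list
    }
}
```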





[jira] [Updated] (FLINK-4593) Fix PageRank algorithm example

2016-09-07 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov updated FLINK-4593:
---
Description: 
This page https://flink.apache.org/features.html shows the code which 
implements the PageRank algorithm (Batch Processing Applications).

I noticed a couple of bugs in the code:
the Page class has a pageId field,
while Adjacency has just id,

but in the code I see
{code}pages.join(adjacency).where("pageId").equalTo("pageId"){code}

{code}Page(page.id, 0.15 / numPages){code}

Also, the code is not formatted (missing spaces)
{code}Page(n, 0.85*page.rank/adj.neighbors.length){code}

  was:
This page https://flink.apache.org/features.html shows the code which 
implements PageRank algorithm (Batch Processing Applications).

I noticed couple bugs in the code
Page class has pageId field
Adjacency has just id

but in the code I see
{code}pages.join(adjacency).where("pageId").equalTo("pageId"){code}

Also {code}Page(page.id, 0.15 / numPages){code}


> Fix PageRank algorithm example
> --
>
> Key: FLINK-4593
> URL: https://issues.apache.org/jira/browse/FLINK-4593
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> This page https://flink.apache.org/features.html shows the code which 
> implements PageRank algorithm (Batch Processing Applications).
> I noticed a couple of bugs in the code:
> the Page class has a pageId field,
> while Adjacency has just id,
> but in the code I see
> {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code}
> {code}Page(page.id, 0.15 / numPages){code}
> Also, the code is not formatted (missing spaces)
> {code}Page(n, 0.85*page.rank/adj.neighbors.length){code}





[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77768256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

ResourceManager keeps a relationship between resourceID and 
TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID 
using ResourceManager here?




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469776#comment-15469776
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77768256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

ResourceManager keeps a relationship between resourceID and 
TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID 
using ResourceManager here?


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469797#comment-15469797
 ] 

Simone Robutti commented on FLINK-4565:
---

Actually the confusing part is that the translation to the execution plan 
should already exist, but I can't find where it happens. 


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77769044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

There are 3 possible responses from the taskExecutor:
1. Ack request, which means the taskExecutor gives the slot to the specified 
jobMaster as expected. 
2. Decline request, if the slot is already occupied by another AllocationID. 
3. Timeout, which could be caused by loss of the request or response 
message, or by slow network transfer. 
On the first occasion, the SlotManager needs to do nothing. However, on the 
second and third occasions, the slotManager will first verify and clear all the 
previous allocation information for this slot request, then try to find a proper 
slot for the slot request again. I think we should add logic to handle these 
3 possible responses from the taskExecutor.
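
The proposed handling can be sketched in isolation; the enum, method names, and bookkeeping maps below are hypothetical stand-ins for the SlotManager internals, not Flink's actual API:

```java
import java.util.*;

// Sketch of the response handling proposed above: on ACK do nothing, on
// DECLINE or TIMEOUT clear the bookkeeping for the request and retry it.
// Types and names here are hypothetical stand-ins, not Flink's actual API.
public class SlotResponseHandlingSketch {
    enum Response { ACK, DECLINED, TIMEOUT }

    static final Map<String, String> allocations = new HashMap<>(); // slotId -> allocationId
    static final List<String> retried = new ArrayList<>();          // requests to re-dispatch

    static void handleResponse(String slotId, String allocationId, Response r) {
        switch (r) {
            case ACK:
                break; // slot granted as expected, nothing to do
            case DECLINED:
            case TIMEOUT:
                allocations.remove(slotId);   // clear previous allocation bookkeeping
                retried.add(allocationId);    // try to find another slot for the request
                break;
        }
    }

    public static void main(String[] args) {
        allocations.put("slot-1", "alloc-1");
        handleResponse("slot-1", "alloc-1", Response.TIMEOUT);
        System.out.println(allocations.isEmpty() + " " + retried); // true [alloc-1]
    }
}
```

Note that a real implementation would also need to deduplicate retries when a delayed response arrives after the timeout already fired.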




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469801#comment-15469801
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77769044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

There are 3 possible responses from the taskExecutor:
1. Ack request, which means the taskExecutor gives the slot to the specified 
jobMaster as expected. 
2. Decline request, if the slot is already occupied by another AllocationID. 
3. Timeout, which could be caused by loss of the request or response 
message, or by slow network transfer. 
On the first occasion, the SlotManager needs to do nothing. However, on the 
second and third occasions, the slotManager will first verify and clear all the 
previous allocation information for this slot request, then try to find a proper 
slot for the slot request again. I think we should add logic to handle these 
3 possible responses from the taskExecutor.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>






[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469836#comment-15469836
 ] 

Fabian Hueske commented on FLINK-4565:
--

SQL {{IN}} does not require a special implementation because it can be 
translated into an inner join (as long as the subquery is not correlated with 
the outer query). 
We use Calcite to convert the subquery into a join 
({{RelDecorrelator.decorrelateQuery(relNode)}}) in 
{{org.apache.flink.api.table.BatchTableEnvironment}} line 246.
After the conversion, IN is represented and executed as a join.

I think the integration with the Table API can happen in two ways:

1. Generate a RelNode plan with a subquery and let Calcite do the decorrelation 
before the query is optimized (same approach as SQL)
2. Generate a RelNode plan with a Join from the beginning.

I would try to go for the second approach first.
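
The equivalence behind the rewrite can be shown with a plain-Java stand-in (no Calcite or RelNode types assumed): an uncorrelated IN subquery is the same as deduplicating the subquery result and inner-joining the outer rows against it:

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the rewrite described above: an uncorrelated IN subquery is
// equivalent to a distinct aggregate over the subquery plus an inner join.
// Plain-Java stand-in for the RelNode plan; names are illustrative only.
public class InAsJoinSketch {

    // SELECT * FROM outer WHERE outer.key IN (subquery)
    static List<String> inViaJoin(List<String> outer, List<String> subquery) {
        Set<String> dedup = new HashSet<>(subquery);     // Aggregate: distinct subquery values
        return outer.stream()                            // Join: keep outer rows with a match
                    .filter(dedup::contains)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = List.of("flink", "spark", "calcite");
        List<String> allowed = List.of("flink", "calcite", "calcite");
        System.out.println(inViaJoin(words, allowed)); // [flink, calcite]
    }
}
```

The distinct step matters: without it, duplicate subquery values would duplicate matching outer rows, which IN semantics forbid.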


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Assigned] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Simone Robutti (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Simone Robutti reassigned FLINK-4565:
-

Assignee: Simone Robutti

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469851#comment-15469851
 ] 

Simone Robutti commented on FLINK-4565:
---

I imagined it could happen that way and I was going to open the Calcite source to 
look for that. You saved me a lot of time. I will go for the second approach, 
considering that it looks a lot like what happens for all the other operators 
I've seen.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469873#comment-15469873
 ] 

Timo Walther commented on FLINK-4565:
-

Calcite translates the IN operator in 
{{org.apache.calcite.sql2rel.SqlToRelConverter#convertExpression}} into an 
Aggregate and a Join. After fixing an issue in 
"DataSetAggregate" we can execute: {{"SELECT WordCount.word FROM WordCount 
WHERE WordCount.word IN (SELECT WordCount1.word AS w FROM WordCount1)"}}. The 
plan looks like:

{code}
== Physical Execution Plan ==
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED

Stage 3 : Map
content : from: (word, frequency)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED

Stage 8 : Map
content : from: (word, frequency)
ship_strategy : Forward
exchange_mode : BATCH
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED

Stage 7 : Map
content : prepare select: (word)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED

Stage 6 : GroupCombine
content : groupBy: (word), select:(word)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED

Stage 5 : GroupReduce
content : groupBy: (word), 
select:(word)
ship_strategy : Hash Partition 
on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group 
Reduce
Partitioning : 
RANDOM_PARTITIONED

Stage 2 : Join
content : where: 
(=(word, w)), join: (word, frequency, w)
ship_strategy : Hash 
Partition on [0]
exchange_mode : 
PIPELINED
driver_strategy : 
Hybrid Hash (build: from: (word, frequency) (id: 3))
Partitioning : 
RANDOM_PARTITIONED

Stage 1 : FlatMap
content : 
select: (word)
ship_strategy : 
Forward
exchange_mode : 
PIPELINED
driver_strategy 
: FlatMap
Partitioning : 
RANDOM_PARTITIONED

Stage 0 : Data 
Sink
content 
: org.apache.flink.api.java.io.DiscardingOutputFormat

ship_strategy : Forward

exchange_mode : PIPELINED

Partitioning : RANDOM_PARTITIONED
{code}

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.





[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-07 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469879#comment-15469879
 ] 

Timo Walther commented on FLINK-4565:
-

It seems my answer was too late. Yes, I would also go for the second approach.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.


