[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262007#comment-16262007
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5032#discussion_r152475532
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 ---
@@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception {
assertFalse(value.iterator().hasNext());
}
 
+   @Test(expected = DuplicateStateNameException.class)
+   public void testDuplicateStateName() throws Exception {
+   StreamingRuntimeContext context = new StreamingRuntimeContext(
+   createMapPlainMockOp(),
+   createMockEnvironment(),
+   Collections.emptyMap());
+   MapStateDescriptor<Integer, String> mapStateDesc =
+   new MapStateDescriptor<>("name", Integer.class, String.class);
+   ListStateDescriptor<String> listStateDesc =
+   new ListStateDescriptor<>("name", String.class);
+   context.getMapState(mapStateDesc);
--- End diff --

This is good enough 👍 


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO,
>   source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>   private static final long serialVersionUID = -805125545438296619L;
>   private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>   private transient ListState<Integer> secondListState;
>   @Override
>   public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>   secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>   }
>   @Override
>   public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>   Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>   if (v == null) {
>   v = new Tuple2<>(value.f0, 0L);
>   }
>   firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>   }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at 
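For illustration, here is a minimal sketch of the kind of check that yields a clearer error, assuming a hypothetical name-to-descriptor registry (the actual PR #5032 introduces a `DuplicateStateNameException`; this standalone sketch uses `IllegalStateException` instead):

{code}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: remember which descriptor type first claimed each state
// name, and fail fast with a descriptive message on a conflicting re-use.
public class StateNameRegistry {

	private final Map<String, Class<?>> registeredStates = new HashMap<>();

	public void register(String name, Class<?> descriptorType) {
		Class<?> previous = registeredStates.putIfAbsent(name, descriptorType);
		if (previous != null && !previous.equals(descriptorType)) {
			// e.g. "Duplicate state name 'timon-one': already registered via
			// MapStateDescriptor, now requested via ListStateDescriptor."
			throw new IllegalStateException(String.format(
					"Duplicate state name '%s': already registered via %s, now requested via %s.",
					name, previous.getSimpleName(), descriptorType.getSimpleName()));
		}
	}
}
{code}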

[GitHub] flink pull request #5032: [FLINK-8090] [DataStream] Improve the error messag...

2017-11-21 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5032#discussion_r152475532
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 ---
@@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception {
assertFalse(value.iterator().hasNext());
}
 
+   @Test(expected = DuplicateStateNameException.class)
+   public void testDuplicateStateName() throws Exception {
+   StreamingRuntimeContext context = new StreamingRuntimeContext(
+   createMapPlainMockOp(),
+   createMockEnvironment(),
+   Collections.emptyMap());
+   MapStateDescriptor<Integer, String> mapStateDesc =
+   new MapStateDescriptor<>("name", Integer.class, String.class);
+   ListStateDescriptor<String> listStateDesc =
+   new ListStateDescriptor<>("name", String.class);
+   context.getMapState(mapStateDesc);
--- End diff --

This is good enough 👍 


---


[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-21 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261951#comment-16261951
 ] 

Xingcan Cui commented on FLINK-8118:


Hi [~twalthr], thanks for raising this. I have two questions about this issue.
# Shall we integrate the methods into the {{KafkaTableSource.Builder}} or directly into the {{KafkaTableSource}}? Personally, I prefer the latter since it would be more flexible, though that seems to break the design pattern to some extent.
# Since the {{startupMode}} in {{FlinkKafkaConsumerBase}} is not visible to outer classes, do you have any suggestions on testing this?

Thanks, Xingcan

> Allow to specify the offsets of KafkaTableSources
> -
>
> Key: FLINK-8118
> URL: https://issues.apache.org/jira/browse/FLINK-8118
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>
> Right now the Kafka TableSources can only read from the current group offset. 
> We should expose the start position options of the Kafka consumer:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
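For illustration, a minimal sketch of what a builder-based API for this could look like. All builder method names below are hypothetical; the real, already-existing surface is {{FlinkKafkaConsumer}}'s {{setStartFromEarliest()}}, {{setStartFromLatest()}}, {{setStartFromGroupOffsets()}} and {{setStartFromSpecificOffsets(...)}} from the linked documentation:

{code}
// Hypothetical builder sketch only; the actual API shape is what this
// ticket is meant to decide. It simply forwards a chosen start position
// to the underlying FlinkKafkaConsumer.
KafkaTableSource source = KafkaTableSource.builder()   // hypothetical builder
		.forTopic("sensor-readings")                   // assumed topic name
		.withKafkaProperties(props)                    // assumed properties handle
		.withSchema(schema)                            // assumed schema handle
		.fromEarliest()                                // hypothetical: maps to setStartFromEarliest()
		// .fromLatest()                               // hypothetical: maps to setStartFromLatest()
		// .fromGroupOffsets()                         // hypothetical: maps to setStartFromGroupOffsets()
		// .fromSpecificOffsets(offsets)               // hypothetical: maps to setStartFromSpecificOffsets(...)
		.build();
{code}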



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected

2017-11-21 Thread lincoln.lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261907#comment-16261907
 ] 

lincoln.lee commented on FLINK-6101:


[~twalthr] This implementation supports selecting the original expressions and UDFs used in the groupBy clause (consistent with SQL). It seems a bit odd to support aliases in the groupBy clause.

> GroupBy fields with arithmetic expression (include UDF) can not be selected
> ---
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently the Table API does not support selecting groupBy fields that contain expressions, either via the original field name or via the expression itself.
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> causes
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> (BTW, this syntax is also invalid in an RDBMS; SQL Server, for example, reports that the selected column is invalid in the select list because it is not contained in either an aggregate function or the GROUP BY clause.)
> and
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will also cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> Adding an alias in the groupBy clause, "group(e, 'b%3 as 'b)", works to no avail, and applying a UDF doesn't work either:
> {code}
>  table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 'd.count, 'e.avg)
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, TMP_1, TMP_2].
> {code}
> The only way to make this work is:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> One way to solve this is to support aliases in the groupBy clause (that seems a bit odd compared to SQL, though the Table API has a different groupBy grammar). I prefer supporting selection of the original expressions and UDFs used in the groupBy clause (consistent with SQL), like this:
> {code}
> // use expression
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
> // use UDF
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, Mod('b,3))
> .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count)
> {code}

> After having a look at the code, I found a problem in the groupBy implementation: the validation did not consider the expressions in the groupBy clause. It should be noted that a table is actually changed after a groupBy operation (it becomes a new Table), and the groupBy keys in essence replace the original field references.
>  
> What do you think?
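For illustration, a toy sketch of the internal-alias idea over plain strings rather than Flink's Expression tree (all names here are made up for the example):

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: give each non-trivial groupBy key an internal alias so that
// select() can later resolve the same expression against the grouped table.
public class GroupKeyAliasing {

	static List<String> aliasGroupKeys(List<String> groupKeys) {
		List<String> aliased = new ArrayList<>();
		int i = 0;
		for (String key : groupKeys) {
			if (key.matches("[A-Za-z_][A-Za-z0-9_]*")) {
				aliased.add(key); // plain field reference, keep as-is
			} else {
				aliased.add(key + " as TMP_GK" + i++); // e.g. "b % 3 as TMP_GK0"
			}
		}
		return aliased;
	}

	public static void main(String[] args) {
		// prints: [e, b % 3 as TMP_GK0]
		System.out.println(aliasGroupKeys(Arrays.asList("e", "b % 3")));
	}
}
{code}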



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261904#comment-16261904
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152459473
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -134,9 +138,9 @@ void assignExclusiveSegments(List<MemorySegment> segments) {
 
--- End diff --

agree


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from the producer. It requests a buffer from the {{BufferPool}} to hold the message. If no buffer is available, the message is staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from the {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for the {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}
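A self-contained sketch of the credit accounting described above, under the simplifying assumption that the receiver always wants backlog + initialCredit buffers available (illustrative only, not the actual RemoteInputChannel code):

{code}
// Simplified model of the receiver-side credit accounting sketched in this
// issue: on each sender backlog announcement, top up floating buffers until
// available >= backlog + initialCredit, and track the credit to announce.
public class CreditAccounting {

	private final int initialCredit;   // exclusive buffers per channel
	private int availableBuffers;      // exclusive + floating currently held
	private int unannouncedCredit;     // credit not yet told to the producer

	public CreditAccounting(int initialCredit) {
		this.initialCredit = initialCredit;
		this.availableBuffers = initialCredit;
	}

	/** Called for each BufferResponse carrying the producer's backlog. */
	public synchronized void onSenderBacklog(int backlog) {
		int required = backlog + initialCredit;
		while (availableBuffers < required && requestFloatingBuffer()) {
			availableBuffers++;
			unannouncedCredit++;
		}
		// In the real code, a credit announcement would now be sent to the
		// producer and unannouncedCredit reset.
	}

	private boolean requestFloatingBuffer() {
		return true; // stand-in for asking the BufferPool; may fail when exhausted
	}
}
{code}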



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-21 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152459473
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -134,9 +138,9 @@ void assignExclusiveSegments(List<MemorySegment> segments) {
 
--- End diff --

agree


---


[jira] [Commented] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint

2017-11-21 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261898#comment-16261898
 ] 

Hai Zhou UTC+8 commented on FLINK-7716:
---

[~gjy] Sorry, I have not had any free time lately to work on this ticket. This ticket and FLINK-7717 are free to be reassigned; you can assign yourself if you are interested in them.

> Port JobManagerMetricsHandler to new REST endpoint
> --
>
> Key: FLINK-7716
> URL: https://issues.apache.org/jira/browse/FLINK-7716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hai Zhou UTC+8
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JobManagerMetricsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261892#comment-16261892
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152457595
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -240,15 +245,13 @@ void releaseAllResources() throws IOException {
}
}
}
-
-   synchronized (availableBuffers) {
+   synchronized (bufferQueue) {
--- End diff --

ok


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from the producer. It requests a buffer from the {{BufferPool}} to hold the message. If no buffer is available, the message is staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from the {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for the {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-21 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152457595
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -240,15 +245,13 @@ void releaseAllResources() throws IOException {
}
}
}
-
-   synchronized (availableBuffers) {
+   synchronized (bufferQueue) {
--- End diff --

ok


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261886#comment-16261886
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152456861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network data.
+* It should always return an available buffer in credit-based mode unless
+* the channel has been released.
+*
+* @return The available buffer.
+*/
+   @Nullable
+   public Buffer requestBuffer() {
+   synchronized (bufferQueue) {
+   // Take the floating buffer first if possible.
+   if (bufferQueue.getFloatingBufferSize() > 0) {
--- End diff --

that is fine


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from the producer. It requests a buffer from the {{BufferPool}} to hold the message. If no buffer is available, the message is staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from the {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for the {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-21 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152456861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network data.
+* It should always return an available buffer in credit-based mode unless
+* the channel has been released.
+*
+* @return The available buffer.
+*/
+   @Nullable
+   public Buffer requestBuffer() {
+   synchronized (bufferQueue) {
+   // Take the floating buffer first if possible.
+   if (bufferQueue.getFloatingBufferSize() > 0) {
--- End diff --

that is fine


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261871#comment-16261871
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152454790
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network data.
+* It should always return an available buffer in credit-based mode unless
+* the channel has been released.
+*
+* @return The available buffer.
+*/
+   @Nullable
+   public Buffer requestBuffer() {
+   synchronized (bufferQueue) {
+   // Take the floating buffer first if possible.
+   if (bufferQueue.getFloatingBufferSize() > 0) {
+   return bufferQueue.takeFloatingBuffer();
+   } else {
+   return bufferQueue.takeExclusiveBuffer();
+   }
+   }
+   }
+
+   /**
+* Receives the backlog from the producer's buffer response. If the number of available
+* buffers is less than backlog + initialCredit, it will request floating buffers from the buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub partition.
+*/
+   void onSenderBacklog(int backlog) throws IOException {
--- End diff --

yes


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from the producer. It requests a buffer from the {{BufferPool}} to hold the message. If no buffer is available, the message is staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from the {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for the {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-21 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152454790
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -394,7 +419,63 @@ public BufferProvider getBufferProvider() throws IOException {
return inputGate.getBufferProvider();
}
 
-   public void onBuffer(Buffer buffer, int sequenceNumber) {
+   /**
+* Requests buffer from input channel directly for receiving network data.
+* It should always return an available buffer in credit-based mode unless
+* the channel has been released.
+*
+* @return The available buffer.
+*/
+   @Nullable
+   public Buffer requestBuffer() {
+   synchronized (bufferQueue) {
+   // Take the floating buffer first if possible.
+   if (bufferQueue.getFloatingBufferSize() > 0) {
+   return bufferQueue.takeFloatingBuffer();
+   } else {
+   return bufferQueue.takeExclusiveBuffer();
+   }
+   }
+   }
+
+   /**
+* Receives the backlog from the producer's buffer response. If the number of available
+* buffers is less than backlog + initialCredit, it will request floating buffers from the buffer
+* pool, and then notify unannounced credits to the producer.
+*
+* @param backlog The number of unsent buffers in the producer's sub partition.
+*/
+   void onSenderBacklog(int backlog) throws IOException {
--- End diff --

yes


---


[GitHub] flink pull request #5047: Code refine of WordWithCount

2017-11-21 Thread harborl
GitHub user harborl opened a pull request:

https://github.com/apache/flink/pull/5047

Code refine of WordWithCount

Purely out of thread-safety and coding-convention considerations, could you please check whether it's OK to add the `final` modifier?
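For context, a minimal sketch of the final-field shape under discussion (hypothetical; note that Flink's POJO serializer generally expects a public no-argument constructor and mutable public fields or setters, so adding `final` may change how the type is serialized):

```java
// Hypothetical final-field variant of the WordWithCount example type.
// Final fields give safe publication across threads, but may cause Flink
// to fall back to generic serialization instead of its POJO serializer.
public class WordWithCount {

	public final String word;
	public final long count;

	public WordWithCount(String word, long count) {
		this.word = word;
		this.count = count;
	}

	@Override
	public String toString() {
		return word + " : " + count;
	}
}
```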


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull 

[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261869#comment-16261869
 ] 

ASF GitHub Bot commented on FLINK-6101:
---

GitHub user lincoln-lil opened a pull request:

https://github.com/apache/flink/pull/5046

[FLINK-6101] [table] Support select GroupBy fields with arithmetic ex…

## What is the purpose of the change

Support selecting groupBy fields with arithmetic expressions (including UDFs)

## Brief change log
- Using an internal alias for groupBy fields with arithmetic expressions (including UDFs) in groupBy().

## Verifying this change
`AggregateITCase` verifies
- selecting groupBy fields with arithmetic expressions/UDFs
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lincoln-lil/flink FLINK-6101

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5046


commit d529b9e6e9ed0977fbd2f27f27e1af0521b9231d
Author: lincoln-lil 
Date:   2017-11-22T01:53:54Z

[FLINK-6101] [table] Support select GroupBy fields with arithmetic 
expressions(include UDF)




> GroupBy fields with arithmetic expression (include UDF) can not be selected
> ---
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently the Table API does not support selecting groupBy fields that contain expressions, either via the original field name or via the expression itself.
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> causes
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> (BTW, this syntax is also invalid in an RDBMS; SQL Server, for example, reports that the selected column is invalid in the select list because it is not contained in either an aggregate function or the GROUP BY clause.)
> and
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will also cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> Adding an alias in the groupBy clause, "group(e, 'b%3 as 'b)", works to no avail, and applying a UDF doesn't work either:
> {code}
>  table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 'd.count, 'e.avg)
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, TMP_1, TMP_2].
> {code}
> The only way to make this work is:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> One way to solve this is to support aliases in the groupBy clause (that seems a bit odd compared to SQL, though the Table API has a different groupBy grammar). I prefer supporting selection of the original expressions and UDFs used in the groupBy clause (consistent with SQL), like this:
> {code}
> // use expression
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
> // use UDF
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
> .groupBy('e, Mod('b,3))
> .select(Mod('b,3), 'c.min, 'e, 'a.avg, 

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-21 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152454439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -82,17 +84,19 @@
/** The initial number of exclusive buffers assigned to this channel. */
private int initialCredit;
 
-   /** The current available buffers including both exclusive buffers and requested floating buffers. */
-   private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
+   /** The available buffer queue wraps both exclusive and requested floating buffers. */
+   private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
 
/** The number of available buffers that have not been announced to the producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
/** The number of unsent buffers in the producer's sub partition. */
-   private final AtomicInteger senderBacklog = new AtomicInteger(0);
+   @GuardedBy("bufferQueue")
+   private int senderBacklog;
--- End diff --

agree with it
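As a side note, a minimal sketch of the locking pattern applied in this diff: an `AtomicInteger` becomes a plain `int` once every access happens under the same monitor (stand-in names, not the real class):

```java
import javax.annotation.concurrent.GuardedBy;

// Sketch of the pattern from the diff above: senderBacklog no longer needs
// to be atomic once every access happens under the bufferQueue monitor.
public class BacklogHolder {

	private final Object bufferQueue = new Object(); // stand-in for the real queue

	@GuardedBy("bufferQueue")
	private int senderBacklog;

	public void setBacklog(int backlog) {
		synchronized (bufferQueue) {
			senderBacklog = backlog;
		}
	}

	public int getBacklog() {
		synchronized (bufferQueue) {
			return senderBacklog;
		}
	}
}
```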


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261870#comment-16261870
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152454439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -82,17 +84,19 @@
/** The initial number of exclusive buffers assigned to this channel. */
private int initialCredit;
 
-   /** The current available buffers including both exclusive buffers and requested floating buffers. */
-   private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
+   /** The available buffer queue wraps both exclusive and requested floating buffers. */
+   private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
 
/** The number of available buffers that have not been announced to the producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
/** The number of unsent buffers in the producer's sub partition. */
-   private final AtomicInteger senderBacklog = new AtomicInteger(0);
+   @GuardedBy("bufferQueue")
+   private int senderBacklog;
--- End diff --

agree with it


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} from the producer. It requests a buffer from the {{BufferPool}} to hold the message. If no buffer is available, the message is staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from the {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for the {{RemoteInputChannel}}, which may trigger requests for floating buffers from the {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5046: [FLINK-6101] [table] Support select GroupBy fields...

2017-11-21 Thread lincoln-lil
GitHub user lincoln-lil opened a pull request:

https://github.com/apache/flink/pull/5046

[FLINK-6101] [table] Support select GroupBy fields with arithmetic ex…

## What is the purpose of the change

Support selecting groupBy fields with arithmetic expressions (including UDFs)

## Brief change log
- Using an internal alias for groupBy fields with arithmetic expressions (including UDFs) in groupBy().

## Verifying this change
`AggregateITCase` verifies
- selecting groupBy fields with arithmetic expressions/UDFs
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lincoln-lil/flink FLINK-6101

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5046


commit d529b9e6e9ed0977fbd2f27f27e1af0521b9231d
Author: lincoln-lil 
Date:   2017-11-22T01:53:54Z

[FLINK-6101] [table] Support select GroupBy fields with arithmetic 
expressions(include UDF)




---


[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261868#comment-16261868
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5032#discussion_r152454242
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 ---
@@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception {
assertFalse(value.iterator().hasNext());
}
 
+   @Test(expected = DuplicateStateNameException.class)
+   public void testDuplicateStateName() throws Exception {
+   StreamingRuntimeContext context = new StreamingRuntimeContext(
+   createMapPlainMockOp(),
+   createMockEnvironment(),
+   Collections.emptyMap());
+   MapStateDescriptor<Integer, String> mapStateDesc =
+   new MapStateDescriptor<>("name", Integer.class, String.class);
+   ListStateDescriptor<String> listStateDesc =
+   new ListStateDescriptor<>("name", String.class);
+   context.getMapState(mapStateDesc);
--- End diff --

Actually, the test is quite tricky here (internally, `getListState()` will fetch a `null` value instead of the `MapState` created before). It does not simulate the real runtime behavior, which erases the return type of `DefaultKeyedStateStore.getPartitionedState()`, since the test mocks a `KeyedStateBackend`. The type will be checked in advance, and that's why I need to catch-and-rethrow the `ClassCastException` in `getPartitionedState()`. However, I cannot find a better place for this test. Do you have any suggestions?


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO,
>   source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>   private static final long serialVersionUID = -805125545438296619L;
>   private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>   private transient ListState<Integer> secondListState;
>   @Override
>   public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>   secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>   }
>   @Override
>   public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>   Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>   if (v == null) {
>   v = new Tuple2<>(value.f0, 0L);
>   }
>   firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>   }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  

[GitHub] flink pull request #5032: [FLINK-8090] [DataStream] Improve the error messag...

2017-11-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5032#discussion_r152454242
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 ---
@@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception {
assertFalse(value.iterator().hasNext());
}
 
+   @Test(expected = DuplicateStateNameException.class)
+   public void testDuplicateStateName() throws Exception {
+   StreamingRuntimeContext context = new StreamingRuntimeContext(
+   createMapPlainMockOp(),
+   createMockEnvironment(),
+   Collections.emptyMap());
+   MapStateDescriptor<Integer, String> mapStateDesc =
+   new MapStateDescriptor<>("name", Integer.class, String.class);
+   ListStateDescriptor<String> listStateDesc =
+   new ListStateDescriptor<>("name", String.class);
+   context.getMapState(mapStateDesc);
--- End diff --

Actually, the test is quite tricky here (internally, `getListState()` will fetch a `null` value instead of the `MapState` created before). It does not simulate the real runtime behavior, which erases the return type of `DefaultKeyedStateStore.getPartitionedState()`, since the test mocks a `KeyedStateBackend`. The type will be checked in advance, and that's why I need to catch-and-rethrow the `ClassCastException` in `getPartitionedState()`. However, I cannot find a better place for this test. Do you have any suggestions?
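For illustration, a sketch of the catch-and-rethrow being described, assuming a simplified `DefaultKeyedStateStore`-style method and a `(String, Throwable)` constructor on the PR's `DuplicateStateNameException` (not the exact Flink code):

```java
// Simplified sketch: because generics are erased, a descriptor registered
// under an existing name with a different state type only surfaces as a
// ClassCastException, which is caught and rethrown with a clear message.
@SuppressWarnings("unchecked")
private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
	try {
		return (S) keyedStateBackend.getPartitionedState(
				VoidNamespace.INSTANCE,
				VoidNamespaceSerializer.INSTANCE,
				stateDescriptor);
	} catch (ClassCastException e) {
		throw new DuplicateStateNameException(
				"Another state with name '" + stateDescriptor.getName()
						+ "' and a different type was already registered.", e);
	}
}
```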


---


[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261822#comment-16261822
 ] 

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2487
  
Keeping this PR open until https://github.com/apache/bahir-flink/pull/22 is merged.


> Integrate Siddhi as a lightweight CEP Library
> -
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Hao Chen
>Assignee: Hao Chen
>  Labels: cep, library, patch-available
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use open-source Complex Event Processing engine (CEP) released as a Java library under the `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them, and notifies appropriate complex events according to user-specified queries.
> It would be very helpful for Flink users (especially streaming application developers) to provide a library to run Siddhi CEP queries directly in a Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features like
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See 
> `SiddhiCEP` and `SiddhiStream`)
> * Register Flink DataStream associating native type information with 
> Siddhi Stream Schema, supporting POJO, Tuple, primitive types, etc.
> * Connect with single or multiple Flink DataStreams with Siddhi CEP 
> Execution Plan
> * Return output stream as DataStream with type intelligently inferred 
> from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See 
> `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
> "from every s1 = inputStream1[id == 2] "
>  + " -> s2 = inputStream2[id == 3] "
>  + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>  + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...

2017-11-21 Thread haoch
Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2487
  
Keeping this PR open until https://github.com/apache/bahir-flink/pull/22 is merged.


---


[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-21 Thread ChrisChinchilla
GitHub user ChrisChinchilla opened a pull request:

https://github.com/apache/flink/pull/5045

[hotfix][docs] Review of concepts docs for grammar and clarity

I spent some time doing a brief review of a few docs sections; this is just a start, and I think more could be done.

I didn't change formatting or line breaks as much as I would have liked to, as they're quite messy and inconsistent, but I'll save that for another time.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ChrisChinchilla/flink concepts-review

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5045.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5045


commit c592ad103cce9379bc7dc7ac6edef40769262523
Author: Chris Ward 
Date:   2017-11-22T01:10:08Z

Review programming model doc for clarity

commit 6310a879ba9d57ce4cfef5eeb81edc1978fb26a7
Author: Chris Ward 
Date:   2017-11-22T01:22:19Z

Review run time docs




---


[jira] [Created] (FLINK-8127) Add New Relic Metric Reporter

2017-11-21 Thread Ron Crocker (JIRA)
Ron Crocker created FLINK-8127:
--

 Summary: Add New Relic Metric Reporter
 Key: FLINK-8127
 URL: https://issues.apache.org/jira/browse/FLINK-8127
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Reporter: Ron Crocker


Add a MetricReporter that reports to New Relic.

This will likely look similar to the Datadog metric reporter: an opt-in library distributed with Flink that communicates directly with New Relic, like one of its APM agents, and is configured appropriately to work with New Relic.

I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5023: [hotfix][docs] Review of concepts docs for grammar, forma...

2017-11-21 Thread ChrisChinchilla
Github user ChrisChinchilla commented on the issue:

https://github.com/apache/flink/pull/5023
  
@greghogan Will start this PR afresh.


---


[GitHub] flink pull request #5023: [hotfix][docs] Review of concepts docs for grammar...

2017-11-21 Thread ChrisChinchilla
Github user ChrisChinchilla closed the pull request at:

https://github.com/apache/flink/pull/5023


---


[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...

2017-11-21 Thread ChrisChinchilla
Github user ChrisChinchilla commented on a diff in the pull request:

https://github.com/apache/flink/pull/5024#discussion_r152427259
  
--- Diff: docs/index.md ---
@@ -23,19 +24,17 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-
-
 This documentation is for Apache Flink version {{ site.version_title }}. 
These pages were built at: {% build_time %}.
 
 Apache Flink is an open source platform for distributed stream and batch 
data processing. Flink’s core is a streaming dataflow engine that provides 
data distribution, communication, and fault tolerance for distributed 
computations over data streams. Flink builds batch processing on top of the 
streaming engine, overlaying native iteration support, managed memory, and 
program optimization.
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow 
Programming Model](concepts/programming-model.html) and [Distributed Runtime 
Environment](concepts/runtime.html). This will help you understand other parts 
of the documentation, including the setup and programming guides. We 
recommended you read these sections first.
+-   **Concepts**: We recommend you start with the basic concepts of 
Flink's [Dataflow Programming Model](concepts/programming-model.html) and 
[Distributed Runtime Environment](concepts/runtime.html). This will help you 
understand other parts of the documentation, including the setup and 
programming guides.
 
-- **Quickstarts**: [Run an example 
program](quickstart/setup_quickstart.html) on your local machine or [study some 
examples](examples/index.html).
+-   **Quickstarts**: [Run an example 
program](quickstart/setup_quickstart.html) on your local machine or [study some 
examples](examples/index.html).
 
-- **Programming Guides**: You can read our guides about [basic API 
concepts](dev/api_concepts.html) and the [DataStream 
API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to 
learn how to write your first Flink programs.
+-   **Programming Guides**: You can read our guides about [basic API 
concepts](dev/api_concepts.html), the [DataStream API](dev/datastream_api.html) 
or the [DataSet API](dev/batch/index.html) to learn how to write your first 
Flink programs.
--- End diff --

@greghogan No, this was an attempt to make the whole passage flow better, as it currently reads…

> xxx and yyy or zzz

So I was attempting to separate out the two parts of the paragraph; better might be…

> **Programming Guides**: You can read our guides about [basic API 
concepts](dev/api_concepts.html), the [DataStream 
API](dev/datastream_api.html), and the [DataSet API](dev/batch/index.html) to 
learn how to write your first Flink programs.

Then it's an oxford comma, but that's stylistic.


---


[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...

2017-11-21 Thread ChrisChinchilla
Github user ChrisChinchilla commented on a diff in the pull request:

https://github.com/apache/flink/pull/5024#discussion_r152426833
  
--- Diff: docs/index.md ---
@@ -23,19 +24,17 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-
-
 This documentation is for Apache Flink version {{ site.version_title }}. 
These pages were built at: {% build_time %}.
 
 Apache Flink is an open source platform for distributed stream and batch 
data processing. Flink’s core is a streaming dataflow engine that provides 
data distribution, communication, and fault tolerance for distributed 
computations over data streams. Flink builds batch processing on top of the 
streaming engine, overlaying native iteration support, managed memory, and 
program optimization.
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow 
Programming Model](concepts/programming-model.html) and [Distributed Runtime 
Environment](concepts/runtime.html). This will help you understand other parts 
of the documentation, including the setup and programming guides. We 
recommended you read these sections first.
+-   **Concepts**: We recommend you start with the basic concepts of 
Flink's [Dataflow Programming Model](concepts/programming-model.html) and 
[Distributed Runtime Environment](concepts/runtime.html). This will help you 
understand other parts of the documentation, including the setup and 
programming guides.
--- End diff --

@greghogan It's something my Markdown linter 'fixes'; it doesn't matter too much, but it's apparently more correct.


---


[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...

2017-11-21 Thread ChrisChinchilla
Github user ChrisChinchilla commented on a diff in the pull request:

https://github.com/apache/flink/pull/5024#discussion_r152426709
  
--- Diff: docs/README.md ---
@@ -90,7 +90,7 @@ This will be replaced with the value of the variable 
called `NAME` when generati
 
  Headings
 
-All documents are structured with headings. From these headings, you can 
automatically generate a page table of contents (see below).
+All documents are structured with headings, written in "Title Case". From 
these headings, you can automatically generate a page table of contents (see 
below).
--- End diff --

@greghogan Not sure; I have found a lot of inconsistent headings in the docs, so it seemed worthwhile adding. That doesn't mean anyone will follow it, of course :)


---


[jira] [Commented] (FLINK-4877) Refactorings around FLINK-3674 (User Function Timers)

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261566#comment-16261566
 ] 

ASF GitHub Bot commented on FLINK-4877:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4877
  
@vim-wj if you are okay with Stephan's suggestion could you close this pull 
request?

Also, a small note: `FLINK-4877` references a [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-4877) rather than a pull 
request (use `[hotfix]` for simple issues not requiring a ticket).


> Refactorings around FLINK-3674 (User Function Timers)
> -
>
> Key: FLINK-4877
> URL: https://issues.apache.org/jira/browse/FLINK-4877
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8126) Update and fix checkstyle

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261567#comment-16261567
 ] 

ASF GitHub Bot commented on FLINK-8126:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5044
  
+1


> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4877: [FLINK-4877] About SourceFunction extends Serializable

2017-11-21 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4877
  
@vim-wj if you are okay with Stephan's suggestion could you close this pull 
request?

Also, a small note: `FLINK-4877` references a [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-4877) rather than a pull 
request (use `[hotfix]` for simple issues not requiring a ticket).


---


[GitHub] flink issue #5044: [FLINK-8126] [build] Fix and update checkstyle

2017-11-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5044
  
+1


---


[jira] [Commented] (FLINK-7316) always use off-heap network buffers

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261560#comment-16261560
 ] 

ASF GitHub Bot commented on FLINK-7316:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4481
  
rebased again - this should be good to go. @StefanRRichter can you continue 
with this?


> always use off-heap network buffers
> ---
>
> Key: FLINK-7316
> URL: https://issues.apache.org/jira/browse/FLINK-7316
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In order to send flink buffers through netty into the network, we need to 
> make the buffers use off-heap memory. Otherwise, there will be a hidden copy 
> happening in the NIO stack.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
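
For reference, the hidden copy mentioned above is general JDK NIO behaviour rather 
than anything Flink-specific: writing a heap `ByteBuffer` to a socket channel makes 
the JDK stage it through a temporary direct buffer first, while a direct buffer is 
handed to the OS as-is. A minimal self-contained sketch:

```
import java.nio.ByteBuffer;

public class BufferDemo {
    public static void main(String[] args) {
        // heap buffer: backed by a byte[]; the NIO stack copies it into a
        // temporary direct buffer before a socket write (the "hidden copy")
        ByteBuffer heap = ByteBuffer.allocate(32 * 1024);
        // direct buffer: native memory, written to the socket without that copy
        ByteBuffer direct = ByteBuffer.allocateDirect(32 * 1024);
        System.out.println(heap.isDirect() + " / " + direct.isDirect()); // false / true
    }
}
```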


[GitHub] flink issue #4481: [FLINK-7316][network] always use off-heap network buffers

2017-11-21 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4481
  
rebased again - this should be good to go. @StefanRRichter can you continue 
with this?


---


[jira] [Commented] (FLINK-8126) Update and fix checkstyle

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261538#comment-16261538
 ] 

ASF GitHub Bot commented on FLINK-8126:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5044

[FLINK-8126] [build] Fix and update checkstyle

## What is the purpose of the change

Update to the latest checkstyle version and fix the errors not previously 
detected.

## Brief change log

- update checkstyle to version 8.4 from version 6.19
- in `checkstyle.xml` move `SuppressionCommentFilter` under `TreeWalker` 
and remove `FileContentsHolder` (see checkstyle/checkstyle pr4714)
- correct latent checkstyle errors

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 8126_update_and_fix_checkstyle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5044.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5044


commit 3a01732d4e823f732f5fa4b63054b4b8f9e40f3e
Author: Greg Hogan 
Date:   2017-11-21T19:05:53Z

[FLINK-8126] [build] Fix and update checkstyle

Update to the latest checkstyle version and fix the errors not
previously detected.




> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
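
The `SuppressionCommentFilter` relocation in the change log corresponds to a 
`checkstyle.xml` structure roughly like the following (a sketch with the actual 
checks elided; in checkstyle 8.x the filter must be a child of `TreeWalker`, and 
`FileContentsHolder` no longer exists):

```
<module name="Checker">
  <module name="TreeWalker">
    <!-- previously a sibling of TreeWalker, paired with FileContentsHolder -->
    <module name="SuppressionCommentFilter"/>
    <!-- ... individual checks ... -->
  </module>
</module>
```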


[GitHub] flink pull request #5044: [FLINK-8126] [build] Fix and update checkstyle

2017-11-21 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5044

[FLINK-8126] [build] Fix and update checkstyle

## What is the purpose of the change

Update to the latest checkstyle version and fix the errors not previously 
detected.

## Brief change log

- update checkstyle to version 8.4 from version 6.19
- in `checkstyle.xml` move `SuppressionCommentFilter` under `TreeWalker` 
and remove `FileContentsHolder` (see checkstyle/checkstyle pr4714)
- correct latent checkstyle errors

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 8126_update_and_fix_checkstyle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5044.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5044


commit 3a01732d4e823f732f5fa4b63054b4b8f9e40f3e
Author: Greg Hogan 
Date:   2017-11-21T19:05:53Z

[FLINK-8126] [build] Fix and update checkstyle

Update to the latest checkstyle version and fix the errors not
previously detected.




---


[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...

2017-11-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5006#discussion_r152406271
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -162,14 +161,19 @@ So far, you have set up your cluster to run with 
queryable state and you have de
 queryable. Now it is time to see how to query this state. 
 
 For this you can use the `QueryableStateClient` helper class. This is 
available in the `flink-queryable-state-client` 
-jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project, as shown below:
+jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project along with `flink-core`, as shown below:
 
 
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
-  <artifactId>flink-queryable-state-client-java_{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
+  <artifactId>flink-core</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-queryable-state-client-java{{ site.scala_version_suffix }}</artifactId>
--- End diff --

no, the underscore is included in `site.scala_version_suffix`.


---
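
For readers following the queryable state discussion, a hedged usage sketch of the 
client described on that docs page (host, port, job id, key, and state name are 
placeholders; the signatures follow the Flink 1.4 documentation and may differ in 
other versions):

```
QueryableStateClient client = new QueryableStateClient("tm-host", 9069);

CompletableFuture<ValueState<Long>> resultFuture = client.getKvState(
    jobId,                                    // JobID of the running job (placeholder)
    "query-name",                             // name the state was made queryable under
    key,                                      // key to look up (placeholder)
    BasicTypeInfo.LONG_TYPE_INFO,             // type of the key
    new ValueStateDescriptor<>("state", Long.class));

resultFuture.thenAccept(state -> {
    try {
        System.out.println(state.value());
    } catch (Exception e) {
        e.printStackTrace();
    }
});
```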


[GitHub] flink pull request #4946: [FLINK-7967] [config] Deprecate Hadoop specific Fl...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4946#discussion_r152405696
  
--- Diff: flink-dist/src/main/resources/flink-conf.yaml ---
@@ -151,6 +151,9 @@ jobmanager.web.port: 8081
 
 # Path to the Hadoop configuration directory.
 #
+# Warning: these keys are deprecated and will be removed in 1.5. Instead, 
use
--- End diff --

I see your question about a specific future version for removing these keys 
was not answered, but I expect the removal won't happen in 1.5, if ever. I think 
we can leave this at the deprecation notice and the recommendation to use 
`HADOOP_CONF_DIR`, without stating a time for removal.


---


[jira] [Commented] (FLINK-7967) Deprecate Hadoop specific Flink configuration options

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261507#comment-16261507
 ] 

ASF GitHub Bot commented on FLINK-7967:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4946#discussion_r152405696
  
--- Diff: flink-dist/src/main/resources/flink-conf.yaml ---
@@ -151,6 +151,9 @@ jobmanager.web.port: 8081
 
 # Path to the Hadoop configuration directory.
 #
+# Warning: these keys are deprecated and will be removed in 1.5. Instead, 
use
--- End diff --

I see your question about a specific future version for removing these keys 
was not answered, but I expect the removal won't happen in 1.5, if ever. I think 
we can leave this at the deprecation notice and the recommendation to use 
`HADOOP_CONF_DIR`, without stating a time for removal.


> Deprecate Hadoop specific Flink configuration options
> -
>
> Key: FLINK-7967
> URL: https://issues.apache.org/jira/browse/FLINK-7967
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Till Rohrmann
>Priority: Trivial
>
> I think we should deprecate the hadoop specific configuration options from 
> Flink and encourage people to use instead the environment variable 
> {{HADOOP_CONF_DIR}} to configure the Hadoop configuration directory. This 
> includes:
> {code}
> fs.hdfs.hdfsdefault
> fs.hdfs.hdfssite
> fs.hdfs.hadoopconf
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
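
The recommended replacement is the standard Hadoop environment variable rather 
than any Flink option; for example (the path is a placeholder):

```
# set once in the environment instead of the deprecated flink-conf.yaml keys
export HADOOP_CONF_DIR=/etc/hadoop/conf
```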


[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261503#comment-16261503
 ] 

ASF GitHub Bot commented on FLINK-8070:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5012
  
Also, this particular exception stops after `Caused by: 
java.lang.VerifyError: Inconsistent stackmap frames at branch target 152` 
because the next lines don't look like a stack trace.

Here's a snippet:
```
  at 
org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123)
Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch 
target 152
Exception Details:
  Location:
akka/dispatch/Mailbox.processAllSystemMessages()V @152: getstatic
  Reason:
```


> YarnTestBase should print prohibited string
> ---
>
> Key: FLINK-8070
> URL: https://issues.apache.org/jira/browse/FLINK-8070
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> The yarn tests check the log files for a set of prohibited strings. If found, 
> the entire log file is logged as WARN, the offending line is logged as ERROR, 
> and the test fails with this unhelpful message:
> {code}
> java.lang.AssertionError(Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1510164935122_0002/container_1510164935122_0002_01_01/jobmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081])
> {code}
> If you don't have log access on travis you have thus no knowledge what 
> actually went wrong.
> I propose to also print smaller excerpts around the found error (like 10 
> lines or smth) in the Assert.fail message.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
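
The proposed excerpt printing could look roughly like this (a sketch with 
hypothetical names, not the code from #5012): collect a window of lines around 
each match of a prohibited string and attach it to the failure message.

```
import java.util.ArrayList;
import java.util.List;

class LogExcerpts {
    // returns an up-to-(2*context+1)-line window around each match of `prohibited`
    static List<String> excerptsAround(List<String> lines, String prohibited, int context) {
        List<String> excerpts = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains(prohibited)) {
                int from = Math.max(0, i - context);
                int to = Math.min(lines.size(), i + context + 1);
                excerpts.add(String.join("\n", lines.subList(from, to)));
            }
        }
        return excerpts;
    }
}
```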


[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files

2017-11-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5012
  
Also, this particular exception stops after `Caused by: 
java.lang.VerifyError: Inconsistent stackmap frames at branch target 152` 
because the next lines don't look like a stack trace.

Here's a snippet:
```
  at 
org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123)
Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch 
target 152
Exception Details:
  Location:
akka/dispatch/Mailbox.processAllSystemMessages()V @152: getstatic
  Reason:
```


---


[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files

2017-11-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5012
  
In this particular stack trace, you have the stack trace of the assertion 
error

```
java.lang.AssertionError(... exception message ...)
at 
org.junit.runners.model.MultipleFailureException.assertEmpty(MultipleFailureException.java:67)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:39)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
```

and the excerpts in the exception message of the AssertionError, as a list 
of exceptions

```
Found a file 
/home/Zento/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1510667711263_0001/container_1510667711263_0001_01_01/jobmanager.log
 with a prohibited string (one of [Exception, Started 
SelectChannelConnector@0.0.0.0:8081]). Excerpts:
[
java.lang.Exception: Could not create actor system
at 
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:171)
at 
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:115)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:313)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:199)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:196)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123)
Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch 
target 152
]
```

The formatting isn't really ideal, but I don't know of an easy way to 
change it. (And it's still better than nothing.)



---


[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261494#comment-16261494
 ] 

ASF GitHub Bot commented on FLINK-8070:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5012
  
In this particular stack trace, you have the stack trace of the assertion 
error

```
java.lang.AssertionError(... exception message ...)
at 
org.junit.runners.model.MultipleFailureException.assertEmpty(MultipleFailureException.java:67)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:39)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
```

and the excerpts in the exception message of the AssertionError, as a list 
of exceptions

```
Found a file 
/home/Zento/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1510667711263_0001/container_1510667711263_0001_01_01/jobmanager.log
 with a prohibited string (one of [Exception, Started 
SelectChannelConnector@0.0.0.0:8081]). Excerpts:
[
java.lang.Exception: Could not create actor system
at 
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:171)
at 
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:115)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:313)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:199)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:196)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:196)
at 
org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:123)
Caused by: java.lang.VerifyError: Inconsistent stackmap frames at branch 
target 152
]
```

The formatting isn't really ideal, but I don't know of an easy way to 
change it. (And it's still better than nothing.)



> YarnTestBase should print prohibited string
> ---
>
> Key: FLINK-8070
> URL: https://issues.apache.org/jira/browse/FLINK-8070
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> The yarn tests check the log files for a set of prohibited strings. 

[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5006#discussion_r152402516
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -162,14 +161,19 @@ So far, you have set up your cluster to run with 
queryable state and you have de
 queryable. Now it is time to see how to query this state. 
 
 For this you can use the `QueryableStateClient` helper class. This is 
available in the `flink-queryable-state-client` 
-jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project, as shown below:
+jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project along with `flink-core`, as shown below:
--- End diff --

"you have to explicitly include" -> "must be explicitly included"?


---


[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5006#discussion_r152402801
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -162,14 +161,19 @@ So far, you have set up your cluster to run with 
queryable state and you have de
 queryable. Now it is time to see how to query this state. 
 
 For this you can use the `QueryableStateClient` helper class. This is 
available in the `flink-queryable-state-client` 
-jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project, as shown below:
+jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project along with `flink-core`, as shown below:
 
 
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
-  <artifactId>flink-queryable-state-client-java_{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
+  <artifactId>flink-core</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-queryable-state-client-java{{ site.scala_version_suffix }}</artifactId>
--- End diff --

Do we need to preserve the underscore after `java`?


---


[jira] [Closed] (FLINK-8123) Bundle python library in jar

2017-11-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8123.
---
Resolution: Fixed

1.5: c4107d4c336ed8dbadc03a7018eb255f4df3d1cc

> Bundle python library in jar
> 
>
> Key: FLINK-8123
> URL: https://issues.apache.org/jira/browse/FLINK-8123
> Project: Flink
>  Issue Type: Improvement
>  Components: Python API
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently, the flink-python library is split into 2 parts in flink-dist; the 
> flink-python jar in the /lib directory, and the python scripts in the 
> /resources directory.
> I propose to bundle the python scripts in the flink-python jar. This way, the 
> jar is self-contained and we no longer need to search for the python scripts 
> (which was hacky and had a separate codepath for tests).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5006#discussion_r152402097
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -60,7 +60,7 @@ The Queryable State feature consists of three main 
entities:
  returning it to the client, and 
  3. the `QueryableStateServer` which runs on each `TaskManager` and is 
responsible for serving the locally stored state.
  
-In a nutshell, the client will connect to one of the proxies and send a 
request for the state associated with a specific 
+The client will connect to one of the proxies and send a request for the 
state associated with a specific 
--- End diff --

"will connect" -> "connects" (and "send" -> "sends")?


---


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261489#comment-16261489
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5043

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

## What is the purpose of the change

* Adds `OrcRowInputFormat` to read [ORC files](https://orc.apache.org) as 
`DataSet`. The input format supports projection and filter push-down.
* Adds `OrcTableSource` to read [ORC files](https://orc.apache.org) as a 
`Table` in a batch Table API or SQL query. The table source supports projection 
and filter push-down.

## Brief change log

* Creates a new module `flink-connectors/flink-orc`
* Add `OrcRowInputFormat`
* Add `OrcTableSource`
* Add tests for input format and table source
* Adjust cost model of batch table scans to favor table sources with 
pushed-down filters over those without pushed-down filters. 
* Add static method to `RowTypeInfo` to project on fields.
* Improve translation of literals in `RexProgramExtractor`

## Verifying this change

* `OrcRowInputFormatTest` verifies 
  * Correct configuration of ORC readers.
  * Results when reading ORC files
  * Schema evolution support
  * Computation of split boundaries

* `OrcTableSourceTest` verifies
  * Correct implementation of TableSource interface methods
  * Correct configuration of `OrcRowInputFormat` for test queries 
(predicate and filter push-down)

* `OrcTableSourceITCase` runs end-to-end tests with SQL queries.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **yes**, adds a new 
Maven module `flink-orc` with a dependency on `org.apache.orc/orc-core`
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **yes**, documentation for 
`OrcTableSource` was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink table-ORC

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5043


commit 2f524dfa0c4f8468691151925a622ba7fee55f0f
Author: uybhatti 
Date:   2017-03-03T22:55:22Z

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

commit d80506e3785268f541457a69ade3118c634cf7e7
Author: Fabian Hueske 
Date:   2017-11-13T13:54:54Z

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.




> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
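
A hedged usage sketch of the new table source (the builder and method names are 
assumptions based on the PR description, not necessarily the merged API; path and 
schema are placeholders):

```
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// assumed builder-style construction; projection and filter push-down are then
// applied automatically when the optimizer translates the query below
OrcTableSource orcSource = OrcTableSource.builder()
    .path("/path/to/data.orc")
    .forOrcSchema("struct<name:string,cnt:int>")
    .build();

tEnv.registerTableSource("OrcTable", orcSource);
Table result = tEnv.sql("SELECT name, cnt FROM OrcTable WHERE cnt > 10");
```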


[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-21 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5043

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

## What is the purpose of the change

* Adds `OrcRowInputFormat` to read [ORC files](https://orc.apache.org) as 
`DataSet`. The input format supports projection and filter push-down.
* Adds `OrcTableSource` to read [ORC files](https://orc.apache.org) as a 
`Table` in a batch Table API or SQL query. The table source supports projection 
and filter push-down.

## Brief change log

* Creates a new module `flink-connectors/flink-orc`
* Add `OrcRowInputFormat`
* Add `OrcTableSource`
* Add tests for input format and table source
* Adjust cost model of batch table scans to favor table sources with 
pushed-down filters over those without pushed-down filters. 
* Add static method to `RowTypeInfo` to project on fields.
* Improve translation of literals in `RexProgramExtractor`

## Verifying this change

* `OrcRowInputFormatTest` verifies 
  * Correct configuration of ORC readers.
  * Results when reading ORC files
  * Schema evolution support
  * Computation of split boundaries

* `OrcTableSourceTest` verifies
  * Correct implementation of TableSource interface methods
  * Correct configuration of `OrcRowInputFormat` for test queries 
(predicate and filter push-down)

* `OrcTableSourceITCase` runs end-to-end tests with SQL queries.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **yes**, adds a new 
Maven module `flink-orc` with a dependency on `org.apache.orc/orc-core`
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **yes**, documentation for 
`OrcTableSource` was added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink table-ORC

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5043


commit 2f524dfa0c4f8468691151925a622ba7fee55f0f
Author: uybhatti 
Date:   2017-03-03T22:55:22Z

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

commit d80506e3785268f541457a69ade3118c634cf7e7
Author: Fabian Hueske 
Date:   2017-11-13T13:54:54Z

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.




---


[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files

2017-11-21 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5012
  
LGTM. I'm puzzled why the `])` is printed out-of-order in the middle of the 
stack trace.


---


[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261448#comment-16261448
 ] 

ASF GitHub Bot commented on FLINK-8070:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5012
  
LGTM. I'm puzzled why the `])` is printed out-of-order in the middle of the 
stack trace.


> YarnTestBase should print prohibited string
> ---
>
> Key: FLINK-8070
> URL: https://issues.apache.org/jira/browse/FLINK-8070
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> The yarn tests check the log files for a set of prohibited strings. If found, 
> the entire log file is logged as WARN, the offending line is logged as ERROR, 
> and the test fails with this unhelpful message:
> {code}
> java.lang.AssertionError(Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1510164935122_0002/container_1510164935122_0002_01_01/jobmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081])
> {code}
> If you don't have log access on travis you have thus no knowledge what 
> actually went wrong.
> I propose to also print smaller excerpts around the found error (like 10 
> lines or smth) in the Assert.fail message.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8084) Remove japicmp deactivations in several modules

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261430#comment-16261430
 ] 

ASF GitHub Bot commented on FLINK-8084:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5020
  
+1


> Remove japicmp deactivations in several modules
> ---
>
> Key: FLINK-8084
> URL: https://issues.apache.org/jira/browse/FLINK-8084
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> The japicmp module is explicitly deactivated in the following modules:
> * java8
> * quickstart
> * yarn-tests
> Since the module has to be explicitly enabled these entries can be removed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5020: [FLINK-8084][build] Remove unnecessary japicmp pom entrie...

2017-11-21 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5020
  
+1


---


[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5024#discussion_r152384448
  
--- Diff: docs/index.md ---
@@ -23,19 +24,17 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-
-
 This documentation is for Apache Flink version {{ site.version_title }}. 
These pages were built at: {% build_time %}.
 
 Apache Flink is an open source platform for distributed stream and batch 
data processing. Flink’s core is a streaming dataflow engine that provides 
data distribution, communication, and fault tolerance for distributed 
computations over data streams. Flink builds batch processing on top of the 
streaming engine, overlaying native iteration support, managed memory, and 
program optimization.
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow 
Programming Model](concepts/programming-model.html) and [Distributed Runtime 
Environment](concepts/runtime.html). This will help you understand other parts 
of the documentation, including the setup and programming guides. We 
recommended you read these sections first.
+-   **Concepts**: We recommend you start with the basic concepts of 
Flink's [Dataflow Programming Model](concepts/programming-model.html) and 
[Distributed Runtime Environment](concepts/runtime.html). This will help you 
understand other parts of the documentation, including the setup and 
programming guides.
--- End diff --

Why the extra spacing?


---


[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5024#discussion_r152386126
  
--- Diff: docs/index.md ---
@@ -23,19 +24,17 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-
-
 This documentation is for Apache Flink version {{ site.version_title }}. 
These pages were built at: {% build_time %}.
 
 Apache Flink is an open source platform for distributed stream and batch 
data processing. Flink’s core is a streaming dataflow engine that provides 
data distribution, communication, and fault tolerance for distributed 
computations over data streams. Flink builds batch processing on top of the 
streaming engine, overlaying native iteration support, managed memory, and 
program optimization.
 
 ## First Steps
 
-- **Concepts**: Start with the basic concepts of Flink's [Dataflow 
Programming Model](concepts/programming-model.html) and [Distributed Runtime 
Environment](concepts/runtime.html). This will help you understand other parts 
of the documentation, including the setup and programming guides. We 
recommended you read these sections first.
+-   **Concepts**: We recommend you start with the basic concepts of 
Flink's [Dataflow Programming Model](concepts/programming-model.html) and 
[Distributed Runtime Environment](concepts/runtime.html). This will help you 
understand other parts of the documentation, including the setup and 
programming guides.
 
-- **Quickstarts**: [Run an example 
program](quickstart/setup_quickstart.html) on your local machine or [study some 
examples](examples/index.html).
+-   **Quickstarts**: [Run an example 
program](quickstart/setup_quickstart.html) on your local machine or [study some 
examples](examples/index.html).
 
-- **Programming Guides**: You can read our guides about [basic API 
concepts](dev/api_concepts.html) and the [DataStream 
API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to 
learn how to write your first Flink programs.
+-   **Programming Guides**: You can read our guides about [basic API 
concepts](dev/api_concepts.html), the [DataStream API](dev/datastream_api.html) 
or the [DataSet API](dev/batch/index.html) to learn how to write your first 
Flink programs.
--- End diff --

Oxford comma?


---


[GitHub] flink pull request #5024: [hotfix][docs] Readme review to clarify heading fo...

2017-11-21 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5024#discussion_r152384103
  
--- Diff: docs/README.md ---
@@ -90,7 +90,7 @@ This will be replaced with the value of the variable 
called `NAME` when generati
 
  Headings
 
-All documents are structured with headings. From these headings, you can 
automatically generate a page table of contents (see below).
+All documents are structured with headings, written in "Title Case". From 
these headings, you can automatically generate a page table of contents (see 
below).
--- End diff --

It seems that this could be left as an implicit assumption for "headings".


---


[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink

2017-11-21 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261371#comment-16261371
 ] 

Greg Hogan commented on FLINK-8036:
---

Are many developers running tests from the command line or using an IDE for 
incremental compilation and selectively running tests?

> Consider using gradle to build Flink
> 
>
> Key: FLINK-8036
> URL: https://issues.apache.org/jira/browse/FLINK-8036
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> Here is a summary from Lukasz over this thread 
> (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool)
>  w.r.t. the performance boost from using gradle:
> Maven performs parallelization at the module level: an entire module needs
> to complete before any dependent modules can start, which means all the
> checks like findbugs, checkstyle, and tests need to finish. Gradle has
> task-level parallelism between subprojects, which means that as soon as the
> compile and shade steps are done for a project, dependent subprojects
> can typically start. This means that we get increased parallelism due to
> not needing to wait for findbugs, checkstyle, and tests to run. I typically see
> ~20 tasks (at peak) running on my desktop in parallel.
> Flink should consider using gradle - on Linux with SSD, a clean build takes 
> an hour.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8113) Bump maven-shade-plugin to 3.0.0

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261335#comment-16261335
 ] 

ASF GitHub Bot commented on FLINK-8113:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5042
  
I looked through the open issues for 3.0.0 and the ones that were fixed 
for 3.1.0, and I couldn't find anything critical.


> Bump maven-shade-plugin to 3.0.0
> 
>
> Key: FLINK-8113
> URL: https://issues.apache.org/jira/browse/FLINK-8113
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0
>
>
> We should investigate whether we can bump the shade plugin to 3.0.0. Earlier 
> versions do not properly relocate services, forcing some modules to set a 
> different plugin version in their own configuration (flink-s3-fs-presto for 
> example, or flink-dist after FLINK-8111).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
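
The bump itself is essentially a version change in the parent `pom.xml` plugin 
management, along the lines of (a sketch; the exact location in the build files is 
assumed):

```
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <!-- previously an older 2.x version, with per-module overrides -->
  <version>3.0.0</version>
</plugin>
```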


[GitHub] flink issue #5042: [FLINK-8113][build] Bump maven-shade-plugin to 3.0.0

2017-11-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5042
  
I looked through the open issues for 3.0.0 and the ones that were fixed 
for 3.1.0, and I couldn't find anything critical.


---


[jira] [Updated] (FLINK-8126) Update and fix checkstyle

2017-11-21 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8126:
--
Component/s: Build System

> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8126) Update and fix checkstyle

2017-11-21 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-8126:
-

 Summary: Update and fix checkstyle
 Key: FLINK-8126
 URL: https://issues.apache.org/jira/browse/FLINK-8126
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial
 Fix For: 1.5.0


Our current checkstyle configuration (checkstyle version 6.19) is missing some 
ImportOrder and variable naming errors which are detected in 1) IntelliJ using 
the same checkstyle version and 2) with the maven-checkstyle-plugin with an 
up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6294) BucketingSink throws NPE while cancelling job

2017-11-21 Thread Cristian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261228#comment-16261228
 ] 

Cristian commented on FLINK-6294:
-

This is happening to me on Flink 1.3.2. The line is different, but I guess it's 
the same problem:


{code:java}
2017-11-21 16:55:16,276 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
disposal of stream operator.
java.lang.NullPointerException
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
{code}


> BucketingSink throws NPE while cancelling job
> -
>
> Key: FLINK-6294
> URL: https://issues.apache.org/jira/browse/FLINK-6294
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: Andrey
>
> Steps to reproduce:
> * configure BucketingSink and run job
> * cancel job from UI before processing any messages
> * in logs:
> {code}
> 2017-04-11 10:14:54,681 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Source: Custom 
> Source (1/2) [Source: Custom Source (1/2)]
> 2017-04-11 10:14:54,881 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) 
> [flink-akka.actor.default-dispatcher-4]
> 2017-04-11 10:14:56,584 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)]
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
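
The pattern behind this NPE is common for `close()` during cancellation: disposal 
can run before `open()` has fully initialized the operator's fields. A minimal 
self-contained illustration with hypothetical names (not the actual BucketingSink 
fix):

```
import java.util.HashMap;
import java.util.Map;

class GuardedSink {
    private Map<String, String> state; // set in open(), still null on early cancel

    void open() {
        state = new HashMap<>();
    }

    void close() {
        if (state != null) { // guard: close() may run without open() having completed
            state.clear();
        }
    }
}
```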


[jira] [Commented] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261209#comment-16261209
 ] 

ASF GitHub Bot commented on FLINK-8117:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5041
  
Merging this...


> Eliminate modulo operation from RoundRobinChannelSelector and 
> RebalancePartitioner
> --
>
> Key: FLINK-8117
> URL: https://issues.apache.org/jira/browse/FLINK-8117
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Fix For: 1.5.0
>
>
> {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and 
> {{RescalePartitioner}} use a modulo operation to wrap around when the current 
> channel counter reaches the number of channels. Using an {{if}} would have 
> better performance.
> A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
> the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
> most of the time predict the condition to be false.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
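
Schematically, the change replaces the division with a branch the predictor almost 
always resolves correctly (a sketch, not the exact Flink code):

```
class RoundRobin {
    private int channel = -1;

    // before: channel = (channel + 1) % numChannels  -- a ~25-cycle division
    // after:  an increment plus a rarely-taken branch
    int nextChannel(int numChannels) {
        channel++;
        if (channel >= numChannels) {
            channel = 0;
        }
        return channel;
    }
}
```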


[GitHub] flink issue #5041: [FLINK-8117] [runtime] [streaming] Eliminate modulo opera...

2017-11-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5041
  
Merging this...


---


[jira] [Created] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8125:
---

 Summary: Support limiting the number of open FileSystem connections
 Key: FLINK-8125
 URL: https://issues.apache.org/jira/browse/FLINK-8125
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.1, 1.5.0


We need a way to limit the number of streams that Flink FileSystems 
concurrently open.
For example, for very small HDFS clusters with few RPC handlers, a large Flink 
job trying to build up many connections during a checkpoint causes failures due 
to rejected connections. 

I propose to add a file system that can wrap another existing file system. The 
file system may track the progress of streams and close streams that have been 
inactive for too long, to avoid locked streams from taking up the complete pool.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
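
A minimal sketch of the wrapping idea (hypothetical names, not the eventual Flink 
implementation): guard stream creation with a semaphore and return the permit when 
the stream closes.

```
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;

class LimitedStream extends InputStream {
    private final InputStream in;
    private final Semaphore permits;

    LimitedStream(InputStream in, Semaphore permits) throws InterruptedException {
        permits.acquire();               // blocks while the pool is exhausted
        this.in = in;
        this.permits = permits;
    }

    @Override
    public int read() throws IOException {
        return in.read();
    }

    @Override
    public void close() throws IOException {
        try {
            in.close();
        } finally {
            permits.release();           // free the slot for other streams
        }
    }
}
```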


[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink

2017-11-21 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261193#comment-16261193
 ] 

Ted Yu commented on FLINK-8036:
---

Last note: when I see gradlew run selected test(s) so fast, I wish Maven 
didn't have to go over all the modules before coming to the module where the 
test resides.

> Consider using gradle to build Flink
> 
>
> Key: FLINK-8036
> URL: https://issues.apache.org/jira/browse/FLINK-8036
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> Here is a summary from Lukasz over this thread 
> (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool)
>  w.r.t. the performance boost from using gradle:
> Maven performs parallelization at the module level: an entire module needs
> to complete before any dependent modules can start, which means all the
> checks like findbugs, checkstyle, and tests need to finish. Gradle has
> task-level parallelism between subprojects, which means that as soon as the
> compile and shade steps are done for a project, dependent subprojects
> can typically start. This means that we get increased parallelism due to
> not needing to wait for findbugs, checkstyle, and tests to run. I typically see
> ~20 tasks (at peak) running on my desktop in parallel.
> Flink should consider using gradle - on Linux with SSD, a clean build takes 
> an hour.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
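
For comparison, the selective-test invocations being contrasted would look roughly 
like this (module and test names are placeholders, and the Gradle line assumes a 
hypothetical Gradle build of Flink):

```
# Gradle: compiles only what the filtered test needs, then runs it
./gradlew :flink-runtime:test --tests 'SomeTest'

# Maven: still walks the reactor; -pl/-am narrow it to the module and its deps
mvn test -pl flink-runtime -am -Dtest=SomeTest
```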


[jira] [Commented] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261140#comment-16261140
 ] 

ASF GitHub Bot commented on FLINK-8117:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5041
  
Very good change, and nice implementation!

+1 to merge this.


> Eliminate modulo operation from RoundRobinChannelSelector and 
> RebalancePartitioner
> --
>
> Key: FLINK-8117
> URL: https://issues.apache.org/jira/browse/FLINK-8117
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Fix For: 1.5.0
>
>
> {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and 
> {{RescalePartitioner}} use a modulo operation to wrap around when the current 
> channel counter reaches the number of channels. Using an {{if}} would have 
> better performance.
> A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
> the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
> most of the time predict the condition to be false.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5041: [FLINK-8117] [runtime] [streaming] Eliminate modulo opera...

2017-11-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5041
  
Very good change, and nice implementation!

+1 to merge this.


---


[jira] [Commented] (FLINK-8113) Bump maven-shade-plugin to 3.0.0

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261134#comment-16261134
 ] 

ASF GitHub Bot commented on FLINK-8113:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5042
  
Looks good to me.

Do we know if there are any new known issues in shade "3.0.0"? The first 
release of a new major version sometimes has a few regressions. Have you 
checked the Maven JIRA, by chance?

Otherwise +1




> Bump maven-shade-plugin to 3.0.0
> 
>
> Key: FLINK-8113
> URL: https://issues.apache.org/jira/browse/FLINK-8113
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0
>
>
> We should investigate whether we can bump the shade plugin to 3.0.0. Earlier 
> versions do not properly relocate services, forcing some modules to set a 
> different plugin version in their own configuration (flink-s3-fs-presto for 
> example, or flink-dist after FLINK-8111).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5042: [FLINK-8113][build] Bump maven-shade-plugin to 3.0.0

2017-11-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5042
  
Looks good to me.

Do we know if there are any new known issues in shade "3.0.0"? The first 
release of a new major version sometimes has a few regressions. Have you 
checked the Maven JIRA, by chance?

Otherwise +1




---


[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261122#comment-16261122
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5032#discussion_r152348905
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 ---
@@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception {
assertFalse(value.iterator().hasNext());
}
 
+   @Test(expected = DuplicateStateNameException.class)
+   public void testDuplicateStateName() throws Exception {
+   StreamingRuntimeContext context = new StreamingRuntimeContext(
+   createMapPlainMockOp(),
+   createMockEnvironment(),
+   Collections.emptyMap());
+   MapStateDescriptor<Integer, String> mapStateDesc =
+   new MapStateDescriptor<>("name", Integer.class, String.class);
+   ListStateDescriptor<String> listStateDesc =
+   new ListStateDescriptor<>("name", String.class);
+   context.getMapState(mapStateDesc);
--- End diff --

LGTM!

Just wondering: which call will throw the exception, `getMapState()` or 
`getListState()`? If it is the former, then we don't need the latter call here.
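
For illustration, a self-contained sketch (all names hypothetical, not Flink's actual implementation) of why it is the second registration under an existing name that can detect the clash, so both calls belong in the test:

{code}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the duplicate-name check: the registry only sees a
// conflict when a second, differently-typed state is registered under a
// name that is already taken.
public class DuplicateNameSketch {

    private final Map<String, Class<?>> registeredStates = new HashMap<>();

    void register(String name, Class<?> stateType) {
        Class<?> previous = registeredStates.putIfAbsent(name, stateType);
        if (previous != null && previous != stateType) {
            throw new IllegalStateException(
                "State '" + name + "' was already registered as " + previous.getSimpleName());
        }
    }

    public static void main(String[] args) {
        DuplicateNameSketch context = new DuplicateNameSketch();
        context.register("name", Map.class);            // first registration succeeds
        context.register("name", java.util.List.class); // this second call throws
    }
}
{code}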


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
> 		"timon-one",
> 		BasicTypeInfo.INT_TYPE_INFO,
> 		source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor(
> 		"timon-one",
> 		BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
> 	private static final long serialVersionUID = -805125545438296619L;
> 
> 	private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
> 	private transient ListState<Integer> secondListState;
> 
> 	@Override
> 	public void open(Configuration parameters) throws Exception {
> 		super.open(parameters);
> 		firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
> 		secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
> 	}
> 
> 	@Override
> 	public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
> 		Tuple2<Integer, Long> v = firstMapState.get(value.f0);
> 		if (v == null) {
> 			v = new Tuple2<>(value.f0, 0L);
> 		}
> 		firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
> 	}
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> 


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261077#comment-16261077
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152342531
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -108,11 +108,7 @@ int releaseMemory() throws IOException {
for (int i = 0; i < numBuffers; i++) {
Buffer buffer = buffers.remove();
spilledBytes += buffer.getSize();
-   try {
-   spillWriter.writeBlock(buffer);
-   } finally {
-   buffer.recycle();
-   }
+   spillWriter.writeBlock(buffer);
--- End diff --

Actually, if I see this correctly, the original code here is wrong, since it 
already recycles a buffer that was just handed to an asynchronous file write 
operation. This would lead to data corruption if the buffer is re-used in the 
meantime, wouldn't it?!
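
A standalone sketch of the race being described (plain Java, with CompletableFuture standing in for the asynchronous write request queue), next to the callback-based alternative:

{code}
import java.util.concurrent.CompletableFuture;

public class AsyncWriteSketch {

    static class Buffer {
        volatile boolean recycled = false;
        void recycle() { recycled = true; } // after this, the memory may be handed out again
    }

    // Unsafe: the buffer may be recycled and reused while the write is still in flight.
    static void writeAndRecycleEagerly(Buffer buffer) {
        CompletableFuture.runAsync(() -> { /* write buffer contents to disk */ });
        buffer.recycle(); // races with the asynchronous write above
    }

    // Safe: ownership is handed to the writer, which recycles only on completion.
    static CompletableFuture<Void> writeThenRecycle(Buffer buffer) {
        return CompletableFuture
            .runAsync(() -> { /* write buffer contents to disk */ })
            .whenComplete((ignored, error) -> buffer.recycle());
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        writeThenRecycle(buffer).join(); // wait for the write before checking
        System.out.println("recycled after write: " + buffer.recycled);
    }
}
{code}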


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261075#comment-16261075
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152342364
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
 ---
@@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue<WriteRequest> requestQueue) {
		super(channelID, requestQueue, CALLBACK, true);
	}
 
+   /**
+* Writes the given block asynchronously.
+*
+* @param buffer
+*  the buffer to be written (will be recycled when done)
--- End diff --

Good catch, but actually `SpillableSubpartition` doesn't do any recycling 
itself: in its `finish()` method, it relies on the buffer being on-heap and 
then garbage-collected; for the `add()` method, it relies on the caller, i.e. 
`ResultPartition#add()` (which I also forgot to adapt).


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint

2017-11-21 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261014#comment-16261014
 ] 

Gary Yao edited comment on FLINK-7716 at 11/21/17 4:39 PM:
---

[~yew1eb] This ticket and FLINK-7717 are similar to FLINK-7718 which I am 
currently working on. I wanted to ask you about your progress on this ticket 
because I can see that we will probably need a common base class for the REST 
handlers similar to 
{{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler}}.
 I can create one based on my current work, or if you already have one, I can 
rebase my code against your changes. What do you think?


was (Author: gjy):
[~yew1eb] This ticket is similar to FLINK-7718 which I am currently working on. 
I wanted to ask you about your progress on this ticket because I can see that 
we will probably need a common base class for the REST handlers similar to 
{{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler}}.
 I can create one based on my current work, or if you already have one, I can 
rebase my code against your changes. What do you think?

> Port JobManagerMetricsHandler to new REST endpoint
> --
>
> Key: FLINK-7716
> URL: https://issues.apache.org/jira/browse/FLINK-7716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hai Zhou UTC+8
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JobManagerMetricsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-4600) Support min/max aggregations for Date/Time/Timestamp/Intervals

2017-11-21 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4600.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

I will close this issue for now. I don't know if we really want to support 
aggregations on intervals. If this issue pops up again, we can reopen it. All 
other time types are supported now.

> Support min/max aggregations for Date/Time/Timestamp/Intervals
> --
>
> Key: FLINK-4600
> URL: https://issues.apache.org/jira/browse/FLINK-4600
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Leo Deng
> Fix For: 1.5.0
>
>
> Currently no aggregation supports temporal types. At least min/max should be 
> added for Date/Time/Timestamp/Intervals.
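
For reference, a small plain-Java sketch (not Flink code) of what MIN/MAX over temporal types boils down to: the values are Comparable, so the aggregate only needs to track the smallest/largest value seen so far:

{code}
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

public class TemporalMinMaxSketch {

    public static void main(String[] args) {
        List<Timestamp> timestamps = Arrays.asList(
            Timestamp.valueOf("2017-11-21 10:00:00"),
            Timestamp.valueOf("2017-11-21 09:30:00"),
            Timestamp.valueOf("2017-11-21 11:15:00"));

        // Timestamp (like Date and Time) is Comparable, which is all min/max needs
        Timestamp min = timestamps.stream().min(Timestamp::compareTo).get();
        Timestamp max = timestamps.stream().max(Timestamp::compareTo).get();
        System.out.println("min=" + min + ", max=" + max);
    }
}
{code}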



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8097) Add built-in support for min/max aggregation for Date/Time

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261011#comment-16261011
 ] 

ASF GitHub Bot commented on FLINK-8097:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5027


> Add built-in support for min/max aggregation for Date/Time
> --
>
> Key: FLINK-8097
> URL: https://issues.apache.org/jira/browse/FLINK-8097
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8097) Add built-in support for min/max aggregation for Date/Time

2017-11-21 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8097.
-
Resolution: Fixed

Fixed in 1.5: 44c603d2b62fff20a7213ad55512dc38f43a50bc

> Add built-in support for min/max aggregation for Date/Time
> --
>
> Key: FLINK-8097
> URL: https://issues.apache.org/jira/browse/FLINK-8097
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8097) Add built-in support for min/max aggregation for Date/Time

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260993#comment-16260993
 ] 

ASF GitHub Bot commented on FLINK-8097:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5027
  
Thanks @dianfu. Looks good to merge. Merging...


> Add built-in support for min/max aggregation for Date/Time
> --
>
> Key: FLINK-8097
> URL: https://issues.apache.org/jira/browse/FLINK-8097
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8038) Support MAP value constructor

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260970#comment-16260970
 ] 

ASF GitHub Bot commented on FLINK-8038:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5015


> Support MAP value constructor
> -
>
> Key: FLINK-8038
> URL: https://issues.apache.org/jira/browse/FLINK-8038
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
> Fix For: 1.5.0
>
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support Map value constructor which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeType.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8038) Support MAP value constructor

2017-11-21 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8038.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5: c5f5615cf84026039614701b5e6b3b0e003eada0 & 
9e3439c013928e52ea99fe87579512f1c2b2c28e

> Support MAP value constructor
> -
>
> Key: FLINK-8038
> URL: https://issues.apache.org/jira/browse/FLINK-8038
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
> Fix For: 1.5.0
>
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support Map value constructor which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeType.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8038) Support MAP value constructor

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260883#comment-16260883
 ] 

ASF GitHub Bot commented on FLINK-8038:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5015
  
Thanks for the update @walterddr. I found a small bug related to 
cardinalities. I will fix it and merge this PR.


> Support MAP value constructor
> -
>
> Key: FLINK-8038
> URL: https://issues.apache.org/jira/browse/FLINK-8038
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Similar to https://issues.apache.org/jira/browse/FLINK-4554
> We want to support Map value constructor which is supported by Calcite:
> https://calcite.apache.org/docs/reference.html#value-constructors
> {code:sql}
> SELECT
>   MAP['key1', f0, 'key2', f1] AS stringKeyedMap,
>   MAP['key', 'value'] AS literalMap,
>   MAP[f0, f1] AS fieldMap
> FROM
>   table
> {code}
> This should enable users to construct MapTypeInfo, one of the CompositeType.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260867#comment-16260867
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5038#discussion_r152299398
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 ---
@@ -95,15 +97,20 @@
 
	private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
 
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
+
	// Thread pool for client bootstrap (shared between tests)
-	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+	private NioEventLoopGroup nioGroup;
 
-	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+   @Before
+   public void setUp() throws Exception {
+   nioGroup = new NioEventLoopGroup();
--- End diff --

You could just write `private NioEventLoopGroup nioGroup = new 
NioEventLoopGroup();` and remove the `@Before` method.
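
A small sketch of the reviewer's point (JUnit 4 semantics, with a plain StringBuilder standing in for the NioEventLoopGroup): JUnit instantiates the test class once per test method, so a field initializer is re-evaluated for every test, exactly like an @Before method:

{code}
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class LifecycleSketch {

    // re-created for each test method, because JUnit builds a new test-class
    // instance per test; this is why a field initializer can replace @Before
    private final StringBuilder resource = new StringBuilder("fresh");

    @Test
    public void firstTest() {
        resource.append("-used"); // mutation does not leak into the other test
    }

    @Test
    public void secondTest() {
        assertEquals("fresh", resource.toString()); // still pristine here
    }
}
{code}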


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260868#comment-16260868
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5038#discussion_r152294984
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,28 +167,57 @@ public String getClientName() {
 * Shuts down the client and closes all connections.
 *
 * After a call to this method, all returned futures will be failed.
+*
+	 * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
	 */
-   public void shutdown() {
-   if (shutDown.compareAndSet(false, true)) {
+   public CompletableFuture shutdown() {
--- End diff --

should be typed to `Void`.


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260869#comment-16260869
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5038#discussion_r152300724
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,28 +167,57 @@ public String getClientName() {
	 * Shuts down the client and closes all connections.
	 *
	 * After a call to this method, all returned futures will be failed.
+	 *
+	 * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
	 */
-	public void shutdown() {
-		if (shutDown.compareAndSet(false, true)) {
+	public CompletableFuture shutdown() {
+		final CompletableFuture newShutdownFuture = new CompletableFuture<>();
+		if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
+
+			final List connectionFutures = new ArrayList<>();
+
 			for (Map.Entry conn : establishedConnections.entrySet()) {
 				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-					conn.getValue().close();
+					connectionFutures.add(conn.getValue().close());
 				}
 			}
 
 			for (Map.Entry conn : pendingConnections.entrySet()) {
 				if (pendingConnections.remove(conn.getKey()) != null) {
-					conn.getValue().close();
+					connectionFutures.add(conn.getValue().close());
 				}
 			}
 
-			if (bootstrap != null) {
-				EventLoopGroup group = bootstrap.group();
-				if (group != null) {
-					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+			CompletableFuture.allOf(
+				connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])
+			).whenComplete((result, throwable) -> {
+				if (throwable != null) {
+					newShutdownFuture.completeExceptionally(throwable);
+				} else if (bootstrap != null) {
+					EventLoopGroup group = bootstrap.group();
+					if (group != null && !group.isShutdown()) {
+						group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+							.addListener(finished -> {
+								if (finished.isSuccess()) {
+									newShutdownFuture.complete(null);
+								} else {
+									newShutdownFuture.completeExceptionally(finished.cause());
+								}
+							});
+					} else {
+						newShutdownFuture.complete(null);
+					}
+				} else {
+					newShutdownFuture.complete(null);
 				}
+			});
+
+			// check again if in the meantime another thread completed the future
+			if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
--- End diff --

Where in `close()` do we set the shutdown future to null? I only see that 
being done in `sendRequest()`, which seems fishy.
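
As a standalone illustration of the pattern under review (the names here are assumptions, not the actual Client fields): an idempotent shutdown can be built from a single compareAndSet plus CompletableFuture.allOf, and returning the stored future means it never has to be reset to null:

{code}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownSketch {

    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference<>();

    CompletableFuture<Void> shutdown(List<CompletableFuture<Void>> connectionCloses) {
        CompletableFuture<Void> newFuture = new CompletableFuture<>();
        if (shutdownFuture.compareAndSet(null, newFuture)) {
            // only the first caller gets here and drives the shutdown
            CompletableFuture
                .allOf(connectionCloses.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        newFuture.completeExceptionally(error);
                    } else {
                        newFuture.complete(null);
                    }
                });
        }
        // every caller, first or not, observes the same future
        return shutdownFuture.get();
    }

    public static void main(String[] args) {
        ShutdownSketch sketch = new ShutdownSketch();
        sketch.shutdown(Arrays.asList(CompletableFuture.<Void>completedFuture(null))).join();
        System.out.println("second call joins the same shutdown: "
            + sketch.shutdown(Collections.emptyList()).isDone());
    }
}
{code}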


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The 

[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260871#comment-16260871
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5038#discussion_r152297601
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -312,32 +345,43 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
-   synchronized (connectLock) {
-   if (!closed) {
-   if (failureCause == null) {
-   failureCause = cause;
-   }
+   private CompletableFuture close(Throwable cause) {
--- End diff --

same as above


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260866#comment-16260866
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5038#discussion_r152298240
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -312,32 +345,43 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
-   synchronized (connectLock) {
-   if (!closed) {
-   if (failureCause == null) {
-   failureCause = cause;
-   }
+   private CompletableFuture close(Throwable cause) {
+   CompletableFuture future = new CompletableFuture<>();
+   if (connectionShutdownFuture.compareAndSet(null, future)) {
+   synchronized (connectLock) {
+   if (!closed) {
--- End diff --

This seems unnecessary; doesn't the check at L358 guarantee that the entire 
branch is only executed once?


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260870#comment-16260870
 ] 

ASF GitHub Bot commented on FLINK-7880:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5038#discussion_r152298612
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -386,6 +430,9 @@ private PendingRequest(REQ request) {
/** Reference to a failure that was reported by the channel. */
	private final AtomicReference failureCause = new AtomicReference<>();
 
+	/** Atomic shut down future. */
+	private final AtomicReference connectionShutdownFuture = new AtomicReference<>(null);
--- End diff --

why does this one suddenly return a boolean?


> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



