[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183603#comment-16183603
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141517953
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Great! That makes things a lot easier :-)


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141517953
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Great! That makes things a lot easier :-)


---


[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183586#comment-16183586
 ] 

ASF GitHub Bot commented on FLINK-7661:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4698
  
@NicoK, it already passed the Travis tests this time.


> Add credit field in PartitionRequest message
> 
>
> Key: FLINK-7661
> URL: https://issues.apache.org/jira/browse/FLINK-7661
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: zhijiang
>Assignee: zhijiang
>
> Currently the {{PartitionRequest}} message contains the {{ResultPartitionID}} | 
> {{queueIndex}} | {{InputChannelID}} fields.
> We will add a new {{credit}} field indicating the initial credit of the 
> {{InputChannel}}; this info can be obtained from the {{InputChannel}} directly 
> after exclusive buffers have been assigned to it.
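
A hypothetical Scala sketch of the extended message layout described above (the real message class is NettyMessage.PartitionRequest in flink-runtime; the stand-in ID types below only keep the sketch self-contained):

{code}
object PartitionRequestSketch {
  // Stand-ins for the real Flink ID types (illustrative only).
  type ResultPartitionID = java.util.UUID
  type InputChannelID = java.util.UUID

  // The new `credit` field carries the initial credit of the InputChannel,
  // known once exclusive buffers have been assigned to it.
  case class PartitionRequest(
    partitionId: ResultPartitionID, // which result partition to consume
    queueIndex: Int,                // index of the target subpartition
    receiverId: InputChannelID,     // the requesting input channel
    credit: Int)                    // initial credit for flow control
}
{code}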



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4698: [FLINK-7661][network] Add credit field in PartitionReques...

2017-09-27 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4698
  
@NicoK, it already passed the Travis tests this time.


---


[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website

2017-09-27 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7686:
--
Labels:   (was: comm)

> Add Flink Forward Berlin 2017 conference slides to the flink website
> 
>
> Key: FLINK-7686
> URL: https://issues.apache.org/jira/browse/FLINK-7686
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Hai Zhou UTC+8
>Priority: Trivial
>
> I recently watched the [Flink Forward Berlin 
> 2017|https://berlin.flink-forward.org/sessions/] conference slides; the 
> content is very good.
> I think we should add them to the [flink 
> website|http://flink.apache.org/community.html] so that more people know 
> about them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website

2017-09-27 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7686:
--
Labels: comm  (was: site)

> Add Flink Forward Berlin 2017 conference slides to the flink website
> 
>
> Key: FLINK-7686
> URL: https://issues.apache.org/jira/browse/FLINK-7686
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Hai Zhou UTC+8
>Priority: Trivial
>
> I recently watched the [Flink Forward Berlin 
> 2017|https://berlin.flink-forward.org/sessions/] conference slides; the 
> content is very good.
> I think we should add them to the [flink 
> website|http://flink.apache.org/community.html] so that more people know 
> about them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website

2017-09-27 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7686:
--
Component/s: Project Website

> Add Flink Forward Berlin 2017 conference slides to the flink website
> 
>
> Key: FLINK-7686
> URL: https://issues.apache.org/jira/browse/FLINK-7686
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Hai Zhou UTC+8
>Priority: Trivial
>  Labels: site
>
> I recently watched the [Flink Forward Berlin 
> 2017|https://berlin.flink-forward.org/sessions/] conference slides; the 
> content is very good.
> I think we should add them to the [flink 
> website|http://flink.apache.org/community.html] so that more people know 
> about them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website

2017-09-27 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 updated FLINK-7686:
--
Labels: site  (was: )

> Add Flink Forward Berlin 2017 conference slides to the flink website
> 
>
> Key: FLINK-7686
> URL: https://issues.apache.org/jira/browse/FLINK-7686
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Hai Zhou UTC+8
>Priority: Trivial
>  Labels: site
>
> I recently watched the [Flink Forward Berlin 
> 2017|https://berlin.flink-forward.org/sessions/] conference slides; the 
> content is very good.
> I think we should add them to the [flink 
> website|http://flink.apache.org/community.html] so that more people know 
> about them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183532#comment-16183532
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
+        accumulator.f0.put(value, accumulator.f0.get(value) + 1)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    val iterator = accumulator.f0.iterator
+    if (iterator.hasNext) {
+      val map = new util.HashMap[E, Integer]()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        map.put(entry.getKey, entry.getValue)
+      }
+      map
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
--- End diff --

Check with the Calcite tests; it should return an empty Multiset instead.
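
A minimal sketch of the suggested fix, reusing the names from the diff (it assumes the surrounding CollectAggFunction class shown above):

{code}
override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
  // Always materialize a map: an empty multiset becomes an empty map
  // rather than null, matching the Calcite behavior referenced above.
  val map = new util.HashMap[E, Integer]()
  val iterator = accumulator.f0.iterator
  while (iterator.hasNext) {
    val entry = iterator.next()
    map.put(entry.getKey, entry.getValue)
  }
  map
}
{code}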


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
+        accumulator.f0.put(value, accumulator.f0.get(value) + 1)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    val iterator = accumulator.f0.iterator
+    if (iterator.hasNext) {
+      val map = new util.HashMap[E, Integer]()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        map.put(entry.getKey, entry.getValue)
+      }
+      map
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
--- End diff --

Check with the Calcite tests; it should return an empty Multiset instead.


---


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183530#comment-16183530
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

I took a look at the Calcite tests for the Collect function; null will be ignored.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183531#comment-16183531
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507197
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+    private static final long serialVersionUID = 1L;
+
+    public MultisetTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass, Integer.class);
+    }
+
+    public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+    }
+
+    // ------------------------------------------------------------------------
+    //  MultisetTypeInfo specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the type information for the elements contained in the Multiset.
+     */
+    public TypeInformation<T> getElementTypeInfo() {
+        return getKeyTypeInfo();
+    }
+
+    // ------------------------------------------------------------------------
+    //  TypeInformation implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isBasicType() {
--- End diff --

done
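
For context, a small usage sketch (Scala) of the class shown in the diff; the constructor and getElementTypeInfo signatures are taken from the diff above:

{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.MultisetTypeInfo

// A multiset of strings is backed by a Map<String, Integer> of
// element -> multiplicity, hence the MapTypeInfo<T, Integer> base class.
val multisetType = new MultisetTypeInfo[String](BasicTypeInfo.STRING_TYPE_INFO)
val elementType = multisetType.getElementTypeInfo // BasicTypeInfo.STRING_TYPE_INFO
{code}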


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507197
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+    private static final long serialVersionUID = 1L;
+
+    public MultisetTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass, Integer.class);
+    }
+
+    public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+    }
+
+    // ------------------------------------------------------------------------
+    //  MultisetTypeInfo specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the type information for the elements contained in the Multiset.
+     */
+    public TypeInformation<T> getElementTypeInfo() {
+        return getKeyTypeInfo();
+    }
+
+    // ------------------------------------------------------------------------
+    //  TypeInformation implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isBasicType() {
--- End diff --

done


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

I took a look at the Calcite tests for the Collect function; null will be ignored.


---


[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-27 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183465#comment-16183465
 ] 

Kent Murra commented on FLINK-7657:
---

As an aside, my testing indicates that the "DATE/TIME/TIMESTAMP" functions in 
the SQL language assume the UTC time zone. Is that correct? If so, it might 
be a good idea to document that in the appropriate places in the API and the 
online documentation (I can attempt to do so if I can find the right places 
to edit, which I haven't so far).

In particular, 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html 
indicates field translations to a set of raw Java types that are time-zone 
sensitive (all subclasses of java.util.Date) while not indicating the assumed 
time zone. For my use case (trying to write something that pushes down 
timestamp predicates into a query against a MySQL database), this can cause 
significant confusion and requires experimentation to work out.

Alternatively, we can mimic what Calcite is doing and have them be 
specially-typed strings (Calcite uses TimeString, TimestampString, DateString). 
Since Java 7 is still supported, we unfortunately can't use the new time API.
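
A small self-contained sketch of the ambiguity (plain JDK calls, not Flink code): the same wall-clock literal maps to different epoch millis depending on the zone it is interpreted in.

{code}
import java.util.{Calendar, TimeZone}

def epochMillisOf(tz: TimeZone): Long = {
  // Interpret the literal TIMESTAMP '2017-05-01 00:00:00' in the given zone.
  val cal = Calendar.getInstance(tz)
  cal.clear()
  cal.set(2017, Calendar.MAY, 1, 0, 0, 0)
  cal.getTimeInMillis
}

val utcMillis = epochMillisOf(TimeZone.getTimeZone("UTC"))
val localMillis = epochMillisOf(TimeZone.getDefault())
// utcMillis != localMillis unless the JVM default zone is UTC,
// which is exactly the ambiguity the documentation should spell out.
{code}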

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Assignee: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Tables API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it causes an exception 
> (attached below). The result is that any SQL query with a timestamp, date, or 
> time literal is unexecutable if any table source is marked with 
> FilterableTableSource.
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
>   at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
>   at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
>   at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
>   at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   

[jira] [Closed] (FLINK-7635) Support sideOutput in ProcessWindowFunction

2017-09-27 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-7635.
---
Resolution: Done

> Support sideOutput in ProcessWindowFunction
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is a good warm-up task for people to learn how 
> window functions work in general. Otherwise feel free to assign it back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183444#comment-16183444
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141494139
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
--- End diff --

A normal Scala class still needs to, but a case class doesn't need to.
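
A quick illustration of that point (paste into a Scala REPL):

{code}
class PlainAcc(var f0: Int)   // no equals override: reference equality
case class CaseAcc(f0: Int)   // compiler generates equals/hashCode/canEqual

assert(new PlainAcc(1) != new PlainAcc(1)) // distinct instances, not equal
assert(CaseAcc(1) == CaseAcc(1))           // structural equality holds
{code}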


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183445#comment-16183445
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141494189
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141494189
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
--- End diff --

done


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141494139
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
--- End diff --

A normal Scala class still needs to, but a case class doesn't need to.


---


[jira] [Commented] (FLINK-7724) add extra metrics to MetricStoreTest.setupStore

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183428#comment-16183428
 ] 

ASF GitHub Bot commented on FLINK-7724:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4739
  
cc @tillrohrmann  @zentol 


> add extra metrics to MetricStoreTest.setupStore
> ---
>
> Key: FLINK-7724
> URL: https://issues.apache.org/jira/browse/FLINK-7724
> Project: Flink
>  Issue Type: Test
>  Components: REST, Tests
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> Currently, {{MetricStoreTest.setupStore()}} only mocks jobs with a single 
> metric. We need at least one job with multiple metrics to make the mocked 
> MetricStore more generic.
> This is a prerequisite for FLINK-7694



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6703) Document how to take a savepoint on YARN

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183427#comment-16183427
 ] 

ASF GitHub Bot commented on FLINK-6703:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4721
  
cc @zentol 


> Document how to take a savepoint on YARN
> 
>
> Key: FLINK-6703
> URL: https://issues.apache.org/jira/browse/FLINK-6703
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Bowen Li
>
> The documentation should have a separate entry for savepoint related CLI 
> commands in combination with YARN. It is currently not documented that you 
> have to supply the application id, nor how you can pass it.
> {code}
> ./bin/flink savepoint <jobId> -m yarn-cluster (-yid|-yarnapplicationId) <yarnAppId>
> {code}
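
For example, a hypothetical invocation (the job ID and YARN application ID below are made up):

{code}
./bin/flink savepoint 5e20cb6b0f357591171dfcca2eea09de -m yarn-cluster -yid application_1506556127005_0001
{code}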



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4739: [FLINK-7724][REST][Tests][Metrics] add extra metrics to M...

2017-09-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4739
  
cc @tillrohrmann  @zentol 


---


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183426#comment-16183426
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141492673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * A job related REST handler always requires a {@link JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
--- End diff --

ok, got you


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4721: [FLINK-6703][savepoint/doc] Document how to take a savepo...

2017-09-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4721
  
cc @zentol 


---


[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...

2017-09-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141492673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * A job related REST handler always requires a {@link JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
--- End diff --

ok, got you


---


[jira] [Commented] (FLINK-7724) add extra metrics to MetricStoreTest.setupStore

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183414#comment-16183414
 ] 

ASF GitHub Bot commented on FLINK-7724:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4739

[FLINK-7724][REST][Tests][Metrics] add extra metrics to 
MetricStoreTest.setupStore

## What is the purpose of the change

This is a pre-requisite for FLINK-7694

I'm porting `JobMetricsHandler` to the new REST endpoint. Part of the migration 
is to make sure the JSON generated before and after stays the same for the UI to 
consume, and `JobMetricsHandler` uses `MetricStoreTest.setupStore()` to get 
a mocked `MetricStore`. 

Here comes the problem: currently, `MetricStoreTest.setupStore()` mocks all 
jobs with only one metric. We need at least one or two jobs with multiple 
metrics so that the mocked `MetricStore` covers most use cases and the JSON 
blob generated in `JobMetricsHandler` stays the same before and after the 
migration.

## Brief change log

- make two jobs in `MetricStoreTest.setupStore()` contain multiple metrics

## Verifying this change

This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7724

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4739.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4739


commit bc4aa8c3d9b057a6d35b620eb8e00048137e9361
Author: Bowen Li 
Date:   2017-09-27T22:30:02Z

[FLINK-7724] add extra metrics to MetricStoreTest.setupStore

commit d223444eba17c620e8c45c372a7b2e6fafdfd169
Author: Bowen Li 
Date:   2017-09-27T22:39:17Z

delete unrelated files




> add extra metrics to MetricStoreTest.setupStore
> ---
>
> Key: FLINK-7724
> URL: https://issues.apache.org/jira/browse/FLINK-7724
> Project: Flink
>  Issue Type: Test
>  Components: REST, Tests
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> Currently, {{MetricStoreTest.setupStore()}} only mocks jobs with a single 
> metric. We need at least one job with multiple metrics to make the mocked 
> MetricStore more generic.
> This is a prerequisite for FLINK-7694



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183415#comment-16183415
 ] 

ASF GitHub Bot commented on FLINK-7649:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4700


> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobStoppingHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4700: [FLINK-7649] [flip6] Extend JobTerminationHandler ...

2017-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4700


---


[GitHub] flink pull request #4739: [FLINK-7724][REST][Tests][Metrics] add extra metri...

2017-09-27 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4739

[FLINK-7724][REST][Tests][Metrics] add extra metrics to 
MetricStoreTest.setupStore

## What is the purpose of the change

This is a pre-requisite for FLINK-7694

I'm porting `JobMetricsHandler` to the new REST endpoint. Part of the migration 
is to make sure the JSON generated before and after stays the same for the UI to 
consume, and `JobMetricsHandler` uses `MetricStoreTest.setupStore()` to get 
a mocked `MetricStore`. 

Here comes the problem: currently, `MetricStoreTest.setupStore()` mocks all 
jobs with only one metric. We need at least one or two jobs with multiple 
metrics so that the mocked `MetricStore` covers most use cases and the JSON 
blob generated in `JobMetricsHandler` stays the same before and after the 
migration.

## Brief change log

- make two jobs in `MetricStoreTest.setupStore()` contain multiple metrics

## Verifying this change

This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7724

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4739.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4739


commit bc4aa8c3d9b057a6d35b620eb8e00048137e9361
Author: Bowen Li 
Date:   2017-09-27T22:30:02Z

[FLINK-7724] add extra metrics to MetricStoreTest.setupStore

commit d223444eba17c620e8c45c372a7b2e6fafdfd169
Author: Bowen Li 
Date:   2017-09-27T22:39:17Z

delete unrelated files




---


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183413#comment-16183413
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4697


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7649.
--
Resolution: Fixed

Fixed via adeab64ea0abdc892e51c6f630aa56dabf7e2c98

> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobStoppingHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...

2017-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4697


---


[jira] [Resolved] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7650.
--
Resolution: Fixed

Fixed via 8ea4db1a8b368b4e00dd310c0d07405fd2142b34

> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7723) DispatcherTest instable

2017-09-27 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7723.

Resolution: Fixed

Fixed via 4debc6033cc2099b763d7fef86829b9f6734d91a

> DispatcherTest instable
> ---
>
> Key: FLINK-7723
> URL: https://issues.apache.org/jira/browse/FLINK-7723
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{DispatcherTest}} is instable on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/280508821



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183381#comment-16183381
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141485981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-        case unSupported: SqlAggFunction =>
-          throw new TableException(s"unsupported Function: '${unSupported.getName}'")
+        case other: SqlAggFunction =>
+          if (other.getKind == SqlKind.COLLECT) {
+            aggregates(index) = sqlTypeName match {
+              case TINYINT =>
+                new ByteCollectAggFunction
+              case SMALLINT =>
+                new ShortCollectAggFunction
+              case INTEGER =>
+                new IntCollectAggFunction
+              case BIGINT =>
+                new LongCollectAggFunction
+              case VARCHAR | CHAR =>
+                new StringCollectAggFunction
+              case FLOAT =>
+                new FloatCollectAggFunction
+              case DOUBLE =>
+                new DoubleCollectAggFunction
+              case _ =>
+                new ObjectCollectAggFunction
+            }
+          } else {
--- End diff --

done
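
For context, a hypothetical Flink SQL query that exercises this dispatch (table and column names are made up; an INT column would select IntCollectAggFunction):

{code}
-- COLLECT aggregates the values of column a into a multiset per group.
SELECT b, COLLECT(a) AS a_multiset
FROM t
GROUP BY b
{code}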


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183380#comment-16183380
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141485942
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141485942
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
--- End diff --

done


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141485981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
+  if (other.getKind == SqlKind.COLLECT) {
+aggregates(index) = sqlTypeName match {
+  case TINYINT =>
+new ByteCollectAggFunction
+  case SMALLINT =>
+new ShortCollectAggFunction
+  case INTEGER =>
+new IntCollectAggFunction
+  case BIGINT =>
+new LongCollectAggFunction
+  case VARCHAR | CHAR =>
+new StringCollectAggFunction
+  case FLOAT =>
+new FloatCollectAggFunction
+  case DOUBLE =>
+new DoubleCollectAggFunction
+  case _ =>
+new ObjectCollectAggFunction
+}
+  } else {
--- End diff --

done


---


[jira] [Created] (FLINK-7724) add extra metrics to MetricStoreTest.setupStore

2017-09-27 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7724:
---

 Summary: add extra metrics to MetricStoreTest.setupStore
 Key: FLINK-7724
 URL: https://issues.apache.org/jira/browse/FLINK-7724
 Project: Flink
  Issue Type: Test
  Components: REST, Tests
Affects Versions: 1.4.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.4.0


Currently, {{MetricStoreTest.setupStore()}} only mocks jobs with a single 
metric. We need to have at least one job with multiple metrics to make the 
mocked MetricStore more generic.

This is a pre-requisite for FLINK-7694



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7722) MiniCluster does not appear to honor Log4j settings

2017-09-27 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy closed FLINK-7722.
-
Resolution: Not A Problem

> MiniCluster does not appear to honor Log4j settings
> ---
>
> Key: FLINK-7722
> URL: https://issues.apache.org/jira/browse/FLINK-7722
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Priority: Minor
>
> When executing a job from the command line for testing, it will output logs 
> like:
> {noformat}
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] 
> with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
> 09/27/2017 13:15:13   Job execution switched to status RUNNING.
> 09/27/2017 13:15:13   Source: Custom File Source(1/1) switched to SCHEDULED 
> 09/27/2017 13:15:13   Source: Collect
> {noformat}
> It will do so even if the log4j.properties file contains:
> {code}
> log4j.rootLogger=ERROR, stdout
> log4j.logger.org.apache.flink=ERROR
> log4j.logger.akka=ERROR
> log4j.logger.org.apache.kafka=ERROR
> log4j.logger.org.apache.hadoop=ERROR
> log4j.logger.org.apache.zookeeper=ERROR
> {code}
> It seems that the MiniCluster does not honor Log4j settings, or at least that 
> is my guess.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7722) MiniCluster does not appear to honor Log4j settings

2017-09-27 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183336#comment-16183336
 ] 

Elias Levy commented on FLINK-7722:
---

Aye. By tracing the code, I figured out that I needed to call 
{{env.getConfig().disableSysoutLogging()}}. Closing.

> MiniCluster does not appear to honor Log4j settings
> ---
>
> Key: FLINK-7722
> URL: https://issues.apache.org/jira/browse/FLINK-7722
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Priority: Minor
>
> When executing a job from the command line for testing, it will output logs 
> like:
> {noformat}
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] 
> with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
> 09/27/2017 13:15:13   Job execution switched to status RUNNING.
> 09/27/2017 13:15:13   Source: Custom File Source(1/1) switched to SCHEDULED 
> 09/27/2017 13:15:13   Source: Collect
> {noformat}
> It will do so even if the log4j.properties file contains:
> {code}
> log4j.rootLogger=ERROR, stdout
> log4j.logger.org.apache.flink=ERROR
> log4j.logger.akka=ERROR
> log4j.logger.org.apache.kafka=ERROR
> log4j.logger.org.apache.hadoop=ERROR
> log4j.logger.org.apache.zookeeper=ERROR
> {code}
> It seems that the MiniCluster does not honor Log4j settings, or at least that 
> is my guess.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7723) DispatcherTest unstable

2017-09-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7723:


 Summary: DispatcherTest unstable
 Key: FLINK-7723
 URL: https://issues.apache.org/jira/browse/FLINK-7723
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Critical


The {{DispatcherTest}} is unstable on Travis.

https://travis-ci.org/tillrohrmann/flink/jobs/280508821



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster

2017-09-27 Thread Mikhail Pryakhin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183304#comment-16183304
 ] 

Mikhail Pryakhin commented on FLINK-6949:
-

I propose to add a --yarnship-files CLI option that will ship folders with 
files to the YARN cluster and add these folders to the application classpath. 
At the moment the --yarnship option 
[traverses|https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003:]
 the content of the specified yarnship folder and appends files to the 
classpath. If a ship folder contains files other than *.class, *.jar, or 
*.zip files (for example, config/property files), appending those files to the 
application classpath has no effect. So it would be great to have the ability 
to add whole folders to the application classpath.
I'm going to open a PR to support this feature.
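For illustration, a hypothetical invocation of the proposed option could look like the following (the --yarnship-files flag is part of this proposal, not an existing CLI option, and the paths are made up):

{noformat}
# Hypothetical usage: ship a local config folder and add it to the application classpath
flink run -m yarn-cluster --yarnship-files /local/path/to/conf my-job.jar
{noformat}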

> Add ability to ship custom resource files to YARN cluster
> -
>
> Key: FLINK-6949
> URL: https://issues.apache.org/jira/browse/FLINK-6949
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN
>Affects Versions: 1.3.0
>Reporter: Mikhail Pryakhin
>Assignee: Mikhail Pryakhin
>Priority: Critical
>
> *The problem:*
> When deploying a Flink job on YARN it is not possible to specify custom 
> resource files to be shipped to the YARN cluster.
>  
> *The use case description:*
> When running a Flink job in multiple environments it becomes necessary to 
> pass environment-related configuration files to the job's runtime. This can be 
> accomplished by packaging configuration files within the job's jar. But with 
> tens of different environments one can easily end up packaging as many 
> jars as there are environments. It would be great to have the ability to 
> separate configuration files from the job artifacts. 
>  
> *The possible solution:*
> Add a --yarnship-files option to the Flink CLI to specify files that should be 
> shipped to the YARN cluster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6949) Add ability to ship custom resource files to YARN cluster

2017-09-27 Thread Mikhail Pryakhin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Pryakhin reassigned FLINK-6949:
---

Assignee: Mikhail Pryakhin

> Add ability to ship custom resource files to YARN cluster
> -
>
> Key: FLINK-6949
> URL: https://issues.apache.org/jira/browse/FLINK-6949
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN
>Affects Versions: 1.3.0
>Reporter: Mikhail Pryakhin
>Assignee: Mikhail Pryakhin
>Priority: Critical
>
> *The problem:*
> When deploying a Flink job on YARN it is not possible to specify custom 
> resource files to be shipped to the YARN cluster.
>  
> *The use case description:*
> When running a Flink job in multiple environments it becomes necessary to 
> pass environment-related configuration files to the job's runtime. This can be 
> accomplished by packaging configuration files within the job's jar. But with 
> tens of different environments one can easily end up packaging as many 
> jars as there are environments. It would be great to have the ability to 
> separate configuration files from the job artifacts. 
>  
> *The possible solution:*
> Add a --yarnship-files option to the Flink CLI to specify files that should be 
> shipped to the YARN cluster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException

2017-09-27 Thread Kent Murra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183241#comment-16183241
 ] 

Kent Murra commented on FLINK-7657:
---

Thanks for the information, Timo. My approach was to add an apply method to 
the companion object that takes in the RexLiteral, as follows:

{code}
object Literal {
  ...

  private[flink] def apply(rexNode: RexLiteral): Literal = {
    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)

    val literalValue = literalType match {
      case _@SqlTimeTypeInfo.DATE =>
        val rexValue = rexNode.getValueAs(classOf[DateString])
        new Date(rexValue.getMillisSinceEpoch)
      case _@SqlTimeTypeInfo.TIME =>
        val rexValue = rexNode.getValueAs(classOf[TimeString])
        new Time(rexValue.getMillisOfDay)
      case _@SqlTimeTypeInfo.TIMESTAMP =>
        // We're losing nanosecond precision but according to the documentation we're only
        // supporting TIMESTAMP(3) at the moment. In order to support nanosecond precision,
        // we'd want to convert to string and then to java.sql.Timestamp.
        val rexValue = rexNode.getValueAs(classOf[TimestampString])
        new Timestamp(rexValue.getMillisSinceEpoch)
      case _ => rexNode.getValue
    }

    Literal(literalValue, literalType)
  }
}
{code}

My view: we want to keep the transformation logic to and from the RexNode in 
the same file so it's more obvious that changes need to be "paired". This 
doesn't match your suggestion, but I think you were warning me away from 
touching the case class Literal, since that appears to be fairly widely used 
and the impact is hard to gauge.

Sorry for the delay in getting a PR out; I got pulled onto other work. I'm looking at 
this again, and I'll get something out and let you tell me what you do or 
don't like about it. For now I'll focus on the Date/Time/Timestamp aspect and 
can follow up with the other types as necessary.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> --
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Kent Murra
>Assignee: Kent Murra
>Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When 
> the execution environment tries to optimize the SQL, it causes an exception 
> (attached below). The result is that any SQL query with a timestamp, date, or 
> time literal cannot be executed if any table source is marked with 
> FilterableTableSource. 
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule PushFilterIntoTableSourceScanRule, args 
> [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
>  $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], 
> fields:(data, last_updated))]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>   at 
> com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
>   at 
> com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
>   at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: 

[jira] [Commented] (FLINK-7722) MiniCluster does not appear to honor Log4j settings

2017-09-27 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183221#comment-16183221
 ] 

Ufuk Celebi commented on FLINK-7722:


I think this is the output of the client that is actually submitting the job to 
the {{FlinkMiniCluster}}. I also find this confusing/annoying at times.

You can disable it via {{env.getConfig().disableSysoutLogging()}}.
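For reference, a minimal sketch of where that call goes (the pipeline is a made-up placeholder; only the {{disableSysoutLogging()}} call is taken from the comment above):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QuietJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Suppress the client-side sysout progress messages
        // ("Job execution switched to status RUNNING.", etc.).
        env.getConfig().disableSysoutLogging();

        // Hypothetical pipeline, just to make the example runnable.
        env.fromElements(1, 2, 3).print();

        env.execute("quiet job");
    }
}
{code}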


> MiniCluster does not appear to honor Log4j settings
> ---
>
> Key: FLINK-7722
> URL: https://issues.apache.org/jira/browse/FLINK-7722
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Priority: Minor
>
> When executing a job from the command line for testing, it will output logs 
> like:
> {noformat}
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] 
> with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
> 09/27/2017 13:15:13   Job execution switched to status RUNNING.
> 09/27/2017 13:15:13   Source: Custom File Source(1/1) switched to SCHEDULED 
> 09/27/2017 13:15:13   Source: Collect
> {noformat}
> It will do so even if the log4j.properties file contains:
> {code}
> log4j.rootLogger=ERROR, stdout
> log4j.logger.org.apache.flink=ERROR
> log4j.logger.akka=ERROR
> log4j.logger.org.apache.kafka=ERROR
> log4j.logger.org.apache.hadoop=ERROR
> log4j.logger.org.apache.zookeeper=ERROR
> {code}
> It seems that the MiniCluster does not honor Log4j settings, or at least that 
> is my guess.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7722) MiniCluster does not appear to honor Log4j settings

2017-09-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7722:
-

 Summary: MiniCluster does not appear to honor Log4j settings
 Key: FLINK-7722
 URL: https://issues.apache.org/jira/browse/FLINK-7722
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


When executing a job from the command line for testing, it will output logs 
like:

{noformat}
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] 
with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
09/27/2017 13:15:13 Job execution switched to status RUNNING.
09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED 
09/27/2017 13:15:13 Source: Collect
{noformat}

It will do so even if the log4j.properties file contains:

{code}
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.flink=ERROR
log4j.logger.akka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.zookeeper=ERROR
{code}

It seems that the MiniCluster does not honor Log4j settings, or at least that 
is my guess.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7720) Centralize creation of backends and state related resources

2017-09-27 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183151#comment-16183151
 ] 

Bowen Li commented on FLINK-7720:
-

+1

> Centralize creation of backends and state related resources
> ---
>
> Key: FLINK-7720
> URL: https://issues.apache.org/jira/browse/FLINK-7720
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> State related resources like keyed state backends, operator state backends, 
> timer service manager, etc. are currently all created in different places, 
> somewhere between {{StreamTask}} and {{AbstractStreamOperator}}. It is hard 
> to identify where/how resources are created, which component owns them, and 
> how certain components depend on others (e.g. there is only a timer service 
> manager if there is also a keyed state backend).
> For the changes we plan for local state recovery, it would make sense to 
> define a component that is responsible for the creation of state related 
> resources and that, during checkpointing and restore, can interact with the 
> checkpoint coordinator (owned by the job manager) as well as with 
> local state storage (owned by the task manager).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183148#comment-16183148
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141449807
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.contains(value)) {
--- End diff --

Good catch. done.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-27 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141449807
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.contains(value)) {
--- End diff --

Good catch. done.


---


[jira] [Assigned] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN

2017-09-27 Thread Mikhail Pryakhin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Pryakhin reassigned FLINK-6950:
---

Assignee: Mikhail Pryakhin

> Add ability to specify single jar files to be shipped to YARN
> -
>
> Key: FLINK-6950
> URL: https://issues.apache.org/jira/browse/FLINK-6950
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Mikhail Pryakhin
>Assignee: Mikhail Pryakhin
>Priority: Minor
>
> When deploying a Flink job on YARN it is not possible to specify multiple 
> yarnship folders. 
> Often when submitting a Flink job, the job dependencies and job resources are 
> located in different local folders, and both of them should be shipped to the 
> YARN cluster. 
> I think it would be great to have the ability to specify individual jar files 
> rather than folders to be shipped to the YARN cluster (via the --yarnship-jars 
> option). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN

2017-09-27 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183104#comment-16183104
 ] 

Robert Metzger commented on FLINK-6950:
---

Yes, please go ahead!

> Add ability to specify single jar files to be shipped to YARN
> -
>
> Key: FLINK-6950
> URL: https://issues.apache.org/jira/browse/FLINK-6950
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Mikhail Pryakhin
>Priority: Minor
>
> When deploying a Flink job on YARN it is not possible to specify multiple 
> yarnship folders. 
> Often when submitting a Flink job, the job dependencies and job resources are 
> located in different local folders, and both of them should be shipped to the 
> YARN cluster. 
> I think it would be great to have the ability to specify individual jar files 
> rather than folders to be shipped to the YARN cluster (via the --yarnship-jars 
> option). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183016#comment-16183016
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4697
  
+1 from my side.


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183014#comment-16183014
 ] 

ASF GitHub Bot commented on FLINK-7721:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4738

[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs

## What is the purpose of the change

Prior to this PR, in the calculation of the new min watermark in 
`StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()`, 
there is no verification that the calculated new min watermark really is 
aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned but 
actually some are active, we would then incorrectly determine that the final 
aggregation is `Long.MAX_VALUE` and emit that.

This PR fixes this by only emitting the aggregated watermark iff it was 
really calculated from some aligned input channel (as well as the already 
existing constraint that it needs to be larger than the last emitted 
watermark). This change should also safely cover the case that a 
`Long.MAX_VALUE` was genuinely aggregated from the input channels.

**This is a critical bug that should be fixed for all affected versions: 
1.2.1, 1.3.2, 1.4.0 (master)**

## Brief change log

- Fix `findAndOutputNewMinWatermarkAcrossAlignedChannels` to only output 
the new min watermark iff it was truly computed from some aligned channel.
- Adapt `StatusWatermarkValveTest#testMultipleInputValve()` to cover the 
missing case.

## Verifying this change

Verified by the modified 
`StatusWatermarkValveTest#testMultipleInputValve()`.
Without the fix in `findAndOutputNewMinWatermarkAcrossAlignedChannels`, the 
modified test does not pass.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): **YES**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? no



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7721

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4738.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4738


commit 29d3b74f81410ea9c2ed861c297ece8134d9b29f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-27T18:05:21Z

[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs

Prior to this commit, in the calculation of the new min watermark in
StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(),
there is no verification that the calculated new min watermark
really is aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned
but actually some are active, we would then incorrectly determine that
the final aggregation is Long.MAX_VALUE and emit that.

This commit fixes this by only emitting the aggregated watermark iff it was
really calculated from some aligned input channel (as well as the
already existing constraint that it needs to be larger than the last
emitted watermark). This change should also safely cover the case that a
Long.MAX_VALUE was genuinely aggregated from the input channels.




> StatusWatermarkValve should output a new min watermark only if it was 
> aggregated from aligned channels
> --
>
> Key: FLINK-7721
> URL: https://issues.apache.org/jira/browse/FLINK-7721
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.2, 1.4.0, 1.3.3
>
>
> Context:
> {code}
> long newMinWatermark = Long.MAX_VALUE;
> for (InputChannelStatus channelStatus : channelStatuses) {
> if (channelStatus.isWatermarkAligned) {
> newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
> }
> }
> {code}
> In the calculation of the new min 

[GitHub] flink issue #4697: [FLINK-7650] [flip6] Port JobCancellationHandler to new R...

2017-09-27 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4697
  
+1 from my side.


---


[GitHub] flink pull request #4738: [FLINK-7721] [DataStream] Only emit new min waterm...

2017-09-27 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4738

[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs

## What is the purpose of the change

Prior to this PR, in the calculation of the new min watermark in 
`StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()`, 
there is no verification that the calculated new min watermark really is 
aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned but 
actually some are active, we would then incorrectly determine that the final 
aggregation is `Long.MAX_VALUE` and emit that.

This PR fixes this by only emitting the aggregated watermark iff it was 
really calculated from some aligned input channel (as well as the already 
existing constraint that it needs to be larger than the last emitted 
watermark). This change should also safely cover the case that a 
`Long.MAX_VALUE` was genuinely aggregated from the input channels.

**This is a critical bug that should be fixed for all affected versions: 
1.2.1, 1.3.2, 1.4.0 (master)**

## Brief change log

- Fix `findAndOutputNewMinWatermarkAcrossAlignedChannels` to only output 
the new min watermark iff it was truly computed from some aligned channel.
- Adapt `StatusWatermarkValveTest#testMultipleInputValve()` to cover the 
missing case.

## Verifying this change

Verified by the modified 
`StatusWatermarkValveTest#testMultipleInputValve()`.
Without the fix in `findAndOutputNewMinWatermarkAcrossAlignedChannels`, the 
modified test does not pass.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): **YES**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? no



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7721

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4738.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4738


commit 29d3b74f81410ea9c2ed861c297ece8134d9b29f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-09-27T18:05:21Z

[FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs

Prior to this commit, in the calculation of the new min watermark in
StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(),
there is no verification that the calculated new min watermark
really is aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned
but actually some are active, we would then incorrectly determine that
the final aggregation is Long.MAX_VALUE and emit that.

This commit fixes this by only emitting the aggregated watermark iff it was
really calculated from some aligned input channel (as well as the
already existing constraint that it needs to be larger than the last
emitted watermark). This change should also safely cover the case that a
Long.MAX_VALUE was genuinely aggregated from the input channels.




---


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183011#comment-16183011
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141425581
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * A job related REST handler always requires a {@link 
JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
--- End diff --

This class is meant to be a general parameters class for whatever 
parameters job-related queries require. While it currently 
only contains the job ID, I think it makes sense to leave it as is.
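For context, a plausible body for such a class, reconstructed from the imports shown in the diff (a sketch using assumed method names from the MessageParameters base class, not the actual file content, which is cut off above):

{code}
public class JobMessageParameters extends MessageParameters {

    private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();

    @Override
    public Collection<MessagePathParameter<?>> getPathParameters() {
        // The job ID is the only path parameter job-related handlers need.
        return Collections.singleton(jobPathParameter);
    }

    @Override
    public Collection<MessageQueryParameter<?>> getQueryParameters() {
        // No query parameters yet; subclasses can add their own.
        return Collections.emptyList();
    }
}
{code}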


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...

2017-09-27 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141425581
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * A job related REST handler always requires a {@link 
JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
--- End diff --

This class is meant to be a general parameters class for whatever 
parameters job-related queries require. While it currently 
only contains the job ID, I think it makes sense to leave it as is.


---


[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183002#comment-16183002
 ] 

ASF GitHub Bot commented on FLINK-5944:
---

Github user mlipkovich commented on a diff in the pull request:

https://github.com/apache/flink/pull/4683#discussion_r141424281
  
--- Diff: flink-core/pom.xml ---
@@ -52,6 +52,12 @@ under the License.
flink-shaded-asm

 
+   
+   org.apache.flink
+   flink-shaded-hadoop2
+   ${project.version}
+   
--- End diff --

Thanks for your comment, Aljoscha.
So there are at least three ways to achieve it: mark this 
dependency as 'provided', move the Hadoop Snappy codec related classes to the 
flink-java module, or move them to some separate module as @haohui suggested, but 
I'm not sure what should be inside this module.


> Flink should support reading Snappy Files
> -
>
> Key: FLINK-5944
> URL: https://issues.apache.org/jira/browse/FLINK-5944
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ilya Ganelin
>Assignee: Mikhail Lipkovich
>  Labels: features
>
> Snappy is an extremely performant compression format that's widely used, 
> offering fast decompression/compression. 
> This can be easily implemented by creating a SnappyInflaterInputStreamFactory 
> and updating the initDefaultInflateInputStreamFactories in FileInputFormat.
> Flink already includes the Snappy dependency in the project. 
> There is a minor gotcha in this. If we wish to use this with Hadoop, then we 
> must provide two separate implementations since Hadoop uses a different 
> version of the snappy format than Snappy Java (which is the xerial/snappy 
> included in Flink). 
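For context, a minimal sketch of such a factory, assuming Flink's existing InflaterInputStreamFactory interface and the xerial snappy-java stream (an illustration of the approach the issue describes, not the code from the PR):

{code}
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.xerial.snappy.SnappyInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;

public class SnappyInflaterInputStreamFactory
        implements InflaterInputStreamFactory<SnappyInputStream> {

    private static final SnappyInflaterInputStreamFactory INSTANCE =
            new SnappyInflaterInputStreamFactory();

    public static SnappyInflaterInputStreamFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public SnappyInputStream create(InputStream in) throws IOException {
        // Wrap the raw file stream in a snappy-java (xerial) decompressor;
        // the Hadoop snappy format would need a separate implementation.
        return new SnappyInputStream(in);
    }

    @Override
    public Collection<String> getCommonFileExtensions() {
        return Collections.singleton("snappy");
    }
}
{code}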



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4683: [FLINK-5944] Support reading of Snappy files

2017-09-27 Thread mlipkovich
Github user mlipkovich commented on a diff in the pull request:

https://github.com/apache/flink/pull/4683#discussion_r141424281
  
--- Diff: flink-core/pom.xml ---
@@ -52,6 +52,12 @@ under the License.
flink-shaded-asm

 
+   
+   org.apache.flink
+   flink-shaded-hadoop2
+   ${project.version}
+   
--- End diff --

Thanks for your comment, Aljoscha.
So there are at least three ways to achieve it: mark this 
dependency as 'provided', move the Hadoop Snappy codec related classes to the 
flink-java module, or move them to some separate module as @haohui suggested, but 
I'm not sure what should be inside this module.


---


[jira] [Created] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels

2017-09-27 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-7721:
--

 Summary: StatusWatermarkValve should output a new min watermark 
only if it was aggregated from aligned channels
 Key: FLINK-7721
 URL: https://issues.apache.org/jira/browse/FLINK-7721
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.2, 1.2.1, 1.4.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Blocker
 Fix For: 1.2.2, 1.4.0, 1.3.3


Context:
{code}
long newMinWatermark = Long.MAX_VALUE;

for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}
{code}

In the calculation of the new min watermark in 
{{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, 
there is no verification that the calculated new min watermark 
{{newMinWatermark}} really is aggregated from some aligned channel.

In the corner case where all input channels are currently not aligned but 
actually some are active, we would then incorrectly determine that the final 
aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.

The fix would simply be to only emit the aggregated watermark IFF it was really 
calculated from some aligned input channel (as well as the already existing 
constraint that it needs to be larger than the last emitted watermark). This 
change should also safely cover the case that a {{Long.MAX_VALUE}} was 
genuinely aggregated from one of the input channels.
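For illustration, the fixed aggregation could look roughly like this (a sketch based on the context above; the {{lastOutputWatermark}} field and {{outputWatermark()}} helper are assumed names, not the actual Flink source):

{code}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// Only emit if the aggregate really came from some aligned channel and it
// advances the last emitted watermark; a genuine Long.MAX_VALUE aggregated
// from the inputs still passes this check.
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    outputWatermark(lastOutputWatermark);
}
{code}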



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4722: [FLINK-7683] Iterate over keys in KeyedStateBackend

2017-09-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4722
  
LGTM!

One more ask: can you please add comments to both the class and the code 
explaining that concurrent modification will fail the iteration?


---


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182944#comment-16182944
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4722
  
LGTM!

One more ask: can you please add comments to both the class and the code 
explaining that concurrent modification will fail the iteration?


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed state operator (to do so, the operator 
> must iterate over all of the existing keys and rewrite them into a new state 
> variable).
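For illustration, the migration pattern the issue describes might look roughly like this (a sketch: the {{getKeys(...)}} name and signature are assumed from the PR discussion, and the state handles and value type are hypothetical):

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;

import java.util.stream.Stream;

public final class StateMigrationSketch {

    /** Rewrites every existing key of an old state into a new state variable. */
    public static <K> void migrate(
            KeyedStateBackend<K> backend,
            ValueState<Long> oldState,
            ValueState<Long> newState) throws Exception {

        try (Stream<K> keys = backend.getKeys("old-state-name", VoidNamespace.INSTANCE)) {
            for (K key : (Iterable<K>) keys::iterator) {
                backend.setCurrentKey(key);
                Long previous = oldState.value();
                if (previous != null) {
                    // Rewrite the old entry under the new state definition.
                    newState.update(previous);
                }
            }
        }
    }
}
{code}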



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182930#comment-16182930
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141409233
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * A job related REST handler always requires a {@link 
JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
--- End diff --

Sorry, I missed this file name yesterday. How about 
`JobIDMessageParameters`, since there are no job query params in this class?


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182929#comment-16182929
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141409228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
--- End diff --

"Parameters for job `ID` related REST handlers"?


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...

2017-09-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141409228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
--- End diff --

"Parameters for job `ID` related REST handlers"?


---


[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...

2017-09-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141409233
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for job related REST handlers.
+ *
+ * A job related REST handler always requires a {@link 
JobIDPathParameter}.
+ */
+public class JobMessageParameters extends MessageParameters {
--- End diff --

Sorry, I missed this file name yesterday. How about 
`JobIDMessageParameters`, since there are no job query params in this class?


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182902#comment-16182902
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r141406439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including per-job
ref-counting and staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be accessible upon recovery and do not need to
+ * be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(PermanentBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /** Root directory for local file storage */
+   private final File storageDir;
+
+   /** Blob store for distributed file storage, e.g. in HA */
+   private final BlobView blobView;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the storage directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   // ------------------------------------------------------------------------
+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+* non-positive values).
+*/
+   public long keepUntil = 

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r141406439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including per-job
ref-counting and staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be accessible upon recovery and do not need to
+ * be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(PermanentBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /** Root directory for local file storage */
+   private final File storageDir;
+
+   /** Blob store for distributed file storage, e.g. in HA */
+   private final BlobView blobView;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the storage directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   // ------------------------------------------------------------------------
+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+* non-positive values).
+*/
+   public long keepUntil = -1;
+   }
+
+   /** Map to store the number of references to a specific job */
+   private final Map jobRefCounters = new HashMap<>();
+
+   /** Time interval (ms) to run the cleanup task; also used 
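
To make the staged cleanup around RefCount concrete, here is a minimal sketch
of how such a TTL-based counter can drive deferred deletion, using the imports
already visible in the diff. Method names and the interval parameter are
illustrative, not the literal PR code, and the actual file deletion is omitted:

    /** Decrements the ref count and, at zero, schedules a deferred cleanup. */
    void releaseJob(JobID jobId, long cleanupIntervalMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref != null && --ref.references <= 0) {
                // staged cleanup: keep the files for one more interval so a
                // recovering job can re-register without re-downloading them
                ref.keepUntil = System.currentTimeMillis() + cleanupIntervalMillis;
            }
        }
    }

    /** Runs periodically, e.g. from TimerTask#run(), dropping expired entries. */
    void cleanupExpiredJobs() {
        long now = System.currentTimeMillis();
        synchronized (jobRefCounters) {
            Iterator<Map.Entry<JobID, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references <= 0 && ref.keepUntil > 0 && ref.keepUntil <= now) {
                    // TTL expired and nobody re-registered: drop the counter
                    // (and delete the job's local files here)
                    it.remove();
                }
            }
        }
    }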

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182898#comment-16182898
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r141406045
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
 *
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param requiredBlob
+* @param blobKey
 *  blob key associated with the requested file
+* @param highlyAvailable
+*  whether the requested file is highly available (HA)
 *
 * @return file referring to the local storage location of the BLOB
 *
 * @throws IOException
 *  Thrown if the file retrieval failed.
 */
-   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
-   checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
+   checkArgument(blobKey != null, "BLOB key cannot be null.");
 
-   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, requiredBlob);
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, blobKey);
+   readWriteLock.readLock().lock();
 
-   if (localFile.exists()) {
+   try {
+   getFileInternal(jobId, blobKey, highlyAvailable, 
localFile);
return localFile;
+   } finally {
+   readWriteLock.readLock().unlock();
}
-   else {
+   }
+
+   /**
+* Helper to retrieve the local path of a file associated with a job 
and a blob key.
+* 
+* The blob server looks the blob key up in its local storage. If the 
file exists, it is
+* returned. If the file does not exist, it is retrieved from the HA 
blob store (if available)
+* or a {@link FileNotFoundException} is thrown.
+* 
+* Assumes the read lock has already been acquired.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param blobKey
+*  blob key associated with the requested file
+* @param highlyAvailable
+*  whether the requested file is highly available (HA)
+* @param localFile
+*  (local) file where the blob is/should be stored
+*
+* @throws IOException
+*  Thrown if the file retrieval failed.
+*/
+   void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean 
highlyAvailable, File localFile) throws IOException {
+   // assume readWriteLock.readLock() was already locked (cannot 
really check that)
+
+   if (localFile.exists()) {
+   return;
+   } else if (highlyAvailable) {
+   // Try the HA blob store
+   // first we have to release the read lock in order to 
acquire the write lock
+   readWriteLock.readLock().unlock();
+
+   // use a temporary file (thread-safe without locking)
+   File incomingFile = null;
try {
-   // Try the blob store
-   blobStore.get(jobId, requiredBlob, localFile);
+   incomingFile = createTemporaryFilename();
+   blobStore.get(jobId, blobKey, incomingFile);
+
+   BlobUtils.moveTempFileToStore(
+   incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), LOG, null);
--- End diff --

Alright.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for 

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r141406045
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
 *
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param requiredBlob
+* @param blobKey
 *  blob key associated with the requested file
+* @param highlyAvailable
+*  whether the requested file is highly available (HA)
 *
 * @return file referring to the local storage location of the BLOB
 *
 * @throws IOException
 *  Thrown if the file retrieval failed.
 */
-   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
-   checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
+   checkArgument(blobKey != null, "BLOB key cannot be null.");
 
-   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, requiredBlob);
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, blobKey);
+   readWriteLock.readLock().lock();
 
-   if (localFile.exists()) {
+   try {
+   getFileInternal(jobId, blobKey, highlyAvailable, 
localFile);
return localFile;
+   } finally {
+   readWriteLock.readLock().unlock();
}
-   else {
+   }
+
+   /**
+* Helper to retrieve the local path of a file associated with a job 
and a blob key.
+* 
+* The blob server looks the blob key up in its local storage. If the 
file exists, it is
+* returned. If the file does not exist, it is retrieved from the HA 
blob store (if available)
+* or a {@link FileNotFoundException} is thrown.
+* 
+* Assumes the read lock has already been acquired.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param blobKey
+*  blob key associated with the requested file
+* @param highlyAvailable
+*  whether the requested file is highly available (HA)
+* @param localFile
+*  (local) file where the blob is/should be stored
+*
+* @throws IOException
+*  Thrown if the file retrieval failed.
+*/
+   void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean 
highlyAvailable, File localFile) throws IOException {
+   // assume readWriteLock.readLock() was already locked (cannot 
really check that)
+
+   if (localFile.exists()) {
+   return;
+   } else if (highlyAvailable) {
+   // Try the HA blob store
+   // first we have to release the read lock in order to 
acquire the write lock
+   readWriteLock.readLock().unlock();
+
+   // use a temporary file (thread-safe without locking)
+   File incomingFile = null;
try {
-   // Try the blob store
-   blobStore.get(jobId, requiredBlob, localFile);
+   incomingFile = createTemporaryFilename();
+   blobStore.get(jobId, blobKey, incomingFile);
+
+   BlobUtils.moveTempFileToStore(
+   incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), LOG, null);
--- End diff --

Alright.


---
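
The read-to-write lock handover discussed here is a standard pattern:
ReentrantReadWriteLock cannot upgrade a read lock in place, so the read lock
must be released first, and the state re-checked after acquiring the write
lock because another thread may have won the race. A self-contained sketch,
with a hypothetical downloadTo() standing in for the HA-store/server fetch:

    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockHandoverSketch {

        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        File getOrFetch(File localFile) throws IOException {
            // fast path: check for the file under the shared read lock
            lock.readLock().lock();
            try {
                if (localFile.exists()) {
                    return localFile;
                }
            } finally {
                lock.readLock().unlock();
            }

            lock.writeLock().lock();
            try {
                // re-check: another thread may have fetched it meanwhile
                if (!localFile.exists()) {
                    downloadTo(localFile);
                }
                return localFile;
            } finally {
                lock.writeLock().unlock();
            }
        }

        private void downloadTo(File localFile) throws IOException {
            // placeholder for the blob store / blob server download
        }
    }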


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182897#comment-16182897
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r141405909
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws 
IOException {
// start the server thread
setName("BLOB Server listener at " + getPort());
setDaemon(true);
-   start();
--- End diff --

Makes sense.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.,
> which does not even have to be backed by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r141405909
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws 
IOException {
// start the server thread
setName("BLOB Server listener at " + getPort());
setDaemon(true);
-   start();
--- End diff --

Makes sense.


---


[jira] [Commented] (FLINK-7411) minor performance improvements in NettyMessage

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182879#comment-16182879
 ] 

ASF GitHub Bot commented on FLINK-7411:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4517
  
Thanks for your contribution @NicoK and the review @pnowojski. Changes look 
good to me. Will wait for Travis and then merge this PR.


> minor performance improvements in NettyMessage
> --
>
> Key: FLINK-7411
> URL: https://issues.apache.org/jira/browse/FLINK-7411
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> {{NettyMessage}} may be improved slightly performance-wise in these regards:
> - in {{NettyMessage.NettyMessageDecoder#decode()}}: instead of having
> multiple if-else-if branches, use a switch over the message ID
> - use a static {{readFrom(ByteBuf buffer)}} method per {{NettyMessage}} subtype - we
> do not really need a virtual function here



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
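
The proposed decode() restructuring amounts to a switch over the message ID
plus per-subtype static factories. A sketch under the assumption that each
message class exposes a byte ID constant and a readFrom() factory (IDs and
class names are illustrative, not the actual NettyMessage internals):

    static NettyMessage decodeMessage(ByteBuf buffer) throws ProtocolException {
        byte msgId = buffer.readByte();
        switch (msgId) {
            case BufferResponse.ID:
                return BufferResponse.readFrom(buffer);
            case ErrorResponse.ID:
                return ErrorResponse.readFrom(buffer);
            case CancelPartitionRequest.ID:
                return CancelPartitionRequest.readFrom(buffer);
            default:
                throw new ProtocolException("Received unknown message ID: " + msgId);
        }
    }

Compared to an if-else-if cascade, the switch keeps the dispatch in one place,
and the static factories avoid constructing a message object just to call a
virtual readFrom() on it.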


[GitHub] flink issue #4517: [FLINK-7411][network] minor performance improvements in N...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4517
  
Thanks for your contribution @NicoK and the review @pnowojski. Changes look 
good to me. Will wait for Travis and then merge this PR.


---


[jira] [Commented] (FLINK-7514) fix BackPressureStatsTrackerITCase releasing buffers twice

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182858#comment-16182858
 ] 

ASF GitHub Bot commented on FLINK-7514:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4591
  
Thanks for your contribution @NicoK and the reviews @pnowojski and @zentol. 
Changes look good. If Travis passes, then I'll merge the PR.


> fix BackPressureStatsTrackerITCase releasing buffers twice
> --
>
> Key: FLINK-7514
> URL: https://issues.apache.org/jira/browse/FLINK-7514
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{BackPressureStatsTrackerITCase#testBackPressuredProducer()}} is releasing 
> its buffers twice which should be fixed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4591: [FLINK-7514][tests] fix BackPressureStatsTrackerITCase re...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4591
  
Thanks for your contribution @NicoK and the reviews @pnowojski and @zentol. 
Changes look good. If Travis passes, then I'll merge the PR.


---


[jira] [Commented] (FLINK-7513) remove TestBufferFactory#MOCK_BUFFER

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182855#comment-16182855
 ] 

ASF GitHub Bot commented on FLINK-7513:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4590
  
Thanks for your contribution @NicoK and the review @pnowojski. Merging this 
PR. 


> remove TestBufferFactory#MOCK_BUFFER
> 
>
> Key: FLINK-7513
> URL: https://issues.apache.org/jira/browse/FLINK-7513
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{TestBufferFactory#MOCK_BUFFER}} is a static buffer used in tests that does
> not allow proper reference counting; we should rather create test buffers in
> the tests, which can then also be released afterwards.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4590: [FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4590
  
Thanks for your contribution @NicoK and the review @pnowojski. Merging this 
PR. 


---


[jira] [Commented] (FLINK-7699) Define the BufferListener interface to replace EventListener in BufferProvider

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182854#comment-16182854
 ] 

ASF GitHub Bot commented on FLINK-7699:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4735#discussion_r141400676
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 ---
@@ -211,10 +210,15 @@ public void testSetLessThanRequiredNumBuffers() 
throws IOException {
 
@Test
public void testPendingRequestWithListenerAfterRecycle() throws 
Exception {
-   EventListener listener = spy(new 
EventListener() {
+   BufferListener listener = spy(new BufferListener() {
@Override
-   public void onEvent(Buffer buffer) {
+   public boolean notifyBufferAvailable(Buffer buffer) {
buffer.recycle();
+   return false;
--- End diff --

Yes, I also considered the issue of verifying the notification multiple
times, and I guessed you might point it out. :)
Returning true does not work for that, so I will think of a proper way of
verifying it.


> Define the BufferListener interface to replace EventListener in BufferProvider
> --
>
> Key: FLINK-7699
> URL: https://issues.apache.org/jira/browse/FLINK-7699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the {{EventListener}} is used in {{BufferProvider}} to be notified
> of an available buffer or a destroyed pool.
> To make the semantics clear, we define a new {{BufferListener}} interface whose
> implementors can opt for a one-time-only notification or for being notified
> repeatedly. The destruction of the pool is also signalled explicitly via the
> method {{notifyBufferDestroyed}}.
> The {{RemoteInputChannel}} will implement the {{BufferListener}} interface to 
> wait for floating buffers from {{BufferProvider}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

2017-09-27 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4735#discussion_r141400676
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 ---
@@ -211,10 +210,15 @@ public void testSetLessThanRequiredNumBuffers() 
throws IOException {
 
@Test
public void testPendingRequestWithListenerAfterRecycle() throws 
Exception {
-   EventListener listener = spy(new 
EventListener() {
+   BufferListener listener = spy(new BufferListener() {
@Override
-   public void onEvent(Buffer buffer) {
+   public boolean notifyBufferAvailable(Buffer buffer) {
buffer.recycle();
+   return false;
--- End diff --

Yes, I also considered the issue of verifying the notification multiple
times, and I guessed you might point it out. :)
Returning true does not work for that, so I will think of a proper way of
verifying it.


---
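
On verifying repeated notifications: a plain counting listener sidesteps the
spy-verification problem. A sketch against the interface as described in the
ticket, assuming notifyBufferAvailable() and notifyBufferDestroyed() are its
full surface and that this lives inside the test class:

    final AtomicInteger notifications = new AtomicInteger();

    BufferListener countingListener = new BufferListener() {
        @Override
        public boolean notifyBufferAvailable(Buffer buffer) {
            notifications.incrementAndGet();   // count calls instead of spying
            buffer.recycle();
            return true;    // stay registered for further buffers
        }

        @Override
        public void notifyBufferDestroyed() {
            // the pool was destroyed before a buffer became available
        }
    };

Returning false instead of true in notifyBufferAvailable() yields the
one-time-only behaviour used in the test above.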


[jira] [Commented] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182850#comment-16182850
 ] 

ASF GitHub Bot commented on FLINK-7487:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4571
  
Thanks for your contribution @NicoK. Merging this PR.


> test instability in ClassLoaderITCase (no resources available)
> --
>
> Key: FLINK-7487
> URL: https://issues.apache.org/jira/browse/FLINK-7487
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103 
> which contains quite a few changes, but the error itself should be unrelated:
> {code}
> testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>   Time elapsed: 0.604 sec  <<< ERROR!
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>   at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>   at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> 

[GitHub] flink issue #4571: [FLINK-7487][tests] fix ClassLoaderITCase#testDisposeSave...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4571
  
Thanks for your contribution @NicoK. Merging this PR.


---


[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

2017-09-27 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4735#discussion_r141398906
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -87,6 +88,12 @@
/** The number of available buffers that have not been announced to the 
producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
+   /** The number of unsent buffers in the producer's sub partition. */
+   private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+   /** The tag indicates whether this channel is waiting for additional 
floating buffers from the buffer pool. */
+   private final AtomicBoolean isWaitingFloatingBuffers = new 
AtomicBoolean(false);
--- End diff --

These two fields are currently used in the `notifyBufferAvailable()` logic, so
we have to define them in this PR.
In the next PR, #4509, the `isWaitingForFloatingBuffers` field will be set to
true under two conditions:
1. The number of currently available buffers is less than the sender backlog
2. There are no available floating buffers in the `BufferProvider`
This field is also used to avoid registering the listener with the
`BufferProvider` multiple times.

It may be confusing to only see this PR's changes, sorry for that.


---
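
As a rough picture of how the two conditions above fit together in the
follow-up PR: a sketch inside a RemoteInputChannel-like class, where
requestBuffer()/addBufferListener() stand in for whatever the buffer pool
actually exposes and availableBuffers is an assumed counter not shown in
this diff:

    void onSenderBacklog(int backlog, BufferProvider floatingBufferPool) {
        senderBacklog.set(backlog);

        while (availableBuffers.get() < senderBacklog.get()) {      // condition 1
            Buffer buffer = floatingBufferPool.requestBuffer();     // non-blocking
            if (buffer != null) {
                availableBuffers.incrementAndGet();
                // ... hand the floating buffer to this channel
            } else {                                                // condition 2
                // register as listener, but at most once
                if (isWaitingFloatingBuffers.compareAndSet(false, true)) {
                    floatingBufferPool.addBufferListener(this);
                }
                break;
            }
        }
    }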


[jira] [Commented] (FLINK-7699) Define the BufferListener interface to replace EventListener in BufferProvider

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182849#comment-16182849
 ] 

ASF GitHub Bot commented on FLINK-7699:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4735#discussion_r141398906
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -87,6 +88,12 @@
/** The number of available buffers that have not been announced to the 
producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
+   /** The number of unsent buffers in the producer's sub partition. */
+   private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+   /** The tag indicates whether this channel is waiting for additional 
floating buffers from the buffer pool. */
+   private final AtomicBoolean isWaitingFloatingBuffers = new 
AtomicBoolean(false);
--- End diff --

These two fields are currently used in the `notifyBufferAvailable()` logic, so
we have to define them in this PR.
In the next PR, #4509, the `isWaitingForFloatingBuffers` field will be set to
true under two conditions:
1. The number of currently available buffers is less than the sender backlog
2. There are no available floating buffers in the `BufferProvider`
This field is also used to avoid registering the listener with the
`BufferProvider` multiple times.

It may be confusing to only see this PR's changes, sorry for that.


> Define the BufferListener interface to replace EventListener in BufferProvider
> --
>
> Key: FLINK-7699
> URL: https://issues.apache.org/jira/browse/FLINK-7699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the {{EventListener}} is used in {{BufferProvider}} to be notified
> of an available buffer or a destroyed pool.
> To make the semantics clear, we define a new {{BufferListener}} interface whose
> implementors can opt for a one-time-only notification or for being notified
> repeatedly. The destruction of the pool is also signalled explicitly via the
> method {{notifyBufferDestroyed}}.
> The {{RemoteInputChannel}} will implement the {{BufferListener}} interface to 
> wait for floating buffers from {{BufferProvider}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182844#comment-16182844
 ] 

ASF GitHub Bot commented on FLINK-7695:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4737

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

## What is the purpose of the change

Add `JobConfigHandler` for the new `RestServerEndpoint`.

## Verifying this change

This change added tests and can be verified as follows:

- JSON marshalling is tested via `JobConfigInfoTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portJobConfigHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4737


commit 3aa313561ee238b9074d58024516ebffeafbe3ed
Author: Till Rohrmann 
Date:   2017-09-21T08:53:24Z

[FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

Let the JobCancellationHandler implement the LegacyRestHandler interface. 
Moreover,
this commit adds the DELETE method to HttpMethodWrapper and the
RestServerEndpoint#registerHandler method.

Add PATCH method

This closes #4697.

commit 1fa13dc89ffebd2b3f286ce4cb9cf4ac5350bc79
Author: Till Rohrmann 
Date:   2017-09-21T14:47:18Z

[FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

Rename the JobCancellationHandler into JobTerminationHandler which is now 
responsible
for terminating jobs. Moreover, this commit adds two termination modes, 
cancel and stop,
which are specified by providing a query parameter.

This closes #4700.

commit 82eeeb95a943ccf71692f0b228027b0163d19def
Author: Till Rohrmann 
Date:   2017-09-22T11:31:12Z

[FLINK-7667] [flip6] Use ArchivedExecutionGraph as serializable 
AccessExecutionGraph

This commit removes AccessExecutionGraph#getCheckpointCoordinator and 
changes the
AccessExecutionGraph#getJobCheckpointSettings into 
#getJobCheckpointConfiguration.
The JobCheckpointConfiguration only contains the CheckpointCoordinator 
relevant
configuration settings and excludes the serialized state backend and the
serialized master hooks. That way we don't send unnecessary information when
the ArchivedExecutionGraph is requested.

This closes #4727.

commit 67ed81f7d1cc7e1e150bc2abd9f037e75a05b8ff
Author: Till Rohrmann 
Date:   2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the 
latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. 
Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache 
entries after
a given time to live period. This will trigger requesting the 
AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph 
based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic
cleanup task which triggers ExecutionGraphCache.cleanup. This method releases
all cache entries which have exceeded their time to live. Currently it is set
to 20 * refreshInterval of the web GUI.

This closes #4728.

commit 6409de0441d1fd2d3321d6d0eb05db51d91027ea
Author: Till Rohrmann 
Date:   2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint




> Port JobConfigHandler to new REST endpoint
> --
>
> Key: FLINK-7695
> URL: https://issues.apache.org/jira/browse/FLINK-7695
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  

[GitHub] flink pull request #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new ...

2017-09-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4737

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

## What is the purpose of the change

Add `JobConfigHandler` for the new `RestServerEndpoint`.

## Verifying this change

This change added tests and can be verified as follows:

- JSON marshalling is tested via `JobConfigInfoTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portJobConfigHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4737


commit 3aa313561ee238b9074d58024516ebffeafbe3ed
Author: Till Rohrmann 
Date:   2017-09-21T08:53:24Z

[FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

Let the JobCancellationHandler implement the LegacyRestHandler interface. 
Moreover,
this commit adds the DELETE method to HttpMethodWrapper and the
RestServerEndpoint#registerHandler method.

Add PATCH method

This closes #4697.

commit 1fa13dc89ffebd2b3f286ce4cb9cf4ac5350bc79
Author: Till Rohrmann 
Date:   2017-09-21T14:47:18Z

[FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

Rename the JobCancellationHandler into JobTerminationHandler which is now 
responsible
for terminating jobs. Moreover, this commit adds two termination modes, 
cancel and stop,
which are specified by providing a query parameter.

This closes #4700.

commit 82eeeb95a943ccf71692f0b228027b0163d19def
Author: Till Rohrmann 
Date:   2017-09-22T11:31:12Z

[FLINK-7667] [flip6] Use ArchivedExecutionGraph as serializable 
AccessExecutionGraph

This commit removes AccessExecutionGraph#getCheckpointCoordinator and 
changes the
AccessExecutionGraph#getJobCheckpointSettings into 
#getJobCheckpointConfiguration.
The JobCheckpointConfiguration only contains the CheckpointCoordinator 
relevant
configuration settings and excludes the serialized state backend and the
serialized master hooks. That way we don't send unnecessary information when
the ArchivedExecutionGraph is requested.

This closes #4727.

commit 67ed81f7d1cc7e1e150bc2abd9f037e75a05b8ff
Author: Till Rohrmann 
Date:   2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the 
latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. 
Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache 
entries after
a given time to live period. This will trigger requesting the 
AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph 
based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic
cleanup task which triggers ExecutionGraphCache.cleanup. This method releases
all cache entries which have exceeded their time to live. Currently it is set
to 20 * refreshInterval of the web GUI.

This closes #4728.

commit 6409de0441d1fd2d3321d6d0eb05db51d91027ea
Author: Till Rohrmann 
Date:   2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint




---
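
The TTL cache described in the FLINK-7668 commit message can be pictured with
a short standalone sketch; names and the Supplier-based fetch are illustrative
(the real class re-requests the graph from the JobMaster instead):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Supplier;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

    class ExecutionGraphCacheSketch {

        private final long timeToLiveMillis;
        private final Map<JobID, Entry> cache = new ConcurrentHashMap<>();

        ExecutionGraphCacheSketch(long timeToLiveMillis) {
            this.timeToLiveMillis = timeToLiveMillis;
        }

        private static final class Entry {
            final AccessExecutionGraph graph;
            final long validUntil;

            Entry(AccessExecutionGraph graph, long validUntil) {
                this.graph = graph;
                this.validUntil = validUntil;
            }
        }

        /** Returns the cached graph, re-fetching it once the entry expired. */
        AccessExecutionGraph get(JobID jobId, Supplier<AccessExecutionGraph> fetcher) {
            long now = System.currentTimeMillis();
            Entry entry = cache.get(jobId);
            if (entry == null || entry.validUntil < now) {
                // missing or expired: request the ArchivedExecutionGraph again
                entry = new Entry(fetcher.get(), now + timeToLiveMillis);
                cache.put(jobId, entry);
            }
            return entry.graph;
        }

        /** Invoked periodically to drop expired entries and avoid memory leaks. */
        void cleanup() {
            long now = System.currentTimeMillis();
            cache.values().removeIf(e -> e.validUntil < now);
        }
    }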


[GitHub] flink issue #4710: [FLINK-7446] [table] Change DefinedRowtimeAttribute to wo...

2017-09-27 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4710
  
Looks good to me.

+1


---


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182840#comment-16182840
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4710
  
Looks good to me.

+1


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we also supported defining an
> existing field as the rowtime field. Just like when registering a DataStream, the
> rowtime field could then not only be appended but also replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
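
A sketch of the wished-for usage: the source declares one of its existing
fields as the rowtime attribute instead of appending a new one. This assumes
DefinedRowtimeAttribute keeps its single getRowtimeAttribute() method; the
other TableSource methods are omitted, and the replace-instead-of-append
behaviour is exactly what the issue requests, not what exists today:

    public class OrdersTableSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {

        @Override
        public String getRowtimeAttribute() {
            // "orderTime" already exists in this source's schema; with
            // FLINK-7446 it would be re-interpreted as the rowtime attribute
            // rather than a new field being appended
            return "orderTime";
        }

        // getDataStream() / getReturnType() omitted: they would expose a
        // schema containing e.g. (orderId, orderTime, amount)
    }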


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141396863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap<>();
+
+   private final AtomicReference channelError = new 
AtomicReference<>();
+
+   /**
+* Set of cancelled partition requests. A request is cancelled iff an 
input channel is cleared
+* while data is still coming in for this channel.
+*/
+   private final ConcurrentMap cancelled = 
Maps.newConcurrentMap();
+
+   private volatile ChannelHandlerContext ctx;
+
+   // ------------------------------------------------------------------------
+   // Input channel/receiver registration
+   // ------------------------------------------------------------------------
+
+   void addInputChannel(RemoteInputChannel listener) throws IOException {
+   checkError();
+
+   if (!inputChannels.containsKey(listener.getInputChannelId())) {
+   inputChannels.put(listener.getInputChannelId(), 
listener);
+   }
+   }
+
+   void removeInputChannel(RemoteInputChannel listener) {
+   inputChannels.remove(listener.getInputChannelId());
+   }
+
+   void cancelRequestFor(InputChannelID inputChannelId) {
+   if (inputChannelId == null || ctx == null) {
+   return;
+   }
+
+   if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == 
null) {
+   ctx.writeAndFlush(new 
NettyMessage.CancelPartitionRequest(inputChannelId));
+   }
+   }
+
+   // ------------------------------------------------------------------------
+   // Network events
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void channelActive(final ChannelHandlerContext ctx) throws 
Exception {
+   if (this.ctx == null) {
+   this.ctx = ctx;
+   }
+
+   super.channelActive(ctx);
+   }
+
+   @Override
+   public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
+

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182835#comment-16182835
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141396863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap<>();
+
+   private final AtomicReference channelError = new 
AtomicReference<>();
+
+   /**
+* Set of cancelled partition requests. A request is cancelled iff an 
input channel is cleared
+* while data is still coming in for this channel.
+*/
+   private final ConcurrentMap cancelled = 
Maps.newConcurrentMap();
+
+   private volatile ChannelHandlerContext ctx;
+
+   // ------------------------------------------------------------------------
+   // Input channel/receiver registration
+   // ------------------------------------------------------------------------
+
+   void addInputChannel(RemoteInputChannel listener) throws IOException {
+   checkError();
+
+   if (!inputChannels.containsKey(listener.getInputChannelId())) {
+   inputChannels.put(listener.getInputChannelId(), 
listener);
+   }
+   }
+
+   void removeInputChannel(RemoteInputChannel listener) {
+   inputChannels.remove(listener.getInputChannelId());
+   }
+
+   void cancelRequestFor(InputChannelID inputChannelId) {
+   if (inputChannelId == null || ctx == null) {
+   return;
+   }
+
+   if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == 
null) {
+   ctx.writeAndFlush(new 
NettyMessage.CancelPartitionRequest(inputChannelId));
+   }
+   }
+
+   // ------------------------------------------------------------------------
+   // Network events
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void channelActive(final ChannelHandlerContext ctx) throws 
Exception {
+   if (this.ctx == 

[jira] [Commented] (FLINK-7667) Add serializable AccessExecutionGraph implementation

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182831#comment-16182831
 ] 

ASF GitHub Bot commented on FLINK-7667:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4727
  
Thanks for the review @zentol. Rebasing this commit and waiting on Travis.


> Add serializable AccessExecutionGraph implementation
> 
>
> Key: FLINK-7667
> URL: https://issues.apache.org/jira/browse/FLINK-7667
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to decouple the REST endpoint from the {{JobMaster}} we should have 
> an "offline" implementation of the {{AccessExecutionGraph}} which is 
> serializable and can be set to remote peers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182830#comment-16182830
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141396295
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap<>();
+
+   private final AtomicReference channelError = new 
AtomicReference<>();
+
+   /**
+* Set of cancelled partition requests. A request is cancelled iff an 
input channel is cleared
+* while data is still coming in for this channel.
+*/
+   private final ConcurrentMap cancelled = 
Maps.newConcurrentMap();
+
+   private volatile ChannelHandlerContext ctx;
+
+   // ------------------------------------------------------------------------
+   // Input channel/receiver registration
+   // ------------------------------------------------------------------------
+
+   void addInputChannel(RemoteInputChannel listener) throws IOException {
+   checkError();
+
+   if (!inputChannels.containsKey(listener.getInputChannelId())) {
+   inputChannels.put(listener.getInputChannelId(), 
listener);
+   }
+   }
+
+   void removeInputChannel(RemoteInputChannel listener) {
+   inputChannels.remove(listener.getInputChannelId());
+   }
+
+   void cancelRequestFor(InputChannelID inputChannelId) {
+   if (inputChannelId == null || ctx == null) {
+   return;
+   }
+
+   if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == 
null) {
+   ctx.writeAndFlush(new 
NettyMessage.CancelPartitionRequest(inputChannelId));
+   }
+   }
+
+   // ------------------------------------------------------------------------
+   // Network events
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void channelActive(final ChannelHandlerContext ctx) throws 
Exception {
+   if (this.ctx == 

[GitHub] flink issue #4727: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as serial...

2017-09-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4727
  
Thanks for the review @zentol. Rebasing this commit and waiting on Travis.


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-09-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141396295
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
+
+	private final AtomicReference<Throwable> channelError = new AtomicReference<>();
+
+	/**
+	 * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+	 * while data is still coming in for this channel.
+	 */
+	private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
+
+	private volatile ChannelHandlerContext ctx;
+
+	// ------------------------------------------------------------------------
+	// Input channel/receiver registration
+	// ------------------------------------------------------------------------
+
+	void addInputChannel(RemoteInputChannel listener) throws IOException {
+		checkError();
+
+		if (!inputChannels.containsKey(listener.getInputChannelId())) {
+			inputChannels.put(listener.getInputChannelId(), listener);
+		}
+	}
+
+	void removeInputChannel(RemoteInputChannel listener) {
+		inputChannels.remove(listener.getInputChannelId());
+	}
+
+	void cancelRequestFor(InputChannelID inputChannelId) {
+		if (inputChannelId == null || ctx == null) {
+			return;
+		}
+
+		if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
+			ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Network events
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+		if (this.ctx == null) {
+			this.ctx = ctx;
+		}
+
+		super.channelActive(ctx);
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182828#comment-16182828
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r141396023
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
+
+	private final AtomicReference<Throwable> channelError = new AtomicReference<>();
+
+	/**
+	 * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+	 * while data is still coming in for this channel.
+	 */
+	private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
--- End diff --

I saw, but let's sneak this change in ;)
(`CreditBasedClientHandler` is a new file, and although I know that it will eventually replace `PartitionRequestClientHandler`, I'd like it not to keep too much legacy stuff.)
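
Presumably the legacy usage in question is the Guava {{Maps.newConcurrentMap()}} helper on the line the comment anchors at; since Java 7's diamond operator, it can be replaced by the plain JDK constructor. A sketch of the assumed cleanup (not a confirmed part of the PR):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class WithoutGuava {
	// before (Guava): private final ConcurrentMap<K, V> cancelled = Maps.newConcurrentMap();
	// after, with no Guava dependency:
	private final ConcurrentMap<String, String> cancelled = new ConcurrentHashMap<>();
}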


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads {{BufferResponse}} messages from the producer. It requests a buffer from the {{BufferPool}} to hold each message; if no buffer is available, the message is staged temporarily and {{autoread}} is disabled for the channel.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a buffer from the {{RemoteInputChannel}} for reading messages from the producer (see the sketch after this description).
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for {{RemoteInputChannel}}, and this may trigger requests of floating buffers from 
> 
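
A toy model of the credit-based receive path described above; all names are illustrative assumptions, not Flink's actual API. The point is that the receiver serves incoming data from its own pool of announced credit, so the Netty thread never has to stage messages or toggle {{autoread}}:

import java.util.ArrayDeque;
import java.util.Queue;

class ToyCreditReceiver {
	private final Queue<byte[]> exclusiveBuffers = new ArrayDeque<>();
	private int senderBacklog;

	ToyCreditReceiver(int numExclusiveBuffers) {
		// exclusive buffers announced to the sender as initial credit
		for (int i = 0; i < numExclusiveBuffers; i++) {
			exclusiveBuffers.add(new byte[32 * 1024]);
		}
	}

	/** By protocol, a BufferResponse only arrives if credit was announced, so a buffer is always available. */
	byte[] requestBuffer() {
		return exclusiveBuffers.poll();
	}

	/** The backlog is piggy-backed on each BufferResponse and may trigger floating-buffer requests. */
	void onSenderBacklog(int backlog) {
		this.senderBacklog = backlog;
		if (this.senderBacklog > exclusiveBuffers.size()) {
			// a real implementation would request floating buffers from the buffer pool here
		}
	}
}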


[jira] [Assigned] (FLINK-7717) Port TaskManagerMetricsHandler to new REST endpoint

2017-09-27 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7717:
---

Assignee: Bowen Li

> Port TaskManagerMetricsHandler to new REST endpoint
> ---
>
> Key: FLINK-7717
> URL: https://issues.apache.org/jira/browse/FLINK-7717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port {{TaskManagerMetricsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182827#comment-16182827
 ] 

ASF GitHub Bot commented on FLINK-7649:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4700
  
Thanks for the review @zentol. Rebasing onto the latest #4697 version.


> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobStoppingHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

