[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183603#comment-16183603 ] ASF GitHub Bot commented on FLINK-7491: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141517953 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link TypeInformation} for the Multiset types of the Java API. + * + * @param The type of the elements in the Multiset. + */ +@PublicEvolving +public final class MultisetTypeInfo extends MapTypeInfo{ --- End diff -- Great! 
That makes things a lot easier :-) > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message
[ https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183586#comment-16183586 ] ASF GitHub Bot commented on FLINK-7661: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4698 @NicoK, it has passed the Travis tests this time. > Add credit field in PartitionRequest message > > > Key: FLINK-7661 > URL: https://issues.apache.org/jira/browse/FLINK-7661 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: zhijiang >Assignee: zhijiang > > Currently the {{PartitionRequest}} message contains the {{ResultPartitionID}} | > {{queueIndex}} | {{InputChannelID}} fields. > We will add a new {{credit}} field indicating the initial credit of the > {{InputChannel}}; this value can be obtained from the {{InputChannel}} directly > after exclusive buffers have been assigned to it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
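To make the proposed message layout concrete, here is a minimal Java sketch of a request that carries the three existing fields plus the new credit field. It is not Flink's actual implementation: the class name PartitionRequestSketch, the use of plain long/int values for the IDs, and the java.nio.ByteBuffer encoding are all assumptions made for illustration only.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for the PartitionRequest message.
// Assumption: IDs are plain longs here; the real Flink ID types are richer.
final class PartitionRequestSketch {
    final long resultPartitionId; // stands in for ResultPartitionID
    final int queueIndex;
    final long inputChannelId;    // stands in for InputChannelID
    final int credit;             // new field: initial credit of the InputChannel

    PartitionRequestSketch(long resultPartitionId, int queueIndex,
                           long inputChannelId, int credit) {
        this.resultPartitionId = resultPartitionId;
        this.queueIndex = queueIndex;
        this.inputChannelId = inputChannelId;
        this.credit = credit;
    }

    // Writes the fields in a fixed order: partition id, queue index, channel id, credit.
    ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + 8 + 4);
        buf.putLong(resultPartitionId)
           .putInt(queueIndex)
           .putLong(inputChannelId)
           .putInt(credit);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reads the fields back in the same order they were written.
    static PartitionRequestSketch decode(ByteBuffer buf) {
        long partitionId = buf.getLong();
        int queueIndex = buf.getInt();
        long channelId = buf.getLong();
        int credit = buf.getInt();
        return new PartitionRequestSketch(partitionId, queueIndex, channelId, credit);
    }
}
```

In this sketch the credit is appended after the existing fields, so the receiver can read it once it has read the InputChannelID. Whether the real message appends or reorders fields is not stated in the issue.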
[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website
[ https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 updated FLINK-7686: -- Labels: (was: comm) > Add Flink Forward Berlin 2017 conference slides to the flink website > > > Key: FLINK-7686 > URL: https://issues.apache.org/jira/browse/FLINK-7686 > Project: Flink > Issue Type: Wish > Components: Project Website >Reporter: Hai Zhou UTC+8 >Priority: Trivial > > I recently viewed the [Flink Forward Berlin > 2017|https://berlin.flink-forward.org/sessions/] conference slides; the > content is very good. > I think we should add them to the [Flink > website|http://flink.apache.org/community.html] so that more people can find them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website
[ https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 updated FLINK-7686: -- Labels: comm (was: site) > Add Flink Forward Berlin 2017 conference slides to the flink website > > > Key: FLINK-7686 > URL: https://issues.apache.org/jira/browse/FLINK-7686 > Project: Flink > Issue Type: Wish > Components: Project Website >Reporter: Hai Zhou UTC+8 >Priority: Trivial > > I recently viewed the [Flink Forward Berlin > 2017|https://berlin.flink-forward.org/sessions/] conference slides; the > content is very good. > I think we should add them to the [Flink > website|http://flink.apache.org/community.html] so that more people can find them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website
[ https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 updated FLINK-7686: -- Component/s: Project Website > Add Flink Forward Berlin 2017 conference slides to the flink website > > > Key: FLINK-7686 > URL: https://issues.apache.org/jira/browse/FLINK-7686 > Project: Flink > Issue Type: Wish > Components: Project Website >Reporter: Hai Zhou UTC+8 >Priority: Trivial > Labels: site > > I recently viewed the [Flink Forward Berlin > 2017|https://berlin.flink-forward.org/sessions/] conference slides; the > content is very good. > I think we should add them to the [Flink > website|http://flink.apache.org/community.html] so that more people can find them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website
[ https://issues.apache.org/jira/browse/FLINK-7686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 updated FLINK-7686: -- Labels: site (was: ) > Add Flink Forward Berlin 2017 conference slides to the flink website > > > Key: FLINK-7686 > URL: https://issues.apache.org/jira/browse/FLINK-7686 > Project: Flink > Issue Type: Wish > Components: Project Website >Reporter: Hai Zhou UTC+8 >Priority: Trivial > Labels: site > > I recently viewed the [Flink Forward Berlin > 2017|https://berlin.flink-forward.org/sessions/] conference slides; the > content is very good. > I think we should add them to the [Flink > website|http://flink.apache.org/community.html] so that more people can find them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183532#comment-16183532 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141507401 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util +import java.util.function.BiFunction + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + +/** The initial accumulator for Collect aggregate function */ +class CollectAccumulator[E](var f0:MapView[E, Integer]) { + def this() { +this(null) + } + + def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]] + + override def equals(that: Any): Boolean = +that match { + case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0 + case _ => false +} +} + +abstract class CollectAggFunction[E] + extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] { + + override def createAccumulator(): CollectAccumulator[E] = { +val acc = new CollectAccumulator[E](new MapView[E, Integer]( + getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO)) +acc + } + + def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = { +if (value != null) { + if (accumulator.f0.contains(value)) { +accumulator.f0.put(value, accumulator.f0.get(value) + 1) + } else { +accumulator.f0.put(value, 1) + } +} + } + + override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = { +val iterator = accumulator.f0.iterator +if (iterator.hasNext) { + val map = new util.HashMap[E, Integer]() + while (iterator.hasNext) { +val entry = iterator.next() +map.put(entry.getKey, entry.getValue) + } + map +} else { + null.asInstanceOf[util.Map[E, Integer]] --- End diff -- Check with Calcite tests, should return an empty Multiset instead. 
> Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
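The two review points on this pull request (null inputs are ignored, and an empty input should yield an empty multiset rather than null) can be illustrated with a small stand-alone Java sketch. It deliberately avoids Flink's MapView and AggregateFunction types; the class name MultisetCollectSketch and the use of java.util.HashMap are assumptions for illustration, not the code under review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone model of a COLLECT-style accumulator:
// a multiset represented as element -> occurrence count.
final class MultisetCollectSketch<E> {
    private final Map<E, Integer> counts = new HashMap<>();

    // Adds one occurrence of the value; null values are skipped.
    void accumulate(E value) {
        if (value != null) {
            counts.merge(value, 1, Integer::sum);
        }
    }

    // Returns a copy of the counts. For an empty accumulator this is
    // an empty map, never null.
    Map<E, Integer> getValue() {
        return new HashMap<>(counts);
    }
}
```

Returning a copy keeps callers from mutating the accumulator state; that is a design choice of this sketch rather than something the review thread prescribes.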
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183530#comment-16183530 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141507177 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link TypeInformation} for the Multiset types of the Java API. + * + * @param The type of the elements in the Multiset. + */ +@PublicEvolving +public final class MultisetTypeInfo extends MapTypeInfo{ --- End diff -- I took a look at Calcite tests for Collect function, null will be ignored. 
> Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183531#comment-16183531 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141507197 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link TypeInformation} for the Multiset types of the Java API. + * + * @param The type of the elements in the Multiset. 
+ */ +@PublicEvolving +public final class MultisetTypeInfo extends MapTypeInfo{ + + private static final long serialVersionUID = 1L; + + + public MultisetTypeInfo(Class elementTypeClass) { + super(elementTypeClass, Integer.class); + } + + public MultisetTypeInfo(TypeInformation elementTypeInfo) { + super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO); + } + + // + // MultisetTypeInfo specific properties + // + + /** +* Gets the type information for the elements contained in the Multiset +*/ + public TypeInformation getElementTypeInfo() { + return getKeyTypeInfo(); + } + + // + // TypeInformation implementation + // + + @Override + public boolean isBasicType() { --- End diff -- done > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183465#comment-16183465 ] Kent Murra commented on FLINK-7657: --- As an aside, my testing indicates that the "DATE/TIME/TIMESTAMP" functions in the SQL language assume the UTC time zone. Is that correct? If so, it might be a good idea to document that in the appropriate places in the API and the online documentation (I can attempt to do so if I can find the right places to edit, which I haven't so far). In particular, https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html indicates field translations to a set of raw Java types that are time-zone sensitive (all subclasses of java.util.Date) while not indicating the assumed time zone. For my use case (trying to write something that pushes down timestamp predicates into a query against a MySQL database), this can cause significant confusion and require experimentation to work out. Alternatively, we can mimic what Calcite is doing and have them be specially-typed strings (Calcite uses TimeString, TimestampString, DateString). Since Java 7 is still supported, we unfortunately can't use the new time API. > SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException > -- > > Key: FLINK-7657 > URL: https://issues.apache.org/jira/browse/FLINK-7657 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1, 1.3.2 >Reporter: Kent Murra >Assignee: Kent Murra >Priority: Critical > > I have a SQL statement using the Table API that has a timestamp in it. When > the execution environment tries to optimize the SQL, it causes an exception > (attached below). As a result, any SQL query with a timestamp, date, or > time literal cannot be executed if any table source is marked with > FilterableTableSource. 
> {code:none} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule PushFilterIntoTableSourceScanRule, args > [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, > $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], > fields:(data, last_updated))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328) > at > org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135) > at org.apache.flink.table.api.Table.writeToSink(table.scala:800) > at org.apache.flink.table.api.Table.writeToSink(table.scala:773) > at > com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27) > at > com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22) > at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala) > Caused by: 
java.lang.ClassCastException: java.util.GregorianCalendar cannot > be cast to java.util.Date > at > org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107) > at > org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) >
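The root cause named in the trace (a java.util.GregorianCalendar being cast to java.util.Date) can be shown in isolation. The Java sketch below is one possible guard and not the fix adopted in Flink: the class CalendarLiteralSketch and its method toUtcCalendar are hypothetical names, and UTC is used only because the comment above suspects that the SQL functions assume it.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper: normalises a time literal to a UTC Calendar
// without assuming which of the two runtime types it has.
final class CalendarLiteralSketch {

    static Calendar toUtcCalendar(Object literal) {
        // Assumption: UTC, as suspected (not confirmed) in the comment above.
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        if (literal instanceof Date) {
            // java.sql.Date, Time and Timestamp are all subclasses of java.util.Date.
            cal.setTimeInMillis(((Date) literal).getTime());
        } else if (literal instanceof Calendar) {
            // A Calendar is NOT a Date; casting it to Date throws ClassCastException.
            cal.setTimeInMillis(((Calendar) literal).getTimeInMillis());
        } else {
            throw new IllegalArgumentException(
                "Unsupported literal type: " + literal.getClass().getName());
        }
        return cal;
    }
}
```

Checking the runtime type before converting avoids the blind cast that fails in the trace, at the cost of accepting an untyped Object parameter.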
[jira] [Closed] (FLINK-7635) Support sideOutput in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-7635. --- Resolution: Done > Support sideOutput in ProcessWindowFunction > --- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li > Fix For: 1.4.0 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to the ProcessFunction Context. It would be nice to add > support to the ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I think this is a good warm-up task for people to learn how window > functions work in general. Otherwise, feel free to assign it back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183444#comment-16183444 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141494139 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util +import java.util.function.BiFunction + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + +/** The initial accumulator for Collect aggregate function */ +class CollectAccumulator[E](var f0:MapView[E, Integer]) { + def this() { +this(null) + } + + def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]] + + override def equals(that: Any): Boolean = --- End diff -- A normal Scala class still needs to, but a case class doesn't. > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183445#comment-16183445 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141494189 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util +import java.util.function.BiFunction + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + +/** The initial accumulator for Collect aggregate function */ +class CollectAccumulator[E](var f0:MapView[E, Integer]) { --- End diff -- done > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7724) add extra metrics to MetricStoreTest.setupStore
[ https://issues.apache.org/jira/browse/FLINK-7724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183428#comment-16183428 ] ASF GitHub Bot commented on FLINK-7724: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4739 cc @tillrohrmann @zentol > add extra metrics to MetricStoreTest.setupStore > --- > > Key: FLINK-7724 > URL: https://issues.apache.org/jira/browse/FLINK-7724 > Project: Flink > Issue Type: Test > Components: REST, Tests >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > Currently, {{MetricStoreTest.setupStore()}} only mock jobs with only one > metrics. We need to have at least one job with multiple metrics to make the > mocked MetricStore more generic. > This is a pre-requisite for FLINK-7694 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6703) Document how to take a savepoint on YARN
[ https://issues.apache.org/jira/browse/FLINK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183427#comment-16183427 ] ASF GitHub Bot commented on FLINK-6703: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4721 cc @zentol > Document how to take a savepoint on YARN > > > Key: FLINK-6703 > URL: https://issues.apache.org/jira/browse/FLINK-6703 > Project: Flink > Issue Type: Improvement > Components: Documentation, YARN >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Bowen Li > > The documentation should have a separate entry for savepoint related CLI > commands in combination with YARN. It is currently not documented that you > have to supply the application id, nor how you can pass it. > {code} > ./bin/flink savepoint -m yarn-cluster (-yid|-yarnapplicationId) > > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183426#comment-16183426 ] ASF GitHub Bot commented on FLINK-7650: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141492673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. + * + * A job related REST handler always requires a {@link JobIDPathParameter}. 
+ */ +public class JobMessageParameters extends MessageParameters { --- End diff -- ok, got you > Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7724) add extra metrics to MetricStoreTest.setupStore
[ https://issues.apache.org/jira/browse/FLINK-7724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183414#comment-16183414 ] ASF GitHub Bot commented on FLINK-7724: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4739 [FLINK-7724][REST][Tests][Metrics] add extra metrics to MetricStoreTest.setupStore ## What is the purpose of the change This is a prerequisite for FLINK-7694. I'm porting `JobMetricsHandler` to the new REST endpoint. Part of the migration is to make sure the JSON generated before and after stays the same for the UI to consume, and `JobMetricsHandler` is using `MetricStoreTest.setupStore()` to get a mocked `MetricStore`. Here comes the problem: currently, `MetricStoreTest.setupStore()` mocks all jobs with only one metric. We need at least one or two jobs with multiple metrics so that the mocked `MetricStore` covers most use cases and the JSON blob generated in `JobMetricsHandler` stays the same before and after migration. 
## Brief change log - make two jobs in `MetricStoreTest.setupStore()` contain multiple metrics ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7724 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4739.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4739 commit bc4aa8c3d9b057a6d35b620eb8e00048137e9361 Author: Bowen Li Date: 2017-09-27T22:30:02Z [FLINK-7724] add extra metrics to MetricStoreTest.setupStore commit d223444eba17c620e8c45c372a7b2e6fafdfd169 Author: Bowen Li Date: 2017-09-27T22:39:17Z delete unrelated files > add extra metrics to MetricStoreTest.setupStore > --- > > Key: FLINK-7724 > URL: https://issues.apache.org/jira/browse/FLINK-7724 > Project: Flink > Issue Type: Test > Components: REST, Tests >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > Currently, {{MetricStoreTest.setupStore()}} only mock jobs with only one > metrics. We need to have at least one job with multiple metrics to make the > mocked MetricStore more generic. > This is a pre-requisite for FLINK-7694 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183415#comment-16183415 ] ASF GitHub Bot commented on FLINK-7649: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4700 > Port JobStoppingHandler to new REST endpoint > > > Key: FLINK-7649 > URL: https://issues.apache.org/jira/browse/FLINK-7649 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobStoppingHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183413#comment-16183413 ] ASF GitHub Bot commented on FLINK-7650: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4697 > Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7649) Port JobStoppingHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7649. -- Resolution: Fixed Fixed via adeab64ea0abdc892e51c6f630aa56dabf7e2c98 > Port JobStoppingHandler to new REST endpoint > > > Key: FLINK-7649 > URL: https://issues.apache.org/jira/browse/FLINK-7649 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobStoppingHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7650. -- Resolution: Fixed Fixed via 8ea4db1a8b368b4e00dd310c0d07405fd2142b34 > Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7723) DispatcherTest instable
[ https://issues.apache.org/jira/browse/FLINK-7723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7723. Resolution: Fixed Fixed via 4debc6033cc2099b763d7fef86829b9f6734d91a > DispatcherTest instable > --- > > Key: FLINK-7723 > URL: https://issues.apache.org/jira/browse/FLINK-7723 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{DispatcherTest}} is instable on Travis. > https://travis-ci.org/tillrohrmann/flink/jobs/280508821 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183381#comment-16183381 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141485981 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1414,8 +1414,29 @@ object AggregateUtil { aggregates(index) = udagg.getFunction accTypes(index) = udagg.accType -case unSupported: SqlAggFunction => - throw new TableException(s"unsupported Function: '${unSupported.getName}'") +case other: SqlAggFunction => + if (other.getKind == SqlKind.COLLECT) { +aggregates(index) = sqlTypeName match { + case TINYINT => +new ByteCollectAggFunction + case SMALLINT => +new ShortCollectAggFunction + case INTEGER => +new IntCollectAggFunction + case BIGINT => +new LongCollectAggFunction + case VARCHAR | CHAR => +new StringCollectAggFunction + case FLOAT => +new FloatCollectAggFunction + case DOUBLE => +new DoubleCollectAggFunction + case _ => +new ObjectCollectAggFunction +} + } else { --- End diff -- done > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183380#comment-16183380 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141485942 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1414,8 +1414,29 @@ object AggregateUtil { aggregates(index) = udagg.getFunction accTypes(index) = udagg.accType -case unSupported: SqlAggFunction => - throw new TableException(s"unsupported Function: '${unSupported.getName}'") +case other: SqlAggFunction => --- End diff -- done > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7724) add extra metrics to MetricStoreTest.setupStore
Bowen Li created FLINK-7724: --- Summary: add extra metrics to MetricStoreTest.setupStore Key: FLINK-7724 URL: https://issues.apache.org/jira/browse/FLINK-7724 Project: Flink Issue Type: Test Components: REST, Tests Affects Versions: 1.4.0 Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.4.0 Currently, {{MetricStoreTest.setupStore()}} only mock jobs with only one metrics. We need to have at least one job with multiple metrics to make the mocked MetricStore more generic. This is a pre-requisite for FLINK-7694 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7722) MiniCluster does not appear to honor Log4j settings
[ https://issues.apache.org/jira/browse/FLINK-7722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy closed FLINK-7722. - Resolution: Not A Problem > MiniCluster does not appear to honor Log4j settings > --- > > Key: FLINK-7722 > URL: https://issues.apache.org/jira/browse/FLINK-7722 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Elias Levy >Priority: Minor > > When executing a job from the command line for testing, it will output logs > like: > {noformat} > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] > with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014. > 09/27/2017 13:15:13 Job execution switched to status RUNNING. > 09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED > 09/27/2017 13:15:13 Source: Collect > {noformat} > It will do so even if the log4j.properties file contains: > {code} > log4j.rootLogger=ERROR, stdout > log4j.logger.org.apache.flink=ERROR > log4j.logger.akka=ERROR > log4j.logger.org.apache.kafka=ERROR > log4j.logger.org.apache.hadoop=ERROR > log4j.logger.org.apache.zookeeper=ERROR > {code} > It seems that the MiniCluster does not honor Log4j settings, or at least that > is my guess. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7722) MiniCluster does not appear to honor Log4j settings
[ https://issues.apache.org/jira/browse/FLINK-7722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183336#comment-16183336 ] Elias Levy commented on FLINK-7722: --- Aye. I figured out the issue was that I needed to call {{env.getConfig().disableSysoutLogging()}} by tracing the code. Closing. > MiniCluster does not appear to honor Log4j settings > --- > > Key: FLINK-7722 > URL: https://issues.apache.org/jira/browse/FLINK-7722 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Elias Levy >Priority: Minor > > When executing a job from the command line for testing, it will output logs > like: > {noformat} > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] > with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014. > 09/27/2017 13:15:13 Job execution switched to status RUNNING. > 09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED > 09/27/2017 13:15:13 Source: Collect > {noformat} > It will do so even if the log4j.properties file contains: > {code} > log4j.rootLogger=ERROR, stdout > log4j.logger.org.apache.flink=ERROR > log4j.logger.akka=ERROR > log4j.logger.org.apache.kafka=ERROR > log4j.logger.org.apache.hadoop=ERROR > log4j.logger.org.apache.zookeeper=ERROR > {code} > It seems that the MiniCluster does not honor Log4j settings, or at least that > is my guess. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7723) DispatcherTest instable
Till Rohrmann created FLINK-7723: Summary: DispatcherTest instable Key: FLINK-7723 URL: https://issues.apache.org/jira/browse/FLINK-7723 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Critical The {{DispatcherTest}} is instable on Travis. https://travis-ci.org/tillrohrmann/flink/jobs/280508821 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183304#comment-16183304 ] Mikhail Pryakhin commented on FLINK-6949: - I propose to add a --yarnship-files CLI option that will submit folders with files to the YARN cluster and add these folders to the application classpath. At the moment the --yarnship option [traverses|https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003:] the content of the specified yarnship folder and appends the files to the classpath. If a ship folder contains files other than *.class, *.jar or *.zip files, for example config/property files, appending such files to the application classpath has no effect. So it would be great to be able to add whole folders to the application classpath. I'm going to add a PR to support this feature. > Add ability to ship custom resource files to YARN cluster > - > > Key: FLINK-6949 > URL: https://issues.apache.org/jira/browse/FLINK-6949 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.3.0 >Reporter: Mikhail Pryakhin >Assignee: Mikhail Pryakhin >Priority: Critical > > *The problem:* > When deploying a flink job on YARN it is not possible to specify custom > resource files to be shipped to YARN cluster. > > *The use case description:* > When running a flink job on multiple environments it becomes necessary to > pass environment-related configuration files to the job's runtime. It can be > accomplished by packaging configuration files within the job's jar. But > having tens of different environments one can easily end up packaging as many > jars as there are environments. It would be great to have an ability to > separate configuration files from the job artifacts. 
> > *The possible solution:* > add the --yarnship-files option to flink cli to specify files that should be > shipped to the YARN cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6949) Add ability to ship custom resource files to YARN cluster
[ https://issues.apache.org/jira/browse/FLINK-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mikhail Pryakhin reassigned FLINK-6949: --- Assignee: Mikhail Pryakhin > Add ability to ship custom resource files to YARN cluster > - > > Key: FLINK-6949 > URL: https://issues.apache.org/jira/browse/FLINK-6949 > Project: Flink > Issue Type: Improvement > Components: Client, YARN >Affects Versions: 1.3.0 >Reporter: Mikhail Pryakhin >Assignee: Mikhail Pryakhin >Priority: Critical > > *The problem:* > When deploying a flink job on YARN it is not possible to specify custom > resource files to be shipped to YARN cluster. > > *The use case description:* > When running a flink job on multiple environments it becomes necessary to > pass environment-related configuration files to the job's runtime. It can be > accomplished by packaging configuration files within the job's jar. But > having tens of different environments one can easily end up packaging as many > jars as there are environments. It would be great to have an ability to > separate configuration files from the job artifacts. > > *The possible solution:* > add the --yarnship-files option to flink cli to specify files that should be > shipped to the YARN cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
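The classpath point raised in the FLINK-6949 discussion can be tried outside Flink. The sketch below is plain JDK code and has nothing to do with the YARN client itself. It creates a temporary "ship" folder holding one properties file, then asks a `URLClassLoader` to resolve that file, once with the folder as the classpath entry and once with the file itself as the entry. The class name, the helper names and the `app.properties` file name are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class; it only illustrates how a JVM class loader treats
// a folder entry versus a single-file entry on its classpath.
final class ShipFolderClasspathSketch {

    // Creates a temporary folder containing one small properties file.
    static Path createShipFolder(String fileName) {
        try {
            Path dir = Files.createTempDirectory("ship");
            Files.write(dir.resolve(fileName), "env=test\n".getBytes(StandardCharsets.UTF_8));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if a class loader whose only classpath entry is `entry`
    // can resolve the resource `name`.
    static boolean resolves(Path entry, String name) {
        try (URLClassLoader loader =
                new URLClassLoader(new URL[] {entry.toUri().toURL()}, null)) {
            return loader.getResource(name) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path shipDir = createShipFolder("app.properties");
        Path config = shipDir.resolve("app.properties");

        // Folder as classpath entry: the properties file is found as a resource.
        System.out.println("folder entry: " + resolves(shipDir, "app.properties"));
        // File itself as classpath entry: run this to see what your JVM reports.
        System.out.println("file entry:   " + resolves(config, "app.properties"));

        Files.delete(config);
        Files.delete(shipDir);
    }
}
```

The folder case works because `Path.toUri()` yields a URL ending in a slash for an existing directory, which `URLClassLoader` treats as a directory of resources. Any URL without the trailing slash is assumed to refer to a JAR file.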
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183241#comment-16183241 ] Kent Murra commented on FLINK-7657: --- Thanks for the information Timo. My approach was to add an apply method that takes in the RexLiteral to the object as follows: {code} object Literal { ... private[flink] def apply(rexNode: RexLiteral): Literal = { val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType) val literalValue = literalType match { case _@SqlTimeTypeInfo.DATE => val rexValue = rexNode.getValueAs(classOf[DateString]) new Date(rexValue.getMillisSinceEpoch) case _@SqlTimeTypeInfo.TIME => val rexValue = rexNode.getValueAs(classOf[TimeString]) new Time(rexValue.getMillisOfDay) case _@SqlTimeTypeInfo.TIMESTAMP => // We're losing nanosecond precision but according to the documentation we're only // supporting TIMESTAMP(3) at the moment. In order to support nanosecond precision, we'd want to // convert to string and then to java.sql.Timestamp val rexValue = rexNode.getValueAs(classOf[TimestampString]) new Timestamp(rexValue.getMillisSinceEpoch) case _ => rexNode.getValue } Literal(literalValue, literalType) } } {code} My view: We want to keep the transformation logic to and from the RexNode in a similar file so it's more obvious that changes need to be "paired". This doesn't match your suggestion, but I think that you were warning me away from touching the case class Literal since that appears to be fairly widely used, and impact is hard to gauge. Sorry for the delay in getting a PR out since I got randomized. I'm looking at this again, and I'll get something out and let you guys tell me what you do or don't like about it. For now I'll focus on the Date/Time/Timestamp aspect and can follow up with the other types as necessary. 
> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException > -- > > Key: FLINK-7657 > URL: https://issues.apache.org/jira/browse/FLINK-7657 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1, 1.3.2 >Reporter: Kent Murra >Assignee: Kent Murra >Priority: Critical > > I have a SQL statement using the Tables API that has a timestamp in it. When > the execution environment tries to optimize the SQL, it causes an exception > (attached below). The result is any SQL query with a timestamp, date, or > time literal is unexecutable if any table source is marked with > FilterableTableSource. > {code:none} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule PushFilterIntoTableSourceScanRule, args > [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, > $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], > fields:(data, last_updated))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328) > at > org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135) > at org.apache.flink.table.api.Table.writeToSink(table.scala:800) > at org.apache.flink.table.api.Table.writeToSink(table.scala:773) > at > com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27) > at > 
com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22) > at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala) > Caused by: java.lang.ClassCastException:
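The precision remark in Kent Murra's comment above (millisecond construction versus going through a string) can be checked with plain `java.sql.Timestamp`. The following is only a sketch of that JDK behaviour, not the proposed Flink change: the class `TimestampPrecisionDemo` and its two helper methods are hypothetical names, and no Calcite types are involved.

```java
import java.sql.Timestamp;

final class TimestampPrecisionDemo {

    /** Nanosecond field of a Timestamp built from epoch milliseconds. */
    static int nanosViaMillis(long epochMillis) {
        // Only the millisecond part can end up in the nanos field.
        return new Timestamp(epochMillis).getNanos();
    }

    /** Nanosecond field of a Timestamp parsed from "yyyy-[m]m-[d]d hh:mm:ss[.f...]". */
    static int nanosViaString(String literal) {
        // Timestamp.valueOf keeps up to nine fractional digits.
        return Timestamp.valueOf(literal).getNanos();
    }

    public static void main(String[] args) {
        // Epoch millis ending in .123 seconds: sub-millisecond digits are absent.
        System.out.println(nanosViaMillis(1493640000123L));

        // The string form carries all nine fractional digits through.
        System.out.println(nanosViaString("2017-05-01 12:00:00.123456789"));
    }
}
```

Since the documentation mentioned in the comment only promises TIMESTAMP(3), the millisecond path is sufficient for now. The string path would matter only if higher precision were supported later.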
[jira] [Commented] (FLINK-7722) MiniCluster does not appear to honor Log4j settings
[ https://issues.apache.org/jira/browse/FLINK-7722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183221#comment-16183221 ] Ufuk Celebi commented on FLINK-7722: I think this is the output of the client that is actually submitting the job to the {{FlinkMiniCluster}}. I also find this confusing/annoying at times. You can disable it via {{env.getConfig().disableSysoutLogging()}}. > MiniCluster does not appear to honor Log4j settings > --- > > Key: FLINK-7722 > URL: https://issues.apache.org/jira/browse/FLINK-7722 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Elias Levy >Priority: Minor > > When executing a job from the command line for testing, it will output logs > like: > {noformat} > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] > with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014. > 09/27/2017 13:15:13 Job execution switched to status RUNNING. > 09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED > 09/27/2017 13:15:13 Source: Collect > {noformat} > It will do so even if the log4j.properties file contains: > {code} > log4j.rootLogger=ERROR, stdout > log4j.logger.org.apache.flink=ERROR > log4j.logger.akka=ERROR > log4j.logger.org.apache.kafka=ERROR > log4j.logger.org.apache.hadoop=ERROR > log4j.logger.org.apache.zookeeper=ERROR > {code} > It seems that the MiniCluster does not honor Log4j settings, or at least that > is my guess. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7722) MiniCluster does not appear to honor Log4j settings
Elias Levy created FLINK-7722: - Summary: MiniCluster does not appear to honor Log4j settings Key: FLINK-7722 URL: https://issues.apache.org/jira/browse/FLINK-7722 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor When executing a job from the command line for testing, it will output logs like: {noformat} Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014. 09/27/2017 13:15:13 Job execution switched to status RUNNING. 09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED 09/27/2017 13:15:13 Source: Collect {noformat} It will do so even if the log4j.properties file contains: {code} log4j.rootLogger=ERROR, stdout log4j.logger.org.apache.flink=ERROR log4j.logger.akka=ERROR log4j.logger.org.apache.kafka=ERROR log4j.logger.org.apache.hadoop=ERROR log4j.logger.org.apache.zookeeper=ERROR {code} It seems that the MiniCluster does not honor Log4j settings, or at least that is my guess. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7720) Centralize creation of backends and state related resources
[ https://issues.apache.org/jira/browse/FLINK-7720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183151#comment-16183151 ] Bowen Li commented on FLINK-7720: - +1 > Centralize creation of backends and state related resources > --- > > Key: FLINK-7720 > URL: https://issues.apache.org/jira/browse/FLINK-7720 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > State related resources like keyed state backends, operator state backends, > timer service manager, etc. are currently all created in different places, > somewhere between {{StreamTask}} and {{AbstractStreamOperator}}. It is hard > to identify where/how resources are created, which component owns them, and > how certain components depend on others (e.g. there is only a timer service > manager if there is also a keyed state backend). > For the changes we plan for local state recovery, it would make sense to > define a component that is responsible for the creation of state related > resources and gives the possibility to interact in checkpointing and > restoring with the checkpoint coordinator (owned by job manager) as well as > local state storage (owned by task manager). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183148#comment-16183148 ] ASF GitHub Bot commented on FLINK-7491: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141449807 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util +import java.util.function.BiFunction + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + +/** The initial accumulator for Collect aggregate function */ +class CollectAccumulator[E](var f0:MapView[E, Integer]) { + def this() { +this(null) + } + + def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]] + + override def equals(that: Any): Boolean = +that match { + case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0 + case _ => false +} +} + +abstract class CollectAggFunction[E] + extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] { + + override def createAccumulator(): CollectAccumulator[E] = { +val acc = new CollectAccumulator[E](new MapView[E, Integer]( + getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO)) +acc + } + + def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = { +if (value != null) { + if (accumulator.f0.contains(value)) { --- End diff -- Good catch. done. > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r141449807 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util +import java.util.function.BiFunction + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.dataview.MapView +import org.apache.flink.table.dataview.MapViewTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + +/** The initial accumulator for Collect aggregate function */ +class CollectAccumulator[E](var f0:MapView[E, Integer]) { + def this() { +this(null) + } + + def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]] + + override def equals(that: Any): Boolean = +that match { + case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0 + case _ => false +} +} + +abstract class CollectAggFunction[E] + extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] { + + override def createAccumulator(): CollectAccumulator[E] = { +val acc = new CollectAccumulator[E](new MapView[E, Integer]( + getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO)) +acc + } + + def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = { +if (value != null) { + if (accumulator.f0.contains(value)) { --- End diff -- Good catch. done. ---
[jira] [Assigned] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
[ https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mikhail Pryakhin reassigned FLINK-6950: --- Assignee: Mikhail Pryakhin > Add ability to specify single jar files to be shipped to YARN > - > > Key: FLINK-6950 > URL: https://issues.apache.org/jira/browse/FLINK-6950 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: Mikhail Pryakhin >Assignee: Mikhail Pryakhin >Priority: Minor > > When deploying a flink job on YARN it is not possible to specify multiple > yarnship folders. > Often when submitting a flink job, job dependencies and job resources are > located in different local folders and both of them should be shipped to YARN > cluster. > I think it would be great to have an ability to specify jars but not folders > that should be shipped to YARN cluster (via the --yarnship-jars option). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6950) Add ability to specify single jar files to be shipped to YARN
[ https://issues.apache.org/jira/browse/FLINK-6950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183104#comment-16183104 ] Robert Metzger commented on FLINK-6950: --- Yes, please go ahead! > Add ability to specify single jar files to be shipped to YARN > - > > Key: FLINK-6950 > URL: https://issues.apache.org/jira/browse/FLINK-6950 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: Mikhail Pryakhin >Priority: Minor > > When deploying a flink job on YARN it is not possible to specify multiple > yarnship folders. > Often when submitting a flink job, job dependencies and job resources are > located in different local folders and both of them should be shipped to YARN > cluster. > I think it would be great to have an ability to specify jars but not folders > that should be shipped to YARN cluster (via the --yarnship-jars option). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183016#comment-16183016 ] ASF GitHub Bot commented on FLINK-7650: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4697 +1 from my side. > Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels
[ https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183014#comment-16183014 ] ASF GitHub Bot commented on FLINK-7721: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4738 [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs ## What is the purpose of the change Prior to this PR, in the calculation of the new min watermark in `StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()`, there is no verification that the calculated new min watermark really is aggregated from some aligned channel. In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation is `Long.MAX_VALUE` and emit that. This PR fixes this by only emitting the aggregated watermark iff it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a `Long.MAX_VALUE` was genuinely aggregated from the input channels. **This is a critical bug that should be fixed for all affecting versions: 1.2.1, 1.3.2, 1.4.0 (master)** ## Brief change log - Fix `findAndOutputNewMinWatermarkAcrossAlignedChannels` to only output the new min watermark iff it was truly computed from some aligned channel. - Adapt `StatusWatermarkValveTest#testMultipleInputValve()` to cover the missing case. ## Verifying this change Verified by the modified `StatusWatermarkValveTest#testMultipleInputValve()`. Without the fix in `findAndOutputNewMinWatermarkAcrossAlignedChannels`, the modified test does not pass. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): **YES** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-7721 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4738.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4738 commit 29d3b74f81410ea9c2ed861c297ece8134d9b29f Author: Tzu-Li (Gordon) Tai Date: 2017-09-27T18:05:21Z [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs Prior to this commit, in the calculation of the new min watermark in StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(), there is no verification that the calculated new min watermark really is aggregated from some aligned channel. In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation is Long.MAX_VALUE and emit that. This commit fixes this by only emitting the aggregated watermark iff it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a Long.MAX_VALUE was genuinely aggregated from the input channels. 
> StatusWatermarkValve should output a new min watermark only if it was > aggregated from aligned channels > -- > > Key: FLINK-7721 > URL: https://issues.apache.org/jira/browse/FLINK-7721 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.2.2, 1.4.0, 1.3.3 > > > Context: > {code} > long newMinWatermark = Long.MAX_VALUE; > for (InputChannelStatus channelStatus : channelStatuses) { > if (channelStatus.isWatermarkAligned) { > newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); > } > } > {code} > In the calculation of the new min
[GitHub] flink issue #4697: [FLINK-7650] [flip6] Port JobCancellationHandler to new R...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4697 +1 from my side. ---
[GitHub] flink pull request #4738: [FLINK-7721] [DataStream] Only emit new min waterm...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4738 [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs ## What is the purpose of the change Prior to this PR, in the calculation of the new min watermark in `StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()`, there is no verification that the calculated new min watermark really is aggregated from some aligned channel. In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation is `Long.MAX_VALUE` and emit that. This PR fixes this by only emitting the aggregated watermark iff it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a `Long.MAX_VALUE` was genuinely aggregated from the input channels. **This is a critical bug that should be fixed for all affecting versions: 1.2.1, 1.3.2, 1.4.0 (master)** ## Brief change log - Fix `findAndOutputNewMinWatermarkAcrossAlignedChannels` to only output the new min watermark iff it was truly computed from some aligned channel. - Adapt `StatusWatermarkValveTest#testMultipleInputValve()` to cover the missing case. ## Verifying this change Verified by the modified `StatusWatermarkValveTest#testMultipleInputValve()`. Without the fix in `findAndOutputNewMinWatermarkAcrossAlignedChannels`, the modified test does not pass. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): **YES** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-7721 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4738.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4738 commit 29d3b74f81410ea9c2ed861c297ece8134d9b29f Author: Tzu-Li (Gordon) Tai Date: 2017-09-27T18:05:21Z [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated from watermark-aligned inputs Prior to this commit, in the calculation of the new min watermark in StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(), there is no verification that the calculated new min watermark really is aggregated from some aligned channel. In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation is Long.MAX_VALUE and emit that. This commit fixes this by only emitting the aggregated watermark iff it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a Long.MAX_VALUE was genuinely aggregated from the input channels. ---
[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183011#comment-16183011 ] ASF GitHub Bot commented on FLINK-7650: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141425581 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. + * + * A job related REST handler always requires a {@link JobIDPathParameter}. + */ +public class JobMessageParameters extends MessageParameters { --- End diff -- This class is meant to be a general parameters class for whatever parameters are generally required for job-related queries. While it currently only contains the job ID i think it makes sense to leave it as is. 
> Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141425581 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. + * + * A job related REST handler always requires a {@link JobIDPathParameter}. + */ +public class JobMessageParameters extends MessageParameters { --- End diff -- This class is meant to be a general parameters class for whatever parameters are generally required for job-related queries. While it currently only contains the job ID i think it makes sense to leave it as is. ---
[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files
[ https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183002#comment-16183002 ] ASF GitHub Bot commented on FLINK-5944: --- Github user mlipkovich commented on a diff in the pull request: https://github.com/apache/flink/pull/4683#discussion_r141424281 --- Diff: flink-core/pom.xml --- @@ -52,6 +52,12 @@ under the License. flink-shaded-asm + + org.apache.flink + flink-shaded-hadoop2 + ${project.version} + --- End diff -- Thanks for your comment Aljoscha, So there are at least three ways on how to achieve it: either mark this dependency as 'provided', move Hadoop Snappy Codec related classes to flink-java module or move it to some separate module as suggested @haohui, but I'm not sure what should be inside this module > Flink should support reading Snappy Files > - > > Key: FLINK-5944 > URL: https://issues.apache.org/jira/browse/FLINK-5944 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats >Reporter: Ilya Ganelin >Assignee: Mikhail Lipkovich > Labels: features > > Snappy is an extremely performant compression format that's widely used > offering fast decompression/compression. > This can be easily implemented by creating a SnappyInflaterInputStreamFactory > and updating the initDefaultInflateInputStreamFactories in FileInputFormat. > Flink already includes the Snappy dependency in the project. > There is a minor gotcha in this. If we wish to use this with Hadoop, then we > must provide two separate implementations since Hadoop uses a different > version of the snappy format than Snappy Java (which is the xerial/snappy > included in Flink). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4683: [FLINK-5944] Support reading of Snappy files
Github user mlipkovich commented on a diff in the pull request: https://github.com/apache/flink/pull/4683#discussion_r141424281 --- Diff: flink-core/pom.xml --- @@ -52,6 +52,12 @@ under the License. flink-shaded-asm + + org.apache.flink + flink-shaded-hadoop2 + ${project.version} + --- End diff -- Thanks for your comment Aljoscha, So there are at least three ways on how to achieve it: either mark this dependency as 'provided', move Hadoop Snappy Codec related classes to flink-java module or move it to some separate module as suggested @haohui, but I'm not sure what should be inside this module ---
[jira] [Created] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels
Tzu-Li (Gordon) Tai created FLINK-7721: -- Summary: StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels Key: FLINK-7721 URL: https://issues.apache.org/jira/browse/FLINK-7721 Project: Flink Issue Type: Bug Affects Versions: 1.3.2, 1.2.1, 1.4.0 Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Priority: Blocker Fix For: 1.2.2, 1.4.0, 1.3.3

Context:
{code}
long newMinWatermark = Long.MAX_VALUE;
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}
{code}
In the calculation of the new min watermark in {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, there is no verification that the calculated {{newMinWatermark}} really was aggregated from some aligned channel. In the corner case where no input channel is currently aligned but some are still active, we would incorrectly determine that the final aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.

The fix is simply to emit the aggregated watermark if and only if it was really calculated from some aligned input channel (in addition to the existing constraint that it must be larger than the last emitted watermark). This change also safely covers the case where a {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
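The fix proposed in FLINK-7721 can be sketched roughly as follows. This is a minimal illustration, not the actual patch: `ChannelStatus`, `MinWatermarkSketch`, and the `OptionalLong` return type are hypothetical stand-ins for the valve's internal state, and only the aggregation rule from the issue text is modelled.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for the valve's per-channel status (not Flink's class).
final class ChannelStatus {
    final long watermark;
    final boolean isWatermarkAligned;

    ChannelStatus(long watermark, boolean isWatermarkAligned) {
        this.watermark = watermark;
        this.isWatermarkAligned = isWatermarkAligned;
    }
}

final class MinWatermarkSketch {
    /**
     * Aggregates the minimum watermark across aligned channels. Returns an empty
     * result when no channel is aligned or when the minimum does not advance
     * past the last emitted watermark.
     */
    static OptionalLong findNewMinWatermark(List<ChannelStatus> channels, long lastEmitted) {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannel = false;

        for (ChannelStatus channel : channels) {
            if (channel.isWatermarkAligned) {
                hasAlignedChannel = true;
                newMinWatermark = Math.min(channel.watermark, newMinWatermark);
            }
        }

        // Emit only if the value really came from an aligned channel; a genuine
        // Long.MAX_VALUE reported by an aligned channel is still emitted.
        if (hasAlignedChannel && newMinWatermark > lastEmitted) {
            return OptionalLong.of(newMinWatermark);
        }
        return OptionalLong.empty();
    }
}
```

Tracking an explicit flag, rather than comparing the result against `Long.MAX_VALUE`, is what keeps the genuine `Long.MAX_VALUE` case distinguishable from the case where no channel is aligned.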
[GitHub] flink issue #4722: [FLINK-7683] Iterate over keys in KeyedStateBackend
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4722 LGTM! One more ask: can you please add comments to both the class and the code explaining that concurrent modification will fail the iteration? ---
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182944#comment-16182944 ] ASF GitHub Bot commented on FLINK-7683: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4722 LGTM! One more ask: can you please add comments to both the class and the code explaining that concurrent modification will fail the iteration? > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator (to do so operator must > iterate over all of the existing keys and rewrites them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
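To illustrate the kind of failure the requested comments would warn about, here is a small sketch using a plain `java.util.HashMap`. It is only an analogy: it assumes nothing about how the Flink state backends implement key iteration, and `KeyIterationSketch` is a hypothetical name.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

final class KeyIterationSketch {
    /**
     * Iterates over the keys of a map while adding new entries to it and
     * reports whether the fail-fast iterator aborted the iteration.
     */
    static boolean iterationFailsOnConcurrentModification() {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);
        state.put("c", 3);

        try {
            for (String key : state.keySet()) {
                // Structural modification while the key iterator is still in use.
                state.put(key + "-copy", 0);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

A caller that needs to rewrite state while walking the keys would typically collect the keys first and modify afterwards, which is why documenting the restriction on the class and in the code is useful.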
[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182930#comment-16182930 ] ASF GitHub Bot commented on FLINK-7650: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141409233 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. + * + * A job related REST handler always requires a {@link JobIDPathParameter}. + */ +public class JobMessageParameters extends MessageParameters { --- End diff -- sorry i missed this file name yesterday. How about `JobIDMessageParameters`? 
since there're no job query params in this class > Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182929#comment-16182929 ] ASF GitHub Bot commented on FLINK-7650: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141409228 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. --- End diff -- "Parameters for job `ID` related REST handlers"? > Port JobCancellationHandler to new REST endpoint > > > Key: FLINK-7650 > URL: https://issues.apache.org/jira/browse/FLINK-7650 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobCancellationHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141409228 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. --- End diff -- "Parameters for job `ID` related REST handlers"? ---
[GitHub] flink pull request #4697: [FLINK-7650] [flip6] Port JobCancellationHandler t...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4697#discussion_r141409233 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for job related REST handlers. + * + * A job related REST handler always requires a {@link JobIDPathParameter}. + */ +public class JobMessageParameters extends MessageParameters { --- End diff -- sorry i missed this file name yesterday. How about `JobIDMessageParameters`? since there're no job query params in this class ---
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182902#comment-16182902 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141406439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. + * + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends TimerTask implements PermanentBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class); + + /** Counter to generate unique names for temporary files. 
*/ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** Root directory for local file storage */ + private final File storageDir; + + /** Blob store for distributed file storage, e.g. in HA */ + private final BlobView blobView; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + // + + /** +* Job reference counters with a time-to-live (TTL). +*/ + @VisibleForTesting + static class RefCount { + /** +* Number of references to a job. +*/ + public int references = 0; + + /** +* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for +* non-positive values). +*/ + public long keepUntil =
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141406439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. + * + * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends TimerTask implements PermanentBlobService { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(PermanentBlobCache.class); + + /** Counter to generate unique names for temporary files. 
*/ + private final AtomicLong tempFileCounter = new AtomicLong(0); + + private final InetSocketAddress serverAddress; + + /** Root directory for local file storage */ + private final File storageDir; + + /** Blob store for distributed file storage, e.g. in HA */ + private final BlobView blobView; + + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the storage directory. */ + private final Thread shutdownHook; + + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + + /** Configuration for the blob client like ssl parameters required to connect to the blob server */ + private final Configuration blobClientConfig; + + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + + // + + /** +* Job reference counters with a time-to-live (TTL). +*/ + @VisibleForTesting + static class RefCount { + /** +* Number of references to a job. +*/ + public int references = 0; + + /** +* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for +* non-positive values). +*/ + public long keepUntil = -1; + } + + /** Map to store the number of references to a specific job */ + private final Map<JobID, RefCount> jobRefCounters = new HashMap<>(); + + /** Time interval (ms) to run the cleanup task; also used
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182898#comment-16182898 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141406045 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param requiredBlob +* @param blobKey * blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { + checkArgument(blobKey != null, "BLOB key cannot be null."); - final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob); + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); - if (localFile.exists()) { + try { + getFileInternal(jobId, blobKey, highlyAvailable, localFile); return localFile; + } finally { + readWriteLock.readLock().unlock(); } - else { + } + + /** +* Helper to retrieve the local path of a file associated with a job and a blob key. +* +* The blob server looks the blob key up in its local storage. If the file exists, it is +* returned. If the file does not exist, it is retrieved from the HA blob store (if available) +* or a {@link FileNotFoundException} is thrown. 
+* +* Assumes the read lock has already been acquired. +* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param blobKey +* blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) +* @param localFile +* (local) file where the blob is/should be stored +* +* @throws IOException +* Thrown if the file retrieval failed. +*/ + void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException { + // assume readWriteLock.readLock() was already locked (cannot really check that) + + if (localFile.exists()) { + return; + } else if (highlyAvailable) { + // Try the HA blob store + // first we have to release the read lock in order to acquire the write lock + readWriteLock.readLock().unlock(); + + // use a temporary file (thread-safe without locking) + File incomingFile = null; try { - // Try the blob store - blobStore.get(jobId, requiredBlob, localFile); + incomingFile = createTemporaryFilename(); + blobStore.get(jobId, blobKey, incomingFile); + + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); --- End diff -- Alright. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141406045 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param requiredBlob +* @param blobKey * blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { + checkArgument(blobKey != null, "BLOB key cannot be null."); - final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob); + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); - if (localFile.exists()) { + try { + getFileInternal(jobId, blobKey, highlyAvailable, localFile); return localFile; + } finally { + readWriteLock.readLock().unlock(); } - else { + } + + /** +* Helper to retrieve the local path of a file associated with a job and a blob key. +* +* The blob server looks the blob key up in its local storage. If the file exists, it is +* returned. If the file does not exist, it is retrieved from the HA blob store (if available) +* or a {@link FileNotFoundException} is thrown. +* +* Assumes the read lock has already been acquired. 
+* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param blobKey +* blob key associated with the requested file +* @param highlyAvailable +* whether to the requested file is highly available (HA) +* @param localFile +* (local) file where the blob is/should be stored +* +* @throws IOException +* Thrown if the file retrieval failed. +*/ + void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException { + // assume readWriteLock.readLock() was already locked (cannot really check that) + + if (localFile.exists()) { + return; + } else if (highlyAvailable) { + // Try the HA blob store + // first we have to release the read lock in order to acquire the write lock + readWriteLock.readLock().unlock(); + + // use a temporary file (thread-safe without locking) + File incomingFile = null; try { - // Try the blob store - blobStore.get(jobId, requiredBlob, localFile); + incomingFile = createTemporaryFilename(); + blobStore.get(jobId, blobKey, incomingFile); + + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); --- End diff -- Alright. ---
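The locking pattern in this diff (give up the read lock, fetch into a temporary location without holding any lock, then take the write lock only to publish the result) can be sketched as below. This is a simplified illustration, not the BlobServer code: an in-memory map replaces the local storage directory, a `Function` replaces the HA blob store, and the class name is hypothetical. The read lock has to be released first because `ReentrantReadWriteLock` does not support upgrading a read lock to a write lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

final class ReadThenWriteLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> localStore = new HashMap<>();
    private final Function<String, String> remoteStore;

    ReadThenWriteLockSketch(Function<String, String> remoteStore) {
        this.remoteStore = remoteStore;
    }

    String get(String key) {
        // Fast path: serve from local storage under the read lock.
        lock.readLock().lock();
        try {
            String local = localStore.get(key);
            if (local != null) {
                return local;
            }
        } finally {
            lock.readLock().unlock();
        }

        // Slow path: fetch without holding any lock (comparable to downloading
        // into a uniquely named temporary file).
        String fetched = remoteStore.apply(key);

        // Publish under the write lock; keep an entry another thread stored first.
        lock.writeLock().lock();
        try {
            String existing = localStore.putIfAbsent(key, fetched);
            return existing != null ? existing : fetched;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Two threads may both take the slow path for the same key; the `putIfAbsent` under the write lock makes that harmless, at the cost of one redundant fetch.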
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182897#comment-16182897 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141405909 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException { // start the server thread setName("BLOB Server listener at " + getPort()); setDaemon(true); - start(); --- End diff -- Makes sense. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141405909 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws IOException { // start the server thread setName("BLOB Server listener at " + getPort()); setDaemon(true); - start(); --- End diff -- Makes sense. ---
[jira] [Commented] (FLINK-7411) minor performance improvements in NettyMessage
[ https://issues.apache.org/jira/browse/FLINK-7411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182879#comment-16182879 ] ASF GitHub Bot commented on FLINK-7411: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4517 Thanks for your contribution @NicoK and the review @pnowojski. Changes look good to me. Will wait for Travis and then merge this PR. > minor performance improvements in NettyMessage > -- > > Key: FLINK-7411 > URL: https://issues.apache.org/jira/browse/FLINK-7411 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > {{NettyMessage}} may be improved slightly performance-wise in these regards: > - in {{NettyMessage.NettyMessageDecoder#decode()}}: instead of having > multiple if-elseif-... use a switch to cycle through the message ID > - use a static {{NettyMessage}} subtype {{readFrom(ByteBuf buffer)}} - we do > not really need to have a virtual function here -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4517: [FLINK-7411][network] minor performance improvements in N...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4517 Thanks for your contribution @NicoK and the review @pnowojski. Changes look good to me. Will wait for Travis and then merge this PR. ---
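The two suggestions in FLINK-7411 (dispatch on the message ID with a `switch`, and use a static `readFrom` per message subtype instead of a virtual method) might look roughly like this. The sketch uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, and the message types and ID values are made up for illustration.

```java
import java.nio.ByteBuffer;

final class MessageDecodeSketch {
    // Hypothetical message IDs; the real NettyMessage subtypes define their own.
    static final byte PING_ID = 0;
    static final byte DATA_ID = 1;

    static final class Ping {
        // Static factory: no instance has to exist before decoding.
        static Ping readFrom(ByteBuffer buffer) {
            return new Ping();
        }
    }

    static final class Data {
        final int length;

        Data(int length) {
            this.length = length;
        }

        static Data readFrom(ByteBuffer buffer) {
            return new Data(buffer.getInt());
        }
    }

    /** Reads the one-byte message ID and dispatches to the matching static reader. */
    static Object decode(ByteBuffer buffer) {
        byte msgId = buffer.get();
        switch (msgId) {
            case PING_ID:
                return Ping.readFrom(buffer);
            case DATA_ID:
                return Data.readFrom(buffer);
            default:
                throw new IllegalStateException("Received unknown message ID: " + msgId);
        }
    }
}
```

With a static `readFrom`, the decoder no longer has to instantiate an empty message and then call a virtual method on it to fill in the fields.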
[jira] [Commented] (FLINK-7514) fix BackPressureStatsTrackerITCase releasing buffers twice
[ https://issues.apache.org/jira/browse/FLINK-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182858#comment-16182858 ] ASF GitHub Bot commented on FLINK-7514: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4591 Thanks for your contribution @NicoK and the reviews @pnowojski and @zentol. Changes look good. If Travis passes, then I'll merge the PR. > fix BackPressureStatsTrackerITCase releasing buffers twice > -- > > Key: FLINK-7514 > URL: https://issues.apache.org/jira/browse/FLINK-7514 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{BackPressureStatsTrackerITCase#testBackPressuredProducer()}} is releasing > its buffers twice which should be fixed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4591: [FLINK-7514][tests] fix BackPressureStatsTrackerITCase re...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4591 Thanks for your contribution @NicoK and the reviews @pnowojski and @zentol. Changes look good. If Travis passes, then I'll merge the PR. ---
[jira] [Commented] (FLINK-7513) remove TestBufferFactory#MOCK_BUFFER
[ https://issues.apache.org/jira/browse/FLINK-7513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182855#comment-16182855 ] ASF GitHub Bot commented on FLINK-7513: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4590 Thanks for your contribution @NicoK and the review @pnowojski. Merging this PR. > remove TestBufferFactory#MOCK_BUFFER > > > Key: FLINK-7513 > URL: https://issues.apache.org/jira/browse/FLINK-7513 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{TestBufferFactory#MOCK_BUFFER}} is a static buffer in tests and did not > allow proper reference counting and we should rather create test buffers in > the tests which may also be released afterwards. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4590: [FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4590 Thanks for your contribution @NicoK and the review @pnowojski. Merging this PR. ---
[jira] [Commented] (FLINK-7699) Define the BufferListener interface to replace EventListener in BufferProvider
[ https://issues.apache.org/jira/browse/FLINK-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182854#comment-16182854 ] ASF GitHub Bot commented on FLINK-7699: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4735#discussion_r141400676 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java --- @@ -211,10 +210,15 @@ public void testSetLessThanRequiredNumBuffers() throws IOException { @Test public void testPendingRequestWithListenerAfterRecycle() throws Exception { - EventListener listener = spy(new EventListener() { + BufferListener listener = spy(new BufferListener() { @Override - public void onEvent(Buffer buffer) { + public boolean notifyBufferAvailable(Buffer buffer) { buffer.recycle(); + return false; --- End diff -- Yes, I also considered how to verify that the listener is notified multiple times, and I guessed you might point it out. :) Simply returning true does not work for that, so I will think of a proper way to verify it. > Define the BufferListener interface to replace EventListener in BufferProvider > -- > > Key: FLINK-7699 > URL: https://issues.apache.org/jira/browse/FLINK-7699 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the {{EventListener}} is used in {{BufferProvider}} to be notified > when a buffer becomes available or the pool is destroyed. > To make the semantics clearer, we define a new {{BufferListener}} interface whose > implementations can opt for a one-time notification or for repeated notifications. We > can also signal that the pool was destroyed via the explicit method > {{notifyBufferDestroyed}}. > The {{RemoteInputChannel}} will implement the {{BufferListener}} interface to > wait for floating buffers from {{BufferProvider}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)
[ https://issues.apache.org/jira/browse/FLINK-7487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182850#comment-16182850 ] ASF GitHub Bot commented on FLINK-7487: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4571 Thanks for your contribution @NicoK. Merging this PR. > test instability in ClassLoaderITCase (no resources available) > -- > > Key: FLINK-7487 > URL: https://issues.apache.org/jira/browse/FLINK-7487 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > > This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103 > which contains quite some changes but the error itself should be unrelated: > {code} > testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase) > Time elapsed: 0.604 sec <<< ERROR! > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) > at > org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. Task to schedule: < Attempt #0 (Map (Map at > main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with > groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < > SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, > f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, > 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, > 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, > 74da3f65164120b4781de360723e60c0] >. 
Resources available to scheduler: Number > of instances=2, total number of slots=4, available slots=0 > at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261) > at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138) > at > org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362) > at > org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304) > at > org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596) > at > org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at >
[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141398906

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -87,6 +88,12 @@
     /** The number of available buffers that have not been announced to the producer yet. */
     private final AtomicInteger unannouncedCredit = new AtomicInteger(0);

+    /** The number of unsent buffers in the producer's sub partition. */
+    private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+    /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
+    private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false);
--- End diff --

These two fields are currently used in the `notifyBufferAvailable()` logic, so we have to define them in this PR. In the next PR, #4509, the `isWaitingForFloatingBuffers` field will be set to true under two conditions:

1. The number of currently available buffers is less than the sender backlog.
2. There are no available floating buffers in `BufferProvider`.

This field is also used to avoid registering the listener in `BufferProvider` multiple times. Seeing only this PR's change may be confusing, sorry for that.

---
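The role of the two fields can be sketched as below. This is a hypothetical model, not the `RemoteInputChannel` code from PR #4735 or #4509: the class name, the method names and the reset point are invented for illustration, and the sketch assumes that both listed conditions must hold together before the flag is set. The field is spelled `isWaitingForFloatingBuffers` as in the comment; the diff itself names it `isWaitingFloatingBuffers`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the two fields from the diff; not the real RemoteInputChannel. */
class FloatingBufferWaitSketch {
    private final AtomicInteger senderBacklog = new AtomicInteger(0);
    private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false);

    /** Counts how often a listener would have been registered with the buffer provider. */
    private int registrations;

    /**
     * Records the sender backlog and decides whether to start waiting.
     * Assumption: both conditions from the comment must hold at the same time.
     *
     * @return true if this call registered a listener
     */
    boolean onSenderBacklog(int backlog, int availableBuffers, int floatingBuffersInPool) {
        senderBacklog.set(backlog);
        boolean needsMore = availableBuffers < senderBacklog.get();
        boolean poolEmpty = floatingBuffersInPool == 0;
        // compareAndSet guards against registering the listener more than once.
        if (needsMore && poolEmpty && isWaitingForFloatingBuffers.compareAndSet(false, true)) {
            registrations++;
            return true;
        }
        return false;
    }

    /** Assumed reset point: the pool delivered a buffer and the channel stops waiting. */
    void onFloatingBufferDelivered() {
        isWaitingForFloatingBuffers.set(false);
    }

    boolean isWaiting() {
        return isWaitingForFloatingBuffers.get();
    }

    int registrations() {
        return registrations;
    }
}
```

Using `compareAndSet` rather than a plain `set(true)` is one way to make the "avoid registering multiple times" guarantee hold even if two threads observe the shortage at once.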
[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182844#comment-16182844 ]

ASF GitHub Bot commented on FLINK-7695:
---

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4737

    [FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

    ## What is the purpose of the change

    Add `JobConfigHandler` for the new `RestServerEndpoint`.

    ## Verifying this change

    This change added tests and can be verified as follows:

    - JSON marshalling is tested via `JobConfigInfoTest`

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink portJobConfigHandler

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4737

commit 3aa313561ee238b9074d58024516ebffeafbe3ed
Author: Till Rohrmann
Date: 2017-09-21T08:53:24Z

    [FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

    Let the JobCancellationHandler implement the LegacyRestHandler interface.
    Moreover, this commit adds the DELETE method to HttpMethodWrapper and the
    RestServerEndpoint#registerHandler method.

    Add PATCH method

    This closes #4697.

commit 1fa13dc89ffebd2b3f286ce4cb9cf4ac5350bc79
Author: Till Rohrmann
Date: 2017-09-21T14:47:18Z

    [FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

    Rename the JobCancellationHandler into JobTerminationHandler which is now
    responsible for terminating jobs. Moreover, this commit adds two termination
    modes, cancel and stop, which are specified by providing a query parameter.

    This closes #4700.

commit 82eeeb95a943ccf71692f0b228027b0163d19def
Author: Till Rohrmann
Date: 2017-09-22T11:31:12Z

    [FLINK-7667] [flip6] Use ArchivedExecutionGraph as serializable AccessExecutionGraph

    This commit removes AccessExecutionGraph#getCheckpointCoordinator and changes the
    AccessExecutionGraph#getJobCheckpointSettings into #getJobCheckpointConfiguration.
    The JobCheckpointConfiguration only contains the CheckpointCoordinator relevant
    configuration settings and excludes the serialized state backend and the
    serialized master hooks. That way we don't send unnecessary information when
    the ArchivedExecutionGraph is requested.

    This closes #4727.

commit 67ed81f7d1cc7e1e150bc2abd9f037e75a05b8ff
Author: Till Rohrmann
Date: 2017-09-25T13:29:59Z

    [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

    The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former
    does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes
    it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after
    a given time to live period. This will trigger requesting the AccessExecutionGraph again
    and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST
    handlers.

    In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task
    which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which
    have exceeded their time to live. Currently it is set to 20 * refreshInterval of the web gui.

    This closes #4728.

commit 6409de0441d1fd2d3321d6d0eb05db51d91027ea
Author: Till Rohrmann
Date: 2017-09-26T16:39:15Z

    [FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

> Port JobConfigHandler to new REST endpoint
> ---
>
> Key: FLINK-7695
> URL: https://issues.apache.org/jira/browse/FLINK-7695
> Project: Flink
> Issue Type: Sub-task
> Components: REST
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
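The time-to-live behaviour that the FLINK-7668 commit message describes for the ExecutionGraphCache (entries expire after a fixed period, an expired entry is requested again, and a periodic cleanup drops stale entries) can be illustrated with a small generic cache. This is only a sketch of the idea, not Flink's `ExecutionGraphCache`: the class `TtlCacheSketch`, its methods, and the explicit `now` parameter (used here instead of a real clock so the behaviour is easy to follow) are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical time-to-live cache; not Flink's ExecutionGraphCache. */
class TtlCacheSketch<K, V> {

    /** A cached value together with the time at which it stops being valid. */
    private static final class Entry<T> {
        final T value;
        final long expiresAt;

        Entry(T value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;

    TtlCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns the cached value, or reloads it if the entry is missing or expired. */
    V get(K key, long now, Function<K, V> loader) {
        Entry<V> entry = entries.get(key);
        if (entry == null || now >= entry.expiresAt) {
            entry = new Entry<>(loader.apply(key), now + ttlMillis);
            entries.put(key, entry);
        }
        return entry.value;
    }

    /** Drops every entry whose time to live has passed; meant to be called periodically. */
    void cleanup(long now) {
        entries.values().removeIf(entry -> now >= entry.expiresAt);
    }

    int size() {
        return entries.size();
    }
}
```

In such a design `cleanup` would typically be driven by a scheduled task; without it, entries for jobs that are never requested again would stay in the map, which is the memory leak the commit message mentions.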
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182840#comment-16182840 ] ASF GitHub Bot commented on FLINK-7446: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4710 Looks good to me. +1 > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141396863

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
+
+    private final AtomicReference<Throwable> channelError = new AtomicReference<>();
+
+    /**
+     * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+     * while data is still coming in for this channel.
+     */
+    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
+
+    private volatile ChannelHandlerContext ctx;
+
+    //
+    // Input channel/receiver registration
+    //
+
+    void addInputChannel(RemoteInputChannel listener) throws IOException {
+        checkError();
+
+        if (!inputChannels.containsKey(listener.getInputChannelId())) {
+            inputChannels.put(listener.getInputChannelId(), listener);
+        }
+    }
+
+    void removeInputChannel(RemoteInputChannel listener) {
+        inputChannels.remove(listener.getInputChannelId());
+    }
+
+    void cancelRequestFor(InputChannelID inputChannelId) {
+        if (inputChannelId == null || ctx == null) {
+            return;
+        }
+
+        if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
+            ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
+        }
+    }
+
+    //
+    // Network events
+    //
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        if (this.ctx == null) {
+            this.ctx = ctx;
+        }
+
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+
[jira] [Commented] (FLINK-7667) Add serializable AccessExecutionGraph implementation
[ https://issues.apache.org/jira/browse/FLINK-7667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182831#comment-16182831 ] ASF GitHub Bot commented on FLINK-7667: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4727 Thanks for the review @zentol. Rebasing this commit and waiting on Travis. > Add serializable AccessExecutionGraph implementation > > > Key: FLINK-7667 > URL: https://issues.apache.org/jira/browse/FLINK-7667 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to decouple the REST endpoint from the {{JobMaster}} we should have > an "offline" implementation of the {{AccessExecutionGraph}} which is > serializable and can be set to remote peers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182830#comment-16182830 ]

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141396295

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);
+
+    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
+
+    private final AtomicReference<Throwable> channelError = new AtomicReference<>();
+
+    /**
+     * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+     * while data is still coming in for this channel.
+     */
+    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
+
+    private volatile ChannelHandlerContext ctx;
+
+    //
+    // Input channel/receiver registration
+    //
+
+    void addInputChannel(RemoteInputChannel listener) throws IOException {
+        checkError();
+
+        if (!inputChannels.containsKey(listener.getInputChannelId())) {
+            inputChannels.put(listener.getInputChannelId(), listener);
+        }
+    }
+
+    void removeInputChannel(RemoteInputChannel listener) {
+        inputChannels.remove(listener.getInputChannelId());
+    }
+
+    void cancelRequestFor(InputChannelID inputChannelId) {
+        if (inputChannelId == null || ctx == null) {
+            return;
+        }
+
+        if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
+            ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
+        }
+    }
+
+    //
+    // Network events
+    //
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        if (this.ctx ==
[GitHub] flink issue #4727: [FLINK-7667] [flip6] Use ArchivedExecutionGraph as serial...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4727 Thanks for the review @zentol. Rebasing this commit and waiting on Travis. ---
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141396295 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; +import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.runtime.io.network.netty.exception.TransportException; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + + private final ConcurrentMapinputChannels = new ConcurrentHashMap<>(); + + private final AtomicReference channelError = new AtomicReference<>(); + + /** +* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared +* while data is still coming in for this channel. 
+*/ + private final ConcurrentMap cancelled = Maps.newConcurrentMap(); + + private volatile ChannelHandlerContext ctx; + + // + // Input channel/receiver registration + // + + void addInputChannel(RemoteInputChannel listener) throws IOException { + checkError(); + + if (!inputChannels.containsKey(listener.getInputChannelId())) { + inputChannels.put(listener.getInputChannelId(), listener); + } + } + + void removeInputChannel(RemoteInputChannel listener) { + inputChannels.remove(listener.getInputChannelId()); + } + + void cancelRequestFor(InputChannelID inputChannelId) { + if (inputChannelId == null || ctx == null) { + return; + } + + if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) { + ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId)); + } + } + + // + // Network events + // + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + if (this.ctx == null) { + this.ctx = ctx; + } + + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { +
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182828#comment-16182828 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141396023 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; +import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.runtime.io.network.netty.exception.TransportException; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + + private final ConcurrentMapinputChannels = new ConcurrentHashMap<>(); + + private final AtomicReference channelError = new AtomicReference<>(); + + /** +* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared +* while data is still coming in for this channel. 
+*/ + private final ConcurrentMap cancelled = Maps.newConcurrentMap(); --- End diff -- I saw, but let's sneak this change in ;) (`CreditBasedClientHandler` is a new file and although I know that it will eventually replace `PartitionRequestClientHandler`, I'd like it to not keep too much legacy stuff) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is part of the work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}} > to hold the message. If none is available, the message is staged temporarily and > {{autoread}} for the channel is set to false. > In credit-based mode, {{PartitionRequestClientHandler}} can always get a > buffer from {{RemoteInputChannel}} for reading messages from the producer. > The related work items are: > * Add the backlog of the producer to the {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffers from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates the backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from >
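The issue description above outlines the receiver side of credit-based flow control: the producer reports its backlog, and the receiving channel may ask for floating buffers to cover it. A minimal sketch of that bookkeeping follows. It is an illustration under stated assumptions, not Flink's implementation: the class `CreditSketch`, its methods, and the rule "request enough floating buffers to cover the backlog" are all hypothetical.

```java
/**
 * Hypothetical receiver-side bookkeeping for credit-based flow control.
 * Not Flink code: names and the buffer-request rule are assumptions.
 */
class CreditSketch {
    /** Buffers the receiver currently holds for incoming data. */
    private int availableBuffers;
    /** Credit (free buffers) not yet announced to the producer. */
    private int unannouncedCredit;

    CreditSketch(int exclusiveBuffers) {
        this.availableBuffers = exclusiveBuffers;
        this.unannouncedCredit = exclusiveBuffers;
    }

    /**
     * Handles one incoming buffer that carries the producer's backlog.
     * Returns how many floating buffers the receiver would request
     * so that it can hold the whole backlog (assumed rule).
     */
    int onBufferResponse(int senderBacklog) {
        if (availableBuffers == 0) {
            // With credits, the producer should never send without a free buffer.
            throw new IllegalStateException("received data without credit");
        }
        availableBuffers--; // the incoming data occupies one buffer
        return Math.max(0, senderBacklog - availableBuffers);
    }

    /** Records floating buffers granted by a (hypothetical) buffer pool. */
    void onFloatingBuffersGranted(int count) {
        availableBuffers += count;
        unannouncedCredit += count;
    }

    /** Returns the credit to announce to the producer and resets it. */
    int takeCreditToAnnounce() {
        int credit = unannouncedCredit;
        unannouncedCredit = 0;
        return credit;
    }
}
```

In this sketch the receiver never stages a message, since data only arrives when a buffer was announced as credit beforehand; that is the property the description contrasts with the `autoread` approach.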
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r141396023 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; +import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; +import org.apache.flink.runtime.io.network.netty.exception.TransportException; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +class CreditBasedClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class); + + private final ConcurrentMapinputChannels = new ConcurrentHashMap<>(); + + private final AtomicReference channelError = new AtomicReference<>(); + + /** +* Set of cancelled partition requests. A request is cancelled iff an input channel is cleared +* while data is still coming in for this channel. 
+*/ + private final ConcurrentMap cancelled = Maps.newConcurrentMap(); --- End diff -- I saw, but let's sneak this change in ;) (`CreditBasedClientHandler` is a new file and although I know that it will eventually replace `PartitionRequestClientHandler`, I'd like it to not keep too much legacy stuff) ---
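The quoted diff documents `cancelled` as a set of cancelled requests but stores it as a map and relies on `putIfAbsent(id, id) == null` so that only the first cancel sends a message. The thread does not say which change is being proposed, so the following is only a sketch of one way to get the same "first caller wins" behaviour with a concurrent set. `CancelTracker` is a hypothetical stand-in, `String` replaces `InputChannelID`, and a list replaces the Netty channel write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the handler's cancel logic; not Flink code. */
class CancelTracker {
    /** Concurrent set view backed by a ConcurrentHashMap (Java 8+). */
    private final Set<String> cancelled = ConcurrentHashMap.newKeySet();
    /** Records messages that would be written to the channel. */
    private final List<String> sent = new ArrayList<>();

    /** Returns true only for the first cancel of a given id. */
    boolean cancelRequestFor(String inputChannelId) {
        if (inputChannelId == null) {
            return false;
        }
        // Set.add is atomic here: exactly one caller sees 'true' per id.
        if (cancelled.add(inputChannelId)) {
            synchronized (sent) {
                sent.add("CancelPartitionRequest(" + inputChannelId + ")");
            }
            return true;
        }
        return false;
    }

    /** Snapshot of the messages recorded so far. */
    List<String> sentMessages() {
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }
}
```

Repeated calls with the same id are no-ops, which matches the intent of the `putIfAbsent` check in the diff.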
[jira] [Assigned] (FLINK-7717) Port TaskManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7717: --- Assignee: Bowen Li > Port TaskManagerMetricsHandler to new REST endpoint > --- > > Key: FLINK-7717 > URL: https://issues.apache.org/jira/browse/FLINK-7717 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Bowen Li > Labels: flip-6 > Fix For: 1.4.0 > > > Port {{TaskManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182827#comment-16182827 ] ASF GitHub Bot commented on FLINK-7649: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4700 Thanks for the review @zentol. Rebasing onto the latest #4697 version. > Port JobStoppingHandler to new REST endpoint > > > Key: FLINK-7649 > URL: https://issues.apache.org/jira/browse/FLINK-7649 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobStoppingHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)