[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180227#comment-16180227 ] Bowen Li commented on FLINK-7648: - [~till.rohrmann] [~tzulitai] I'd like to try this and contribute to the new server architecture. Given it's a handler for TaskManager, I guess I chose a pretty hard one... :) Can you please give me some guidance on this ticket, e.g. - how to start and run the new web portal? - what's the expected way to test the endpoint? - more Thanks! > Port TaskManagersHandler to new REST endpoint > - > > Key: FLINK-7648 > URL: https://issues.apache.org/jira/browse/FLINK-7648 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Bowen Li > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{TaskManagersHandler}} to the new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r140951770 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.WindowJoinUtil +import scala.collection.JavaConverters._ + +class DataStreamJoinRule + extends ConverterRule( +classOf[FlinkLogicalJoin], +FlinkConventions.LOGICAL, +FlinkConventions.DATASTREAM, +"DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] +val joinInfo = join.analyzeCondition + +val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate( + joinInfo.getRemaining(join.getCluster.getRexBuilder), + join.getLeft.getRowType.getFieldCount, + join.getRowType, + join.getCluster.getRexBuilder, + TableConfig.DEFAULT) + +// remaining predicate must not access time attributes +val remainingPredsAccessTime = remainingPreds.isDefined && + WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType) + +// Check that no event-time attributes are in the input. +val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala + .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) { --- End diff -- @fhueske, we actually agree quite a lot on the concern of infinite size you have raised. The same problem does not only exist in joining, but also in other cases, for example GROUPBY, where the grouping-key and associated state can be unlimited in terms of the size that the state of Flink can not hold them all. IMO, there is not an easy way to completely eliminate this just through the validation of query planner/optimizer, so I think it is not a good idea to only allow the unbounded-joining after a certain operators, like non-windowed aggregation (in fact, as mentioned above, the grouping-key of aggregation may also be infinite, so this does not ensure the finite state for joining operator). 
On the other hand, I think finite state can only be ensured by the users themselves through hints/controls. We need to instruct users to set those control knobs properly so that their jobs will not run out of space. One hint we currently have is the state TTL (I think @hequn8128 has already added this for the unbounded join). Maybe we can add a check here that forces users to set a proper state TTL value. What do you think? ---
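As a rough illustration of the check proposed above, a planner-side validation could refuse to translate a non-windowed stream-stream join unless a state retention time (state TTL) has been configured. The class, method, and parameter names below are assumptions for the sketch, not the actual Flink planner API.
{code:java}
// Hypothetical precondition check; names are illustrative only.
final class UnboundedJoinPreconditions {

    private UnboundedJoinPreconditions() {}

    static void checkStateRetentionConfigured(long minRetentionMillis, long maxRetentionMillis) {
        if (minRetentionMillis <= 0 || maxRetentionMillis <= 0) {
            throw new IllegalStateException(
                "A non-windowed stream-stream join keeps both inputs in state indefinitely. " +
                "Please configure an idle state retention time (state TTL) for this query.");
        }
    }

    public static void main(String[] args) {
        checkStateRetentionConfigured(60_000L, 120_000L); // passes
        checkStateRetentionConfigured(0L, 0L);            // throws: no retention configured
    }
}
{code}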
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180188#comment-16180188 ] ASF GitHub Bot commented on FLINK-6094: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r140951770 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.join.WindowJoinUtil +import scala.collection.JavaConverters._ + +class DataStreamJoinRule + extends ConverterRule( +classOf[FlinkLogicalJoin], +FlinkConventions.LOGICAL, +FlinkConventions.DATASTREAM, +"DataStreamJoinRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] +val joinInfo = join.analyzeCondition + +val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate( + joinInfo.getRemaining(join.getCluster.getRexBuilder), + join.getLeft.getRowType.getFieldCount, + join.getRowType, + join.getCluster.getRexBuilder, + TableConfig.DEFAULT) + +// remaining predicate must not access time attributes +val remainingPredsAccessTime = remainingPreds.isDefined && + WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType) + +// Check that no event-time attributes are in the input. +val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala + .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) { --- End diff -- @fhueske, we actually agree quite a lot on the concern of infinite size you have raised. The same problem does not only exist in joining, but also in other cases, for example GROUPBY, where the grouping-key and associated state can be unlimited in terms of the size that the state of Flink can not hold them all. 
IMO, there is not an easy way to completely eliminate this just through the validation of query planner/optimizer, so I think it is not a good idea to only allow the unbounded-joining after a certain operators, like non-windowed aggregation (in fact, as mentioned above, the grouping-key of aggregation may also be infinite, so this does not ensure the finite state for joining operator). On the other hand, I think the finite state can only be ensured by the users by giving some hints/controls. We need instruct users to properly set those control knobs, such that their jobs will not run out of space. One hint we currently have is state ttl. (I think @hequn8128 has already added this for this unbounded joining). Maybe here we can add a check on state ttl to force users set a proper value. What do you think? > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn
[jira] [Assigned] (FLINK-5176) Properly wait for component shutdown in Runner components
[ https://issues.apache.org/jira/browse/FLINK-5176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-5176: - Assignee: Shuyi Chen > Properly wait for component shutdown in Runner components > - > > Key: FLINK-5176 > URL: https://issues.apache.org/jira/browse/FLINK-5176 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Minor > Labels: flip-6 > > Currently, the different runners for the Flip-6 components {{JobMaster}}, > {{ResourceManager}} and {{TaskExecutor}} don't wait until the component has > been shut down in their own {{shutDown}} method. The problem is that the > {{AkkaRpcService}} implements the {{RpcService#shutDown}} method via a > {{PoisonPill}} which is not executed synchronously. Instead we should obtain > the {{RpcEndpoint#getTerminationFutures}} and wait on the returned future > (with a given timeout). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
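A minimal sketch of the synchronous shutdown pattern the ticket describes: trigger shutdown, then block on the component's termination future with a timeout instead of relying on an asynchronous PoisonPill. The endpoint interface below is a stand-in for illustration, not the actual RpcEndpoint API.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RunnerShutdownSketch {

    // Stand-in for the real RPC endpoint; only the parts needed for the sketch.
    interface TerminatingEndpoint {
        void shutDown();
        CompletableFuture<Void> getTerminationFuture();
    }

    /** Triggers shutdown and blocks until the component has terminated or the timeout expires. */
    static void shutDownAndWait(TerminatingEndpoint endpoint, long timeoutMillis) throws Exception {
        endpoint.shutDown();
        try {
            endpoint.getTerminationFuture().get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                "Component did not shut down within " + timeoutMillis + " ms", e);
        }
    }
}
{code}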
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180077#comment-16180077 ] Xingcan Cui commented on FLINK-7548: Thanks for the answers, [~fhueske]. Some extra comments. About the first question, we must admit that a rowtime field is dualistic. On one hand, it represents the rowtime and should be taken as the {{Long}} type. On the other hand, it is a common field that gets its own type ({{Timestamp}} or {{Long}} and maybe more in the future?). We don't want to perform the extra type judgement when using this field as the rowtime field and also don't want to lose the original data type when using it as a common field (e.g., be passed to a UDF which formats a timestamp). Of course, if all the types are internally represented as {{Long}}, we just give the fields different time indicators so that we could recover the original data type after the processing. The record number bounded out-of-order generation strategy is something like we don't emit a watermark {{w}} until a specified number of records whose timestamps are greater than {{w}} have reached. Just an idea that hits me :P About my last question, I actually refer to the {{TimestampsAndPeriodicWatermarksOperator}}. Here, the "periodic" refers to proctime. Considering the time systems for the rowtime and the proctime may not be synchronized (i.e., they get different speeds), could we consider providing a "rowtime periodic" assigner? > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
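To make the "record number bounded" strategy mentioned above concrete, here is a rough sketch as a periodic watermark assigner: a watermark w is only advanced once at least N records with timestamps greater than w have been seen, implemented by keeping the N largest timestamps observed so far. This is only an illustration of the idea, not a proposed built-in; the class name and the TimestampExtractor helper are made up.
{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.io.Serializable;
import java.util.PriorityQueue;

public class CountBoundedWatermarkAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** Hypothetical helper to pull the event timestamp out of a record. */
    public interface TimestampExtractor<T> extends Serializable {
        long timestampOf(T element);
    }

    private final int minRecordsAhead;
    private final TimestampExtractor<T> extractor;
    // the N largest timestamps seen so far; the head is the smallest of them
    private final PriorityQueue<Long> largestTimestamps = new PriorityQueue<>();

    public CountBoundedWatermarkAssigner(int minRecordsAhead, TimestampExtractor<T> extractor) {
        this.minRecordsAhead = minRecordsAhead;
        this.extractor = extractor;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        long ts = extractor.timestampOf(element);
        largestTimestamps.add(ts);
        if (largestTimestamps.size() > minRecordsAhead) {
            largestTimestamps.poll(); // keep only the N largest timestamps
        }
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (largestTimestamps.size() < minRecordsAhead) {
            return new Watermark(Long.MIN_VALUE); // not enough records seen yet
        }
        // at least N records with timestamps >= head were observed, so head - 1 is safe to emit
        return new Watermark(largestTimestamps.peek() - 1);
    }
}
{code}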
[jira] [Assigned] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7648: --- Assignee: Bowen Li > Port TaskManagersHandler to new REST endpoint > - > > Key: FLINK-7648 > URL: https://issues.apache.org/jira/browse/FLINK-7648 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Bowen Li > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{TaskManagersHandler}} to the new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7688) ContaineredTaskManagerParameters sets off heap memory size incorrectly unlimited
Bill Liu created FLINK-7688: --- Summary: ContaineredTaskManagerParameters sets off heap memory size incorrectly unlimited Key: FLINK-7688 URL: https://issues.apache.org/jira/browse/FLINK-7688 Project: Flink Issue Type: Bug Components: YARN Reporter: Bill Liu When taskmanager.memory.off-heap is disabled, the heap size is set to the JVM size, which makes the off-heap size -1 (unlimited). As a result, YARN occasionally kills the container. {code:java} final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB; {code} It'd be better to set a limit for direct memory whether off-heap is enabled or not. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
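A hedged sketch of the behaviour the ticket asks for: always derive a finite direct-memory limit instead of returning -1, so that -XX:MaxDirectMemorySize is always set. The method and parameter names are illustrative, not the actual ContaineredTaskManagerParameters code.
{code:java}
final class OffHeapSizeSketch {

    private OffHeapSizeSketch() {}

    static long offHeapSizeMB(long containerMemoryMB, long javaMemorySizeMB, long heapSizeMB) {
        long offHeap = javaMemorySizeMB - heapSizeMB;
        if (offHeap <= 0) {
            // off-heap is disabled (heap == JVM size): instead of "unlimited",
            // cap direct memory at the container headroom above the JVM size
            offHeap = Math.max(containerMemoryMB - javaMemorySizeMB, 0L);
        }
        return offHeap;
    }

    public static void main(String[] args) {
        // off-heap disabled, 2 GB container, 1.5 GB JVM/heap: limit becomes 512 MB instead of -1
        System.out.println(offHeapSizeMB(2048, 1536, 1536));
    }
}
{code}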
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179870#comment-16179870 ] Fabian Hueske commented on FLINK-7657: -- Hi [~kmurra], thanks for looking into this. I'm currently traveling but [~twalthr] knows the SQL / Table API type system very well (better than I do) and might be able to help. > SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException > -- > > Key: FLINK-7657 > URL: https://issues.apache.org/jira/browse/FLINK-7657 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1, 1.3.2 >Reporter: Kent Murra >Assignee: Kent Murra >Priority: Critical > > I have a SQL statement using the Tables API that has a timestamp in it. When > the execution environment tries to optimize the SQL, it causes an exception > (attached below). The result is any SQL query with a timestamp, date, or > time literal is unexecutable if any table source is marked with > FilterableTableSource. > {code:none} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule PushFilterIntoTableSourceScanRule, args > [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, > $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], > fields:(data, last_updated))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328) > at > org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135) > at org.apache.flink.table.api.Table.writeToSink(table.scala:800) > at org.apache.flink.table.api.Table.writeToSink(table.scala:773) > at > com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27) > at > com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22) > at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala) > Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot > be cast to java.util.Date > at > org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107) > at > org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35) > at > 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35) > at > org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92) > at > org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at
[jira] [Created] (FLINK-7687) Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts
Elias Levy created FLINK-7687: - Summary: Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts Key: FLINK-7687 URL: https://issues.apache.org/jira/browse/FLINK-7687 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor It would be helpful if the documentation was clearer on the fact that the master/slaves config files are not needed when configured in high-availability mode unless you are using the provided scripts to start and shutdown the cluster over SSH. If you are using some other mechanism to manage Flink instances (configuration management tools such as Chef or Ansible, or container management frameworks like Docker Compose or Kubernetes), these files are unnecessary. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179842#comment-16179842 ] ASF GitHub Bot commented on FLINK-7683: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140908693 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend( LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public Stream getKeys(String field, N namespace) { + Tuple2columnInfo = kvStateInformation.get(field); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterator sourceIterator = new Iterator() { + @Override + public boolean hasNext() { + return iterator.isValid(); + } + + @Override + public K next() { + try { + byte[] key = iterator.key(); + + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); + K value = keySerializer.deserialize(dataInput); + iterator.next(); --- End diff -- This will throw exceptions implicitly if `iterator` doesn't have any more elements. I'd rather have it fail early and faster by checking `iterator.hasNext()` first and throwing exceptions explicitly. > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator (to do so operator must > iterate over all of the existing keys and rewrites them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140908693 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend( LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public Stream getKeys(String field, N namespace) { + Tuple2columnInfo = kvStateInformation.get(field); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterator sourceIterator = new Iterator() { + @Override + public boolean hasNext() { + return iterator.isValid(); + } + + @Override + public K next() { + try { + byte[] key = iterator.key(); + + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); + K value = keySerializer.deserialize(dataInput); + iterator.next(); --- End diff -- This will throw exceptions implicitly if `iterator` doesn't have any more elements. I'd rather have it fail early and faster by checking `iterator.hasNext()` first and throwing exceptions explicitly. ---
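The fail-fast pattern suggested in the review could look roughly like the wrapper below. It is a simplified sketch, not the PR code: the key deserialization is hidden behind the delegate iterator and the exception message is illustrative.
{code:java}
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Wraps an iterator so that next() fails early and explicitly when no elements remain. */
final class FailFastIterator<K> implements Iterator<K> {

    private final Iterator<K> delegate;

    FailFastIterator(Iterator<K> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public K next() {
        if (!hasNext()) {
            // explicit, early failure instead of an implicit error from the underlying iterator
            throw new NoSuchElementException("No more keys in the state backend iterator.");
        }
        return delegate.next();
    }
}
{code}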
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179836#comment-16179836 ] Fabian Hueske commented on FLINK-7548: -- Thanks for your thoughts [~xccui]. I added a few comments to your suggestions / questions. Thanks, Fabian bq. Considering that the data type should be preserved, it may bring extra logic if we do that internally. To keep the consistency, I wonder if it's possible to encapsulate the time into a new Rowtime type. It exposes two methods, getTime(): Long for logical level use and getValue(): T for physical level use. In fact, {{Long}} and {{Timestamp}} have the same internal representation, namely {{Long}}. The issue is more the type that is exposed to SQL or the Table API. We would need a new TimeIndicator type that exposes a timestamp as {{Long}}. bq. Besides, I think the watermark generation should not be bound with rowtime extraction. Compared with implementing them in a single scan operator (not sure if I understood correctly), I prefer to generate watermarks in extra operators. That should be more flexible. Timestamp extraction and watermark generation would not be tight together. First, we would compute timestamps (only necessary if we don't use an existing field). The next step would extract watermarks. However, both operations would happen in the logical scan operator because a single operator can be translated into multiple DataStream operations. bq. I am thinking of a new record number bounded out-of-order generation strategy. Do you think it will be useful in real applications? How would this strategy work? IMO, built-in strategies should have a concrete use case in mind which is common enough to justify a built-in primitive. bq. I still feel that the machine time is not compatible with the rowtime watermark generation. Shall we consider getting rid of it? Machine time (assuming that you refer to processing time here) does not use watermarks. Watermarks are only used for event-time processing. > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4688 ---
[jira] [Closed] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7638. Resolution: Fixed Fix Version/s: 1.4.0 Added via e585aed8ce751d769b56054fc1ffd4be24350e91 > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179840#comment-16179840 ] ASF GitHub Bot commented on FLINK-7638: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4688 > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140907699 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend( LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public Stream getKeys(String field, N namespace) { + Tuple2columnInfo = kvStateInformation.get(field); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterator sourceIterator = new Iterator() { + @Override + public boolean hasNext() { --- End diff -- Is there a lock in RocksDB that will guard the iteration against state change by users? ---
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179835#comment-16179835 ] ASF GitHub Bot commented on FLINK-7683: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140907699 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend( LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public Stream getKeys(String field, N namespace) { + Tuple2columnInfo = kvStateInformation.get(field); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterator sourceIterator = new Iterator() { + @Override + public boolean hasNext() { --- End diff -- Is there a lock in RocksDB that will guard the iteration against state change by users? > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator (to do so operator must > iterate over all of the existing keys and rewrites them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179832#comment-16179832 ] ASF GitHub Bot commented on FLINK-7683: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140907101 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend( LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public Stream getKeys(String field, N namespace) { + Tuple2columnInfo = kvStateInformation.get(field); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterator sourceIterator = new Iterator() { --- End diff -- I believe it's better to move impl of this iterator to a standalone or inner class > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator (to do so operator must > iterate over all of the existing keys and rewrites them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140907101 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend( LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + @Override + public Stream getKeys(String field, N namespace) { + Tuple2columnInfo = kvStateInformation.get(field); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterator sourceIterator = new Iterator() { --- End diff -- I believe it's better to move impl of this iterator to a standalone or inner class ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4665 LGTM ---
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179607#comment-16179607 ] David Anderson commented on FLINK-7683: --- Something like this has also been requested by users managing state for very large numbers of keys, who would rather occasionally iterate over all their state and clear expired items, rather than having a timer per key. Will that become possible? > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator (to do so operator must > iterate over all of the existing keys and rewrites them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
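Assuming the backend exposes the key-iteration stream added in this PR, the periodic sweep David describes could look roughly like the sketch below. The two small interfaces are hypothetical stand-ins for the real backend and state handles, used only to keep the example self-contained.
{code:java}
import java.util.stream.Stream;

final class StateSweeperSketch {

    /** Stand-in for the keyed state backend's new key-iteration capability. */
    interface KeyedBackend<K> {
        Stream<K> getKeys(String stateName, Object namespace);
    }

    /** Stand-in for per-key state that records when it was last updated. */
    interface ExpiringState<K> {
        long lastUpdateTimestamp(K key);
        void clear(K key);
    }

    /** Clears every entry that has not been touched within {@code ttlMillis}. */
    static <K> void sweep(KeyedBackend<K> backend, ExpiringState<K> state,
                          String stateName, Object namespace, long now, long ttlMillis) {
        backend.getKeys(stateName, namespace)
            .filter(key -> now - state.lastUpdateTimestamp(key) > ttlMillis)
            .forEach(state::clear);
    }
}
{code}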
[jira] [Created] (FLINK-7686) Add Flink Forward Berlin 2017 conference slides to the flink website
Hai Zhou_UTC+8 created FLINK-7686: - Summary: Add Flink Forward Berlin 2017 conference slides to the flink website Key: FLINK-7686 URL: https://issues.apache.org/jira/browse/FLINK-7686 Project: Flink Issue Type: Wish Reporter: Hai Zhou_UTC+8 Priority: Trivial I recently watched the [Flink Forward Berlin 2017|https://berlin.flink-forward.org/sessions/] conference slides, and the content is very good. I think we should add them to the [flink website|http://flink.apache.org/community.html] so that more people can find them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7645) Modify system-metrics part show in the document
[ https://issues.apache.org/jira/browse/FLINK-7645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179486#comment-16179486 ] ASF GitHub Bot commented on FLINK-7645: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/4693 Hey @yew1eb, I like this I think it really makes sense to make this browsable from the table of contents. @zentol What do you think? I would be in favour of merging this if you don't have any objections. > Modify system-metrics part show in the document > --- > > Key: FLINK-7645 > URL: https://issues.apache.org/jira/browse/FLINK-7645 > Project: Flink > Issue Type: Improvement > Components: Documentation, Metrics >Reporter: Hai Zhou_UTC+8 > Fix For: 1.4.0 > > > the system-metrics contents structure: > Currently, > {noformat} > ├── System metrics > ├── Latency tracking > ├── Dashboard integration > {noformat} > I think the following is more reasonable, > {noformat} > ├── System metrics >├── CPU >├── Memory >├── Threads >├── GarbageCollection >├── ClassLoader >├── Network >├── Cluster >├── Availability >├── Checkpointing >├── IO >└── Connectors > └── Kafka Connectors > ├── Latency tracking > ├── Dashboard integration > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4693: [FLINK-7645][docs] Modify system-metrics part show in the...
Github user uce commented on the issue: https://github.com/apache/flink/pull/4693 Hey @yew1eb, I like this! I think it really makes sense to make this browsable from the table of contents. @zentol What do you think? I would be in favour of merging this if you don't have any objections. ---
[jira] [Commented] (FLINK-7393) Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest
[ https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179325#comment-16179325 ] ASF GitHub Bot commented on FLINK-7393: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4708 @tzulitai yes, only unit tests migration > Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to > KinesisConfigUtilTest > --- > > Key: FLINK-7393 > URL: https://issues.apache.org/jira/browse/FLINK-7393 > Project: Flink > Issue Type: Test > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Right now, > [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java] > has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. > all the {{validateXxxConfiguration()}} > We need to move those tests out to a new file {{KinesisConfigUtilTest}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4708: [FLINK-7393][kinesis connector] Move unit tests that shou...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4708 @tzulitai yes, only unit tests migration ---
[GitHub] flink issue #4702: [FLINK-7635][DataStream API][Scala API] Support sideOutpu...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4702 Thank you, guys @aljoscha @chenqin ! ---
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179317#comment-16179317 ] ASF GitHub Bot commented on FLINK-7635: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4702 Thank you, guys @aljoscha @chenqin ! > Support sideOutput in ProcessWindowFunciton > --- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li > Fix For: 1.4.0 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179318#comment-16179318 ] ASF GitHub Bot commented on FLINK-7635: --- Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4702 > Support sideOutput in ProcessWindowFunciton > --- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li > Fix For: 1.4.0 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4702 ---
[GitHub] flink pull request #4707: [hotfix] Add doc for window operators, fix typos a...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4707 ---
[jira] [Commented] (FLINK-5944) Flink should support reading Snappy Files
[ https://issues.apache.org/jira/browse/FLINK-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179308#comment-16179308 ] ASF GitHub Bot commented on FLINK-5944: --- Github user mlipkovich commented on a diff in the pull request: https://github.com/apache/flink/pull/4683#discussion_r140833786 --- Diff: flink-core/pom.xml --- @@ -52,6 +52,12 @@ under the License. flink-shaded-asm + + org.apache.flink + flink-shaded-hadoop2 + ${project.version} + --- End diff -- Yes, it is a good point to make Hadoop Snappy a default codec. I think we still could support a Xerial Snappy since it comes for free. I will do these changes once we agree on dependencies Regarding to separate module what would be the content of this model? As I understand a user which would like to read HDFS files will need flink-java module anyway since it contains Hadoop wrappers like HadoopInputSplit and so on. How do you think if it makes sense to put this Hadoop codec there? > Flink should support reading Snappy Files > - > > Key: FLINK-5944 > URL: https://issues.apache.org/jira/browse/FLINK-5944 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats >Reporter: Ilya Ganelin >Assignee: Mikhail Lipkovich > Labels: features > > Snappy is an extremely performant compression format that's widely used > offering fast decompression/compression. > This can be easily implemented by creating a SnappyInflaterInputStreamFactory > and updating the initDefaultInflateInputStreamFactories in FileInputFormat. > Flink already includes the Snappy dependency in the project. > There is a minor gotcha in this. If we wish to use this with Hadoop, then we > must provide two separate implementations since Hadoop uses a different > version of the snappy format than Snappy Java (which is the xerial/snappy > included in Flink). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140831859 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -281,6 +286,73 @@ public void testProducerFailedException() throws Exception { ch.getNextBuffer(); } + /** +* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is +* recycled to available buffers directly and it triggers notify of announced credit. +*/ + @Test + public void testRecycleExclusiveBufferBeforeReleased() throws Exception { + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + + // Recycle exclusive segment + inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("There should have one available buffer after recycle.", --- End diff -- `There should be one buffer available after recycle` ---
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179305#comment-16179305 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140833417 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -281,6 +286,73 @@ public void testProducerFailedException() throws Exception { ch.getNextBuffer(); } + /** +* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is +* recycled to available buffers directly and it triggers notify of announced credit. +*/ + @Test + public void testRecycleExclusiveBufferBeforeReleased() throws Exception { + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + + // Recycle exclusive segment + inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("There should have one available buffer after recycle.", + 1, inputChannel.getNumberOfAvailableBuffers()); + verify(inputChannel, times(1)).notifyCreditAvailable(); --- End diff -- can you add one more `inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel))` call and verify `inputChannel.getNumberOfAvailableBuffers()` and that `notifyCreditAvailable()` is not called again? > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179306#comment-16179306 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140831859 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -281,6 +286,73 @@ public void testProducerFailedException() throws Exception { ch.getNextBuffer(); } + /** +* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is +* recycled to available buffers directly and it triggers notify of announced credit. +*/ + @Test + public void testRecycleExclusiveBufferBeforeReleased() throws Exception { + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + + // Recycle exclusive segment + inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("There should have one available buffer after recycle.", --- End diff -- `There should be one buffer available after recycle` > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4683: [FLINK-5944] Support reading of Snappy files
Github user mlipkovich commented on a diff in the pull request: https://github.com/apache/flink/pull/4683#discussion_r140833786 --- Diff: flink-core/pom.xml --- @@ -52,6 +52,12 @@ under the License. flink-shaded-asm + + org.apache.flink + flink-shaded-hadoop2 + ${project.version} + --- End diff -- Yes, it is a good point to make Hadoop Snappy the default codec. I think we could still support Xerial Snappy since it comes for free. I will make these changes once we agree on the dependencies. Regarding the separate module, what would its content be? As I understand it, a user who wants to read HDFS files will need the flink-java module anyway, since it contains Hadoop wrappers like HadoopInputSplit and so on. Do you think it makes sense to put this Hadoop codec there? ---
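For reference, a Xerial-based factory along the lines discussed could look like the sketch below (a Hadoop-codec variant would wrap Hadoop's SnappyCodec instead). Where such a factory is registered and which module it lives in is exactly the open question in this thread, so treat this only as an illustration.
{code:java}
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.xerial.snappy.SnappyInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;

/** Sketch of a factory for xerial/snappy-java framed streams. */
public class XerialSnappyInflaterInputStreamFactory
        implements InflaterInputStreamFactory<SnappyInputStream> {

    @Override
    public SnappyInputStream create(InputStream in) throws IOException {
        return new SnappyInputStream(in);
    }

    @Override
    public Collection<String> getCommonFileExtensions() {
        return Collections.singleton("snappy");
    }
}
{code}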
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140833417 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -281,6 +286,73 @@ public void testProducerFailedException() throws Exception { ch.getNextBuffer(); } + /** +* Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is +* recycled to available buffers directly and it triggers notify of announced credit. +*/ + @Test + public void testRecycleExclusiveBufferBeforeReleased() throws Exception { + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + + // Recycle exclusive segment + inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("There should have one available buffer after recycle.", + 1, inputChannel.getNumberOfAvailableBuffers()); + verify(inputChannel, times(1)).notifyCreditAvailable(); --- End diff -- can you add one more `inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel))` call and verify `inputChannel.getNumberOfAvailableBuffers()` and that `notifyCreditAvailable()` is not called again? ---
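Sketched out, the extension the review asks for could look like the test below. It reuses the setup and the createRemoteInputChannel(...) helper from the quoted test class; only the second recycle call and its assertions are new, and the expectation that no second credit notification is sent follows the reviewer's comment rather than verified PR behaviour.
{code:java}
@Test
public void testRecycleTwoExclusiveBuffersBeforeReleased() throws Exception {
	final SingleInputGate inputGate = mock(SingleInputGate.class);
	final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));

	// first recycled segment becomes available and announces the credit
	inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel));
	assertEquals("There should be one buffer available after the first recycle.",
		1, inputChannel.getNumberOfAvailableBuffers());
	verify(inputChannel, times(1)).notifyCreditAvailable();

	// second recycled segment only adds a buffer; no additional credit notification is expected
	inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel));
	assertEquals("There should be two buffers available after the second recycle.",
		2, inputChannel.getNumberOfAvailableBuffers());
	verify(inputChannel, times(1)).notifyCreditAvailable();
}
{code}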
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179304#comment-16179304 ] ASF GitHub Bot commented on FLINK-7465: --- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140833539 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Java implementation of HyperLogLog (HLL) algorithm from this paper: + * + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf + * + * HLL is an improved version of LogLog that is capable of estimating + * the cardinality of a set with accuracy = 1.04/sqrt(m) where + * m = 2^b. So we can control accuracy vs space usage by increasing + * or decreasing b. + * + * The main benefit of using HLL over LL is that it only requires 64% + * of the space that LL does to get the same accuracy. + * + * + * Note that this implementation does not include the long range correction function + * defined in the original paper. Empirical evidence shows that the correction + * function causes more harm than good. + * + */ +public class HyperLogLog implements ICardinality, Serializable { + + private final RegisterSet registerSet; + private final int log2m; + private final double alphaMM; + + + /** +* Create a new HyperLogLog instance using the specified standard deviation. +* +* @param rsd - the relative standard deviation for the counter. +*smaller values create counters that require more space. +*/ + public HyperLogLog(double rsd) { + this(log2m(rsd)); + } + + private static int log2m(double rsd) { + return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2)); + } + + private static double rsd(int log2m) { + return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2))); + } + + private static void validateLog2m(int log2m) { + if (log2m < 0 || log2m > 30) { + throw new IllegalArgumentException("log2m argument is " + + log2m + " and is outside the range [0, 30]"); + } + } + + private static double linearCounting(int m, double v) { + return m * Math.log(m / v); + } + + /** +* Create a new HyperLogLog instance. The log2m parameter defines the accuracy of +* the counter. 
The larger the log2m the better the accuracy. +* +* accuracy = 1.04/sqrt(2^log2m) +* +* @param log2m - the number of bits to use as the basis for the HLL instance +*/ + public HyperLogLog(int log2m) { + this(log2m, new RegisterSet(1 << log2m)); + } + + /** +* Creates a new HyperLogLog instance using the given registers. Used for unmarshalling a serialized +* instance and for merging multiple counters together. +* +* @param registerSet - the initial values for the register set +*/ + public HyperLogLog(int log2m, RegisterSet registerSet) { + validateLog2m(log2m); + this.registerSet = registerSet; + this.log2m = log2m; + int
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140833539 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Java implementation of HyperLogLog (HLL) algorithm from this paper: + * + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf + * + * HLL is an improved version of LogLog that is capable of estimating + * the cardinality of a set with accuracy = 1.04/sqrt(m) where + * m = 2^b. So we can control accuracy vs space usage by increasing + * or decreasing b. + * + * The main benefit of using HLL over LL is that it only requires 64% + * of the space that LL does to get the same accuracy. + * + * + * Note that this implementation does not include the long range correction function + * defined in the original paper. Empirical evidence shows that the correction + * function causes more harm than good. + * + */ +public class HyperLogLog implements ICardinality, Serializable { + + private final RegisterSet registerSet; + private final int log2m; + private final double alphaMM; + + + /** +* Create a new HyperLogLog instance using the specified standard deviation. +* +* @param rsd - the relative standard deviation for the counter. +*smaller values create counters that require more space. +*/ + public HyperLogLog(double rsd) { + this(log2m(rsd)); + } + + private static int log2m(double rsd) { + return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2)); + } + + private static double rsd(int log2m) { + return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2))); + } + + private static void validateLog2m(int log2m) { + if (log2m < 0 || log2m > 30) { + throw new IllegalArgumentException("log2m argument is " + + log2m + " and is outside the range [0, 30]"); + } + } + + private static double linearCounting(int m, double v) { + return m * Math.log(m / v); + } + + /** +* Create a new HyperLogLog instance. The log2m parameter defines the accuracy of +* the counter. The larger the log2m the better the accuracy. 
+* +* accuracy = 1.04/sqrt(2^log2m) +* +* @param log2m - the number of bits to use as the basis for the HLL instance +*/ + public HyperLogLog(int log2m) { + this(log2m, new RegisterSet(1 << log2m)); + } + + /** +* Creates a new HyperLogLog instance using the given registers. Used for unmarshalling a serialized +* instance and for merging multiple counters together. +* +* @param registerSet - the initial values for the register set +*/ + public HyperLogLog(int log2m, RegisterSet registerSet) { + validateLog2m(log2m); + this.registerSet = registerSet; + this.log2m = log2m; + int m = 1 << this.log2m; + + alphaMM = getAlphaMM(log2m, m); + } + + @Override + public boolean offerHashed(long hashedValue) { + // j becomes the binary address determined by the first b
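To make the accuracy/space trade-off in the quoted Javadoc concrete, the two conversions can be sketched in plain Java. This is only an illustration of the formulas accuracy = 1.04/sqrt(2^log2m) and rsd(log2m) = 1.106/sqrt(2^log2m) quoted above; the class and method names below are hypothetical and are not part of the pull request.
{code}
// Minimal sketch of the log2m <-> rsd relationship described in the quoted diff.
// Names and rounding are assumptions of this example only.
public final class HllAccuracySketch {

    // log2m needed to reach a target relative standard deviation (rsd)
    static int log2mFor(double rsd) {
        return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
    }

    // relative standard deviation achieved with 2^log2m registers
    static double rsdFor(int log2m) {
        return 1.106 / Math.sqrt(Math.pow(2, log2m));
    }

    public static void main(String[] args) {
        int log2m = log2mFor(0.01);                      // target ~1% error
        System.out.println("log2m = " + log2m);          // 13 -> 2^13 = 8192 registers
        System.out.println("rsd   = " + rsdFor(log2m));  // ~0.012
    }
}
{code}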
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179299#comment-16179299 ] ASF GitHub Bot commented on FLINK-7465: --- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140832701 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Java implementation of HyperLogLog (HLL) algorithm from this paper: + * + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf + * + * HLL is an improved version of LogLog that is capable of estimating + * the cardinality of a set with accuracy = 1.04/sqrt(m) where + * m = 2^b. So we can control accuracy vs space usage by increasing + * or decreasing b. + * + * The main benefit of using HLL over LL is that it only requires 64% + * of the space that LL does to get the same accuracy. + * + * + * Note that this implementation does not include the long range correction function + * defined in the original paper. Empirical evidence shows that the correction + * function causes more harm than good. + * + */ +public class HyperLogLog implements ICardinality, Serializable { --- End diff -- I see this class is adapted from https://github.com/addthis/stream-lib. I think you should comment that this class was adapted from the link, so people can track differences. > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. 
Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: >
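The add/query procedure described above can be summarized with a short, self-contained Java sketch of the m-bit array and k hash positions. It is illustrative only; in particular, the double-hashing used to derive the k positions is an assumption of this sketch, not the implementation proposed in the pull request.
{code}
import java.util.BitSet;

// Minimal Bloom filter sketch: m bits, k derived positions per element.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int m;
    private final int k;

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // derive the i-th position from two hashes (h1 + i * h2), illustrative only
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, m);
    }

    public void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));          // set all k positions to 1
        }
    }

    public boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;                        // any 0 bit => definitely not present
            }
        }
        return true;                                 // all 1s => present, or a false positive
    }
}
{code}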
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140832701 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Java implementation of HyperLogLog (HLL) algorithm from this paper: + * + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf + * + * HLL is an improved version of LogLog that is capable of estimating + * the cardinality of a set with accuracy = 1.04/sqrt(m) where + * m = 2^b. So we can control accuracy vs space usage by increasing + * or decreasing b. + * + * The main benefit of using HLL over LL is that it only requires 64% + * of the space that LL does to get the same accuracy. + * + * + * Note that this implementation does not include the long range correction function + * defined in the original paper. Empirical evidence shows that the correction + * function causes more harm than good. + * + */ +public class HyperLogLog implements ICardinality, Serializable { --- End diff -- I see this class is adapted from https://github.com/addthis/stream-lib. I think you should comment that this class was adapted from the link, so people can track differences. ---
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179290#comment-16179290 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140831302 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -183,18 +214,40 @@ void notifySubpartitionConsumed() { } /** -* Releases all received buffers and closes the partition request client. +* Releases all received and available buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + final List recyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + recyclingSegments.add(buffer.getMemorySegment()); --- End diff -- ok - regarding `RemoteInputChannel#recycle()`, if we use this here (indirectly via `buffer.recycle`), we would not want it to redistribute buffers for every single `recycle()` call, but if `RemoteInputChannel#recycle()` is called for a single buffer outside `RemoteInputChannel#releaseAllResources()`, we would want it to do that... You're right that this would create some mess there and we should probably keep your current solution unless something better comes to mind. > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179289#comment-16179289 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140829015 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -215,6 +269,48 @@ public String toString() { } // + // Credit-based + // + + /** +* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +*/ + void notifyCreditAvailable() { + //TODO in next PR + } + + /** +* Exclusive buffer is recycled to this input channel directly and it may trigger notify +* credit to producer. +* +* @param segment The exclusive segment of this channel. +*/ + @Override + public void recycle(MemorySegment segment) { + synchronized (availableBuffers) { + if (isReleased.get()) { + try { + inputGate.returnExclusiveSegments(Arrays.asList(segment)); + return; + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } --- End diff -- Can you maybe add a comment on the importance of the `isReleased` check being inside the synchronized block (as implemented by `onBuffer` before, but also without a comment)? This is related to the `AtomicBoolean` field and not getting into races here. > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140831302 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -183,18 +214,40 @@ void notifySubpartitionConsumed() { } /** -* Releases all received buffers and closes the partition request client. +* Releases all received and available buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + final List recyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + recyclingSegments.add(buffer.getMemorySegment()); --- End diff -- ok - regarding `RemoteInputChannel#recycle()`, if we use this here (indirectly via `buffer.recycle`), we would not want it to redistribute buffers for every single `recycle()` call, but if `RemoteInputChannel#recycle()` is called for a single buffer outside `RemoteInputChannel#releaseAllResources()`, we would want it to do that... You're right that this would create some mess there and we should probably keep your current solution unless something better comes to mind. ---
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140829015 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -215,6 +269,48 @@ public String toString() { } // + // Credit-based + // + + /** +* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +*/ + void notifyCreditAvailable() { + //TODO in next PR + } + + /** +* Exclusive buffer is recycled to this input channel directly and it may trigger notify +* credit to producer. +* +* @param segment The exclusive segment of this channel. +*/ + @Override + public void recycle(MemorySegment segment) { + synchronized (availableBuffers) { + if (isReleased.get()) { + try { + inputGate.returnExclusiveSegments(Arrays.asList(segment)); + return; + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } --- End diff -- Can you maybe add a comment on the importance of the `isReleased` check being inside the synchronized block (as implemented by `onBuffer` before, but also without a comment)? This is related to the `AtomicBoolean` field and not getting into races here. ---
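For readers following this thread, the race the comment alludes to can be illustrated with a small, simplified sketch. The class below is a stand-in, not the real RemoteInputChannel; it only shows why the isReleased check has to happen while holding the same lock that the release path uses to drain the buffers.
{code}
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the pattern under discussion. If the isReleased check were
// done outside the lock, a concurrent releaseAllResources() could drain the queue
// between the check and the add(), and the recycled segment would be leaked.
class RecycleSketch {
    private final AtomicBoolean isReleased = new AtomicBoolean();
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();

    void recycle(Object segment) {
        synchronized (availableBuffers) {
            if (isReleased.get()) {
                returnToGlobalPool(segment);   // channel already released: hand back immediately
                return;
            }
            availableBuffers.add(segment);     // safe: release cannot interleave here
        }
    }

    void releaseAllResources() {
        synchronized (availableBuffers) {
            if (isReleased.compareAndSet(false, true)) {
                while (!availableBuffers.isEmpty()) {
                    returnToGlobalPool(availableBuffers.poll());
                }
            }
        }
    }

    private void returnToGlobalPool(Object segment) {
        // placeholder for returning the segment to the global pool
    }
}
{code}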
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179284#comment-16179284 ] ASF GitHub Bot commented on FLINK-7465: --- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140830364 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/ICardinality.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.IOException; + +/** + * An interface definition for implementation of cardinality. + */ +public interface ICardinality { --- End diff -- Why is the interface named `ICardinlaity`? Is this common in the Flink codebase? I know prefixing "I" is common in C#. > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I'd appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140830364 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/ICardinality.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.IOException; + +/** + * An interface definition for implementation of cardinality. + */ +public interface ICardinality { --- End diff -- Why is the interface named `ICardinality` with an "I" prefix? Is this common in the Flink codebase? I know prefixing "I" is common in C#. ---
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140828795 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/ICardinality.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.IOException; + +/** + * An interface definition for implementation of cardinality. + */ +public interface ICardinality { + + /** +* Check whether the element is impact estimate. +* +* @param o stream element +* @return false if the value returned by cardinality() is unaffected by the appearance of o in the stream. +*/ + boolean offer(Object o); + + /** +* Offer the value as a hashed long value. +* +* @param hashedLong - the hash of the item to offer to the estimator +* @return false if the value returned by cardinality() is unaffected by the appearance of hashedLong in the stream +*/ + boolean offerHashed(long hashedLong); + + /** +* Offer the value as a hashed long value. +* +* @param hashedInt - the hash of the item to offer to the estimator +* @return false if the value returned by cardinality() is unaffected by the appearance of hashedInt in the stream +*/ + boolean offerHashed(int hashedInt); + + /** +* @return the number of unique elements in the stream or an estimate thereof. +*/ + long cardinality(); + + /** +* @return size in bytes needed for serialization. +*/ + int sizeof(); + + /** +* Get the byte array used for the calculation. +* +* @return The byte array used for the calculation +* @throws IOException +*/ + byte[] getBytes() throws IOException; + + /** +* Merges estimators to produce a new estimator for the combined streams +* of this estimator and those passed as arguments. +* +* Nor this estimator nor the one passed as parameters are modified. +* +* @param estimators Zero or more compatible estimators +* @throws Exception If at least one of the estimators is not compatible with this one +*/ + ICardinality merge(ICardinality... estimators) throws Exception; --- End diff -- Wouldn't it be nicer to have a more specific Exception? ---
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179276#comment-16179276 ] ASF GitHub Bot commented on FLINK-7465: --- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140828795 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/ICardinality.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.IOException; + +/** + * An interface definition for implementation of cardinality. + */ +public interface ICardinality { + + /** +* Check whether the element is impact estimate. +* +* @param o stream element +* @return false if the value returned by cardinality() is unaffected by the appearance of o in the stream. +*/ + boolean offer(Object o); + + /** +* Offer the value as a hashed long value. +* +* @param hashedLong - the hash of the item to offer to the estimator +* @return false if the value returned by cardinality() is unaffected by the appearance of hashedLong in the stream +*/ + boolean offerHashed(long hashedLong); + + /** +* Offer the value as a hashed long value. +* +* @param hashedInt - the hash of the item to offer to the estimator +* @return false if the value returned by cardinality() is unaffected by the appearance of hashedInt in the stream +*/ + boolean offerHashed(int hashedInt); + + /** +* @return the number of unique elements in the stream or an estimate thereof. +*/ + long cardinality(); + + /** +* @return size in bytes needed for serialization. +*/ + int sizeof(); + + /** +* Get the byte array used for the calculation. +* +* @return The byte array used for the calculation +* @throws IOException +*/ + byte[] getBytes() throws IOException; + + /** +* Merges estimators to produce a new estimator for the combined streams +* of this estimator and those passed as arguments. +* +* Nor this estimator nor the one passed as parameters are modified. +* +* @param estimators Zero or more compatible estimators +* @throws Exception If at least one of the estimators is not compatible with this one +*/ + ICardinality merge(ICardinality... estimators) throws Exception; --- End diff -- Wouldn't it be nicer to have a more specific Exception? > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. 
> BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions.
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179259#comment-16179259 ] ASF GitHub Bot commented on FLINK-7465: --- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140826158 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/MurmurHash.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +/** + * This is a very fast, non-cryptographic hash suitable for general hash-based + * lookup. See http://murmurhash.googlepages.com/ for more details. + * + */ +public class MurmurHash { --- End diff -- What was the decision in choosing MurmurHash2 (in https://github.com/tnm/murmurhash-java) versus MurmurHash3 (in https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Murmur3_32HashFunction.java)? If I recall correctly, MurmurHash3 is more collision resistant and slightly faster than MurmurHash2. > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. 
The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I'd appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140826158 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/MurmurHash.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +/** + * This is a very fast, non-cryptographic hash suitable for general hash-based + * lookup. See http://murmurhash.googlepages.com/ for more details. + * + */ +public class MurmurHash { --- End diff -- What was the decision in choosing MurmurHash2 (in https://github.com/tnm/murmurhash-java) versus MurmurHash3 (in https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Murmur3_32HashFunction.java)? If I recall correctly, MurmurHash3 is more collision resistant and slightly faster than MurmurHash2. ---
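If MurmurHash3 were chosen instead of the hand-rolled MurmurHash2 class, one option is to reuse an existing implementation rather than porting another one. The sketch below assumes Guava is available on the classpath and is only an illustration, not a change proposed in the pull request.
{code}
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;

// Illustration only: obtaining a 64-bit MurmurHash3 value via Guava.
public class Murmur3Example {
    public static void main(String[] args) {
        HashFunction murmur3 = Hashing.murmur3_128(0);   // 128-bit Murmur3 with seed 0
        long hashed = murmur3
                .hashString("some-element", StandardCharsets.UTF_8)
                .asLong();                               // take the first 64 bits
        System.out.println("Murmur3 hash: " + hashed);
    }
}
{code}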
[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179245#comment-16179245 ] ASF GitHub Bot commented on FLINK-6465: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4556#discussion_r140824914 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1406,6 +1406,61 @@ object AggregateUtil { } } } +case sqlFirstLastValueAggFunction: SqlFirstLastValueAggFunction => + + aggregates(index) = if (sqlFirstLastValueAggFunction.getKind == SqlKind.FIRST_VALUE) { +if (needRetraction) { + sqlTypeName match { --- End diff -- Convert it to a map > support FIRST_VALUE on Table API & SQL > -- > > Key: FLINK-6465 > URL: https://issues.apache.org/jira/browse/FLINK-6465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: sunjincheng > > {{FIRST_VALUE}} is a OVER WINDOW function. In this JIRA. will add > {{FIRST_VALUE}} function support on TableAPI & SQL. > *Syntax:* > FIRST_VALUE ( [scalar_expression ] ) > OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] ) > [About OVER > WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows] > scalar_expression > Is the value to be returned. scalar_expression can be a column, or other > arbitrary expression that results in a single value. Other analytic functions > are not permitted. > *NOTE:* > * {{FIRST_VALUE}} if used for OVER WINDOW, e.g.: > {code} > SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab > {code} > * OVER WINDOW's retraction is expensive(currently not supported yet), and > this JIRA. does not implement Retract logic of {{FIRST_VALUE}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4720: Release 1.3.2 rc3
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4720 This looks pretty much like some sort of mistake. Can you please close the PR? ---
[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179246#comment-16179246 ] ASF GitHub Bot commented on FLINK-6465: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4556#discussion_r140824819 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunctionWithRetract.scala --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.dataview.{ListView, MapView} +import org.apache.flink.table.functions.AggregateFunction + +/** The initial accumulator for first value with retraction aggregate function */ +class FirstValueWithRetractAccumulator[T] { + var data: ListView[T] = _ + var retractedData: MapView[T, Byte] = _ + + override def equals(obj: scala.Any): Boolean = { +if (obj.isInstanceOf[FirstValueWithRetractAccumulator[T]]) { + val target = obj.asInstanceOf[FirstValueWithRetractAccumulator[T]] + if (target.data.equals(this.data) && target.retractedData.equals(this.retractedData)) { +return true + } +} +false + } +} + +/** + * Base class for built-in first value with retraction aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class FirstValueWithRetractAggFunction[T] + extends AggregateFunction[T, FirstValueWithRetractAccumulator[T]] { + + override def createAccumulator(): FirstValueWithRetractAccumulator[T] = { +val acc = new FirstValueWithRetractAccumulator[T] +acc.data = new ListView[T] +acc.retractedData = new MapView[T, Byte] +acc + } + + def accumulate(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = { +if (null != value) { + val v = value.asInstanceOf[T] + acc.data.add(v) +} + } + + def retract(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = { --- End diff -- It seems that we can do better than having two copies of state here. It might make sense to extend the ListView interface or to introduce a TreeMap like view. > support FIRST_VALUE on Table API & SQL > -- > > Key: FLINK-6465 > URL: https://issues.apache.org/jira/browse/FLINK-6465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: sunjincheng > > {{FIRST_VALUE}} is a OVER WINDOW function. In this JIRA. will add > {{FIRST_VALUE}} function support on TableAPI & SQL. 
> *Syntax:* > FIRST_VALUE ( [scalar_expression ] ) > OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] ) > [About OVER > WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows] > scalar_expression > Is the value to be returned. scalar_expression can be a column, or other > arbitrary expression that results in a single value. Other analytic functions > are not permitted. > *NOTE:* > * {{FIRST_VALUE}} is used for OVER WINDOW, e.g.: > {code} > SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab > {code} > * OVER WINDOW's retraction is expensive (currently not supported yet), and > this JIRA does not implement the retract logic of {{FIRST_VALUE}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179244#comment-16179244 ] ASF GitHub Bot commented on FLINK-6465: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4556#discussion_r140823719 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunction.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +/** + * The initial accumulator for first value aggregate function + * + * @tparam T the type for the aggregation result + */ +class FirstValueAccumulator[T] extends JTuple1[T] + +abstract class FirstValueAggFunction[T] extends AggregateFunction[T, FirstValueAccumulator[T]] { + + override def createAccumulator(): FirstValueAccumulator[T] = { +val acc = new FirstValueAccumulator[T] +acc.f0 = getInitValue +acc + } + + def accumulate(acc: FirstValueAccumulator[T], value: Any): Unit = { +if (null == acc.f0) { + acc.f0 = value.asInstanceOf[T] +} + } + + override def getValue(acc: FirstValueAccumulator[T]): T = { +acc.f0 + } + + def resetAccumulator(acc: FirstValueAccumulator[T]): Unit = { +acc.f0 = getInitValue + } + + def getInitValue: T = { +null.asInstanceOf[T] + } + + def getValueTypeInfo: TypeInformation[T] --- End diff -- Looks like the parameter can be a generic parameter > support FIRST_VALUE on Table API & SQL > -- > > Key: FLINK-6465 > URL: https://issues.apache.org/jira/browse/FLINK-6465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: sunjincheng > > {{FIRST_VALUE}} is a OVER WINDOW function. In this JIRA. will add > {{FIRST_VALUE}} function support on TableAPI & SQL. > *Syntax:* > FIRST_VALUE ( [scalar_expression ] ) > OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] ) > [About OVER > WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows] > scalar_expression > Is the value to be returned. scalar_expression can be a column, or other > arbitrary expression that results in a single value. Other analytic functions > are not permitted. 
> *NOTE:* > * {{FIRST_VALUE}} is used for OVER WINDOW, e.g.: > {code} > SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab > {code} > * OVER WINDOW's retraction is expensive (currently not supported yet), and > this JIRA does not implement the retract logic of {{FIRST_VALUE}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4556: [FLINK-6465][table]support FIRST_VALUE on Table AP...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4556#discussion_r140824914 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1406,6 +1406,61 @@ object AggregateUtil { } } } +case sqlFirstLastValueAggFunction: SqlFirstLastValueAggFunction => + + aggregates(index) = if (sqlFirstLastValueAggFunction.getKind == SqlKind.FIRST_VALUE) { +if (needRetraction) { + sqlTypeName match { --- End diff -- Convert it to a map ---
[GitHub] flink pull request #4556: [FLINK-6465][table]support FIRST_VALUE on Table AP...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4556#discussion_r140824819 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunctionWithRetract.scala --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.dataview.{ListView, MapView} +import org.apache.flink.table.functions.AggregateFunction + +/** The initial accumulator for first value with retraction aggregate function */ +class FirstValueWithRetractAccumulator[T] { + var data: ListView[T] = _ + var retractedData: MapView[T, Byte] = _ + + override def equals(obj: scala.Any): Boolean = { +if (obj.isInstanceOf[FirstValueWithRetractAccumulator[T]]) { + val target = obj.asInstanceOf[FirstValueWithRetractAccumulator[T]] + if (target.data.equals(this.data) && target.retractedData.equals(this.retractedData)) { +return true + } +} +false + } +} + +/** + * Base class for built-in first value with retraction aggregate function + * + * @tparam T the type for the aggregation result + */ +abstract class FirstValueWithRetractAggFunction[T] + extends AggregateFunction[T, FirstValueWithRetractAccumulator[T]] { + + override def createAccumulator(): FirstValueWithRetractAccumulator[T] = { +val acc = new FirstValueWithRetractAccumulator[T] +acc.data = new ListView[T] +acc.retractedData = new MapView[T, Byte] +acc + } + + def accumulate(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = { +if (null != value) { + val v = value.asInstanceOf[T] + acc.data.add(v) +} + } + + def retract(acc: FirstValueWithRetractAccumulator[T], value: Any): Unit = { --- End diff -- It seems that we can do better than having two copies of state here. It might make sense to extend the ListView interface or to introduce a TreeMap like view. ---
[GitHub] flink pull request #4556: [FLINK-6465][table]support FIRST_VALUE on Table AP...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4556#discussion_r140823719 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/FirstValueAggFunction.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions.aggfunctions + +import java.math.BigDecimal + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.functions.AggregateFunction + +/** + * The initial accumulator for first value aggregate function + * + * @tparam T the type for the aggregation result + */ +class FirstValueAccumulator[T] extends JTuple1[T] + +abstract class FirstValueAggFunction[T] extends AggregateFunction[T, FirstValueAccumulator[T]] { + + override def createAccumulator(): FirstValueAccumulator[T] = { +val acc = new FirstValueAccumulator[T] +acc.f0 = getInitValue +acc + } + + def accumulate(acc: FirstValueAccumulator[T], value: Any): Unit = { +if (null == acc.f0) { + acc.f0 = value.asInstanceOf[T] +} + } + + override def getValue(acc: FirstValueAccumulator[T]): T = { +acc.f0 + } + + def resetAccumulator(acc: FirstValueAccumulator[T]): Unit = { +acc.f0 = getInitValue + } + + def getInitValue: T = { +null.asInstanceOf[T] + } + + def getValueTypeInfo: TypeInformation[T] --- End diff -- Looks like the parameter can be a generic parameter ---
[jira] [Commented] (FLINK-7541) Redistribute operator state using OperatorID
[ https://issues.apache.org/jira/browse/FLINK-7541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179241#comment-16179241 ] ASF GitHub Bot commented on FLINK-7541: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4609 Merged in f1b2b83d63. Please close the PR and the JIRA. > Redistribute operator state using OperatorID > > > Key: FLINK-7541 > URL: https://issues.apache.org/jira/browse/FLINK-7541 > Project: Flink > Issue Type: Improvement >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Currently StateAssignmentOperation relays heavily on the order of new and old > operators in the task. It should be changed and it should relay more on > OperatorID. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4609: [FLINK-7541] Refactor StateAssignmentOperation
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4609 Merged in f1b2b83d63. Please close the PR and the JIRA. ---
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140823213 --- Diff: docs/dev/table/sql.md --- @@ -2020,7 +2020,16 @@ COUNT(*) Returns the number of input rows. - + + +{% highlight text %} +CARDINALITY_COUNT(rsd, value) --- End diff -- Would it be clearer to the user to have the word "approximate" in the function name, so that the user understands the count is an estimate? I see Apache Spark calls it `approx_count_distinct` (https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html#approx_count_distinct-org.apache.spark.sql.Column-double-) and Redshift has it as `APPROXIMATE COUNT(DISTINCT column)` (http://docs.aws.amazon.com/redshift/latest/dg/r_COUNT.html). ---
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179237#comment-16179237 ] ASF GitHub Bot commented on FLINK-7465: --- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140823213 --- Diff: docs/dev/table/sql.md --- @@ -2020,7 +2020,16 @@ COUNT(*) Returns the number of input rows. - + + +{% highlight text %} +CARDINALITY_COUNT(rsd, value) --- End diff -- Would it be clearer to the user to have the function have the word "approximate" in it such that the user understands the count is an estimate? I see Apache Spark calls it `approx_count_distinct`(https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html#approx_count_distinct-org.apache.spark.sql.Column-double-) and Redshift has it as `APPROXIMATE COUNT(DISTINCT column)` (http://docs.aws.amazon.com/redshift/latest/dg/r_COUNT.html). > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179234#comment-16179234 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140822508 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -183,18 +214,40 @@ void notifySubpartitionConsumed() { } /** -* Releases all received buffers and closes the partition request client. +* Releases all received and available buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + final List recyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + recyclingSegments.add(buffer.getMemorySegment()); --- End diff -- oh, now I see - but let me elaborate on the `RemoteInputChannel#recycle()` there > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140822508 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -183,18 +214,40 @@ void notifySubpartitionConsumed() { } /** -* Releases all received buffers and closes the partition request client. +* Releases all received and available buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + final List recyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + recyclingSegments.add(buffer.getMemorySegment()); --- End diff -- oh, now I see - but let me elaborate on the `RemoteInputChannel#recycle()` there ---
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179228#comment-16179228 ] ASF GitHub Bot commented on FLINK-7683: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140821446 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java --- @@ -287,6 +289,14 @@ public S get(K key, N namespace) { } @Override + public Stream getKeys(N namespace) { + return Arrays.stream(primaryTable) --- End diff -- This hash table applies incremental rehashing. This method must also consider the data stored in ``incrementalRehashTable`` if it isn't ``null``, or we can miss some data. > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make it possible to preserve backward compatibility while > changing the state definition of a keyed state operator (to do so, the operator must > iterate over all of the existing keys and rewrite them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
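A hedged sketch of the change the reviewer is asking for (streaming keys from both the primary table and the incremental-rehash table) might look roughly like this; the field and entry type names (`primaryTable`, `incrementalRehashTable`, `StateTableEntry`) follow the diff and the comment, but details such as bucket chaining are omitted, so this is not the actual CopyOnWriteStateTable code.

{code:java}
// Illustrative only: consider both backing arrays so that keys are not
// missed while an incremental rehash is in progress.
@Override
public Stream<K> getKeys(N namespace) {
    Stream<StateTableEntry<K, N, S>> primary = Arrays.stream(primaryTable);
    Stream<StateTableEntry<K, N, S>> rehash = incrementalRehashTable == null
        ? Stream.<StateTableEntry<K, N, S>>empty()
        : Arrays.stream(incrementalRehashTable);

    return Stream.concat(primary, rehash)
        .filter(Objects::nonNull)
        .filter(entry -> namespace.equals(entry.getNamespace()))
        .map(StateTableEntry::getKey);
}
{code}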
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179223#comment-16179223 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140810853 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -99,8 +114,24 @@ public RemoteInputChannel( this.connectionManager = checkNotNull(connectionManager); } + /** +* Assigns exclusive buffers to this input channel, and this method should be called only once +* after this input channel is created. +*/ void assignExclusiveSegments(List segments) { - // TODO in next PR + checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have" + + "already been set for this input channel."); --- End diff -- please add a space between `have` and `already` (between the concatenations) > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
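For reference, the fix the reviewer is requesting is simply a whitespace change in one of the concatenated string literals, e.g.:

{code:java}
checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have " +
    "already been set for this input channel.");
{code}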
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179224#comment-16179224 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140821090 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -183,18 +214,40 @@ void notifySubpartitionConsumed() { } /** -* Releases all received buffers and closes the partition request client. +* Releases all received and available buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + final List recyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + recyclingSegments.add(buffer.getMemorySegment()); --- End diff -- 1) I think, performance is not much of an issue here, as this is only called during take-down of a connection and the overhead of `Buffer#recycle` is actually not that much. 2) Sorry, but I don't get your concern. Why would you need an extra check when using `buffer.recycle()` instead of `exclusiveRecyclingSegments.add(buffer.getMemorySegment())`? There shouldn't be anything special for the exclusive buffers in this regard compared to ordinary buffers (which is the beauty of the design). Let me give the example of how `LocalBufferPool` handles this inside `lazyDestroy`: it returns every memory segment (one by one) with `networkBufferPool.recycle()` and, at its end, it is calling `networkBufferPool.destroyBufferPool()` so that the book-keeping inside the `NetworkBufferPool` is updated and buffers may be re-distributed to other `LocalBufferPool` instances. We could do this similarly here: recycle one by one, then call some method to update book-keeping and balancing inside `NetworkBufferPool`. > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
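A rough sketch of the release pattern the reviewer describes, transplanted from `LocalBufferPool#lazyDestroy` to the input channel: recycle the buffers one by one, then call a single book-keeping hook on the `NetworkBufferPool`. The hook name below is hypothetical; the reviewer only suggests that "some method" for book-keeping and rebalancing should exist.

{code:java}
// Sketch only: recycle each buffer through its own recycler, then notify the
// global pool once so it can update its book-keeping and redistribute buffers,
// analogous to LocalBufferPool#lazyDestroy + NetworkBufferPool#destroyBufferPool.
void releaseExclusiveBuffers(List<Buffer> buffers, NetworkBufferPool networkBufferPool) {
    int released = 0;
    for (Buffer buffer : buffers) {
        buffer.recycle();   // works for exclusive and floating buffers alike
        released++;
    }
    // Hypothetical book-keeping hook (not an existing NetworkBufferPool method):
    networkBufferPool.onExclusiveBuffersReleased(released);
}
{code}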
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179225#comment-16179225 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140811033 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -100,7 +122,16 @@ public RemoteInputChannel( } void assignExclusiveSegments(List segments) { --- End diff -- works for me :) > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r140821446 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java --- @@ -287,6 +289,14 @@ public S get(K key, N namespace) { } @Override + public Stream getKeys(N namespace) { + return Arrays.stream(primaryTable) --- End diff -- This hash table applies incremental rehashing. This method must also consider the data stored in ``incrementalRehashTable`` if it isn't ``null``, or we can miss some data. ---
[jira] [Commented] (FLINK-7394) Manage exclusive buffers in RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179226#comment-16179226 ] ASF GitHub Bot commented on FLINK-7394: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140809272 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -70,6 +78,15 @@ */ private int expectedSequenceNumber = 0; + /** The initial number of exclusive buffers assigned to this channel. */ + private int initialCredit; + + /** The current available buffers including both exclusive buffers and requested floating buffers. */ + private final ArrayDeque availableBuffers = new ArrayDeque<>(); + + /** The number of available buffers that have not been unannounced to producer yet. */ --- End diff -- actually, there were some more typos here - the full comment should be something like this `The number of available buffers that have not been announced to the producer yet.` > Manage exclusive buffers in RemoteInputChannel > -- > > Key: FLINK-7394 > URL: https://issues.apache.org/jira/browse/FLINK-7394 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The basic works are: > * Exclusive buffers are assigned to {{RemoteInputChannel}} after created by > {{SingleInputGate}}. > * {{RemoteInputChannel}} implements {{BufferRecycler}} interface to manage > the exclusive buffers itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140811033 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -100,7 +122,16 @@ public RemoteInputChannel( } void assignExclusiveSegments(List segments) { --- End diff -- works for me :) ---
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140809272 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -70,6 +78,15 @@ */ private int expectedSequenceNumber = 0; + /** The initial number of exclusive buffers assigned to this channel. */ + private int initialCredit; + + /** The current available buffers including both exclusive buffers and requested floating buffers. */ + private final ArrayDeque availableBuffers = new ArrayDeque<>(); + + /** The number of available buffers that have not been unannounced to producer yet. */ --- End diff -- actually, there were some more typos here - the full comment should be something like this `The number of available buffers that have not been announced to the producer yet.` ---
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140810853 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -99,8 +114,24 @@ public RemoteInputChannel( this.connectionManager = checkNotNull(connectionManager); } + /** +* Assigns exclusive buffers to this input channel, and this method should be called only once +* after this input channel is created. +*/ void assignExclusiveSegments(List segments) { - // TODO in next PR + checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have" + + "already been set for this input channel."); --- End diff -- please add a space between `have` and `already` (between the concatenations) ---
[GitHub] flink pull request #4499: [FLINK-7394][core] Manage exclusive buffers in Rem...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4499#discussion_r140821090 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -183,18 +214,40 @@ void notifySubpartitionConsumed() { } /** -* Releases all received buffers and closes the partition request client. +* Releases all received and available buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + final List recyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + recyclingSegments.add(buffer.getMemorySegment()); --- End diff -- 1) I think, performance is not much of an issue here, as this is only called during take-down of a connection and the overhead of `Buffer#recycle` is actually not that much. 2) Sorry, but I don't get your concern. Why would you need an extra check when using `buffer.recycle()` instead of `exclusiveRecyclingSegments.add(buffer.getMemorySegment())`? There shouldn't be anything special for the exclusive buffers in this regard compared to ordinary buffers (which is the beauty of the design). Let me give the example of how `LocalBufferPool` handles this inside `lazyDestroy`: it returns every memory segment (one by one) with `networkBufferPool.recycle()` and, at its end, it is calling `networkBufferPool.destroyBufferPool()` so that the book-keeping inside the `NetworkBufferPool` is updated and buffers may be re-distributed to other `LocalBufferPool` instances. We could do this similarly here: recycle one by one, then call some method to update book-keeping and balancing inside `NetworkBufferPool`. ---
[jira] [Closed] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit
[ https://issues.apache.org/jira/browse/FLINK-7619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-7619. - Resolution: Fixed Fix Version/s: 1.4.0 Merged in 5af463a9c0. > Improve abstraction in AbstractAsyncIOCallable to better fit > > > Key: FLINK-7619 > URL: https://issues.apache.org/jira/browse/FLINK-7619 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Minor > Fix For: 1.4.0 > > > The abstraction of AbstractAsyncIOCallable does not fit too well with today's > needs in checkpointing backends. Originally, backends were assumed to only > open one stream that is managed by the abstraction. In fact, concrete > implementations always extended that in practice. We can redo this in a way > that more resources can be managed by the abstraction. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit
[ https://issues.apache.org/jira/browse/FLINK-7619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179200#comment-16179200 ] ASF GitHub Bot commented on FLINK-7619: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/4671 > Improve abstraction in AbstractAsyncIOCallable to better fit > > > Key: FLINK-7619 > URL: https://issues.apache.org/jira/browse/FLINK-7619 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Minor > > The abstraction of AbstractAsyncIOCallable does not fit to well with todays > needs in checkpointing backends. Originally, backends were assumed to only > open one stream that is managed by the abstraction. In fact, concrete > implementations always extended that in practise. We can redo this in a way > that more resources can be managed by the abstraction. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit
[ https://issues.apache.org/jira/browse/FLINK-7619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179199#comment-16179199 ] ASF GitHub Bot commented on FLINK-7619: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4671 Merged in 5af463a9c0. > Improve abstraction in AbstractAsyncIOCallable to better fit > > > Key: FLINK-7619 > URL: https://issues.apache.org/jira/browse/FLINK-7619 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Minor > > The abstraction of AbstractAsyncIOCallable does not fit to well with todays > needs in checkpointing backends. Originally, backends were assumed to only > open one stream that is managed by the abstraction. In fact, concrete > implementations always extended that in practise. We can redo this in a way > that more resources can be managed by the abstraction. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4671: [FLINK-7619] Improve abstraction in AbstractAsyncI...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/4671 ---
[GitHub] flink issue #4671: [FLINK-7619] Improve abstraction in AbstractAsyncIOCallab...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4671 Merged in 5af463a9c0. ---
[jira] [Closed] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method
[ https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-7524. - Assignee: Stefan Richter I still found some small potential for improvement that is merged in 0073204b25. > Task "xxx" did not react to cancelling signal, but is stuck in method > - > > Key: FLINK-7524 > URL: https://issues.apache.org/jira/browse/FLINK-7524 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.4.0 > > > Hi, > I observed the following errors in taskmanager.log > {code:java} > 2017-08-25 17:03:40,141 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(SlidingEventTimeWindows(25920, > 360), > AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, > aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, > EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> > Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in > method: > > org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089) > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653) > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589) > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542) > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281) > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213) > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) > org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > java.lang.Thread.run(Thread.java:748) > ... > 2017-08-25 17:05:10,139 INFO org.apache.flink.runtime.taskmanager.Task > - Notifying TaskManager about fatal error. 
Task > 'TriggerWindow(SlidingEventTimeWindows(25920, 360), > AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, > aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, > EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> > Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 > seconds, but is stuck in method: > > org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089) > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653) > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589) > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542) > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281) > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) >
[jira] [Commented] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method
[ https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179194#comment-16179194 ] ASF GitHub Bot commented on FLINK-7524: --- Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/4639 > Task "xxx" did not react to cancelling signal, but is stuck in method > - > > Key: FLINK-7524 > URL: https://issues.apache.org/jira/browse/FLINK-7524 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0 > > > Hi, > I observed the following errors in taskmanager.log > {code:java} > 2017-08-25 17:03:40,141 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(SlidingEventTimeWindows(25920, > 360), > AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, > aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, > EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> > Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in > method: > > org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089) > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653) > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589) > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542) > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281) > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213) > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) > org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > java.lang.Thread.run(Thread.java:748) > ... > 2017-08-25 17:05:10,139 INFO org.apache.flink.runtime.taskmanager.Task > - Notifying TaskManager about fatal error. 
Task > 'TriggerWindow(SlidingEventTimeWindows(25920, 360), > AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, > aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, > EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> > Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 > seconds, but is stuck in method: > > org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089) > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653) > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589) > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542) > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281) > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) >
[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/4639 ---
[jira] [Commented] (FLINK-7684) Avoid multiple data copies in MergingWindowSet
[ https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179162#comment-16179162 ] ASF GitHub Bot commented on FLINK-7684: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4723 [FLINK-7684] Avoid data copies in MergingWindowSet ## What is the purpose of the change Previously MergingWindowSet uses ListState of tuples to persists it's mapping. This is inefficient because this ListState of tuples must be converted to a HashMap on each access. Furthermore, for some cases it might be inefficient to check whether mapping has changed before saving it on state. Fixing those two issues improve session windows [benchmarks](https://github.com/dataArtisans/flink-benchmarks) results by 10 - 20% First commit comes from different PR #4722 ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: **yes** (it changes how WindowOperator is being serialized) - The runtime per-record code paths (performance sensitive): **yes** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink window Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4723.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4723 commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f Author: Piotr NowojskiDate: 2017-09-14T10:39:30Z [FLINK-7683] Iterate over keys in KeyedStateBackend commit 20ed7f51399752d620696d9502b61233d47f6bcb Author: Piotr Nowojski Date: 2017-09-25T12:18:52Z [FLINK-7684] Add OptimizationTarget to the ExecutionConfig commit c5505e1252ea5b70cff9cf76ff89d7dc52f45057 Author: Piotr Nowojski Date: 2017-07-06T11:56:13Z [FLINK-7684] Serialize MergingWindowSet to ValueState
[GitHub] flink pull request #4723: [FLINK-7684] Avoid data copies in MergingWindowSet
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4723 [FLINK-7684] Avoid data copies in MergingWindowSet ## What is the purpose of the change Previously, MergingWindowSet used a ListState of tuples to persist its mapping. This is inefficient because this ListState of tuples must be converted to a HashMap on each access. Furthermore, in some cases it can be inefficient to check whether the mapping has changed before saving it to state. Fixing those two issues improves session window [benchmark](https://github.com/dataArtisans/flink-benchmarks) results by 10-20%. The first commit comes from a different PR, #4722. ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: **yes** (it changes how WindowOperator is being serialized) - The runtime per-record code paths (performance sensitive): **yes** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink window Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4723.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4723 commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f Author: Piotr Nowojski Date: 2017-09-14T10:39:30Z [FLINK-7683] Iterate over keys in KeyedStateBackend commit 20ed7f51399752d620696d9502b61233d47f6bcb Author: Piotr Nowojski Date: 2017-09-25T12:18:52Z [FLINK-7684] Add OptimizationTarget to the ExecutionConfig commit c5505e1252ea5b70cff9cf76ff89d7dc52f45057 Author: Piotr Nowojski Date: 2017-07-06T11:56:13Z [FLINK-7684] Serialize MergingWindowSet to ValueState
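To illustrate the inefficiency described in the PR text, here is a hedged comparison of the two ways of persisting the window mapping; the generic parameters and helper shape are assumptions for illustration, not the actual MergingWindowSet code.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Illustrative sketch: old vs. new way of persisting the window-to-state-window mapping. */
public class MergingWindowSetStateSketch {

    // Old shape (as described above): a ListState of tuples that has to be
    // rebuilt into a Map on every access.
    static Map<TimeWindow, TimeWindow> readFromListState(
            ListState<Tuple2<TimeWindow, TimeWindow>> state) throws Exception {
        Map<TimeWindow, TimeWindow> mapping = new HashMap<>();
        Iterable<Tuple2<TimeWindow, TimeWindow>> entries = state.get();
        if (entries != null) {
            for (Tuple2<TimeWindow, TimeWindow> entry : entries) {
                mapping.put(entry.f0, entry.f1); // per-element copy on every access
            }
        }
        return mapping;
    }

    // Direction of the PR (hedged): persist the mapping as a single value so
    // one read yields a usable Map without per-access conversion.
    static Map<TimeWindow, TimeWindow> readFromValueState(
            ValueState<Map<TimeWindow, TimeWindow>> state) throws Exception {
        Map<TimeWindow, TimeWindow> mapping = state.value();
        return mapping != null ? mapping : new HashMap<>();
    }
}
{code}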
[jira] [Updated] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-7683: -- Description: This is required to make possible preserving backward compatibility while changing state definition of a keyed state operator (to do so operator must iterate over all of the existing keys and rewrites them into a new state variable). (was: This is required to make possible preserving backward compatibility while changing state definition of a keyed state operator.) > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator (to do so operator must > iterate over all of the existing keys and rewrites them into a new state > variable). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179116#comment-16179116 ] ASF GitHub Bot commented on FLINK-7683: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4722 [FLINK-7683] Iterate over keys in KeyedStateBackend ## What is the purpose of the change This is required to make possible preserving backward compatibility while changing state definition of a keyed state operator. ## Verifying this change This change added new unit tests to cover new feature. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink keys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4722.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4722 commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f Author: Piotr NowojskiDate: 2017-09-14T10:39:30Z [FLINK-7683] Iterate over keys in KeyedStateBackend > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4722 [FLINK-7683] Iterate over keys in KeyedStateBackend ## What is the purpose of the change This is required to make it possible to preserve backward compatibility while changing the state definition of a keyed state operator. ## Verifying this change This change adds new unit tests to cover the new feature. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink keys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4722.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4722 commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f Author: Piotr Nowojski Date: 2017-09-14T10:39:30Z [FLINK-7683] Iterate over keys in KeyedStateBackend ---
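As a usage illustration of the feature this PR adds (and of the migration use case from the issue description), a hedged sketch of rewriting an old keyed state into a new one could look like this; the exact `getKeys(stateName, namespace)` signature and the surrounding operator plumbing are assumptions, not the final API.

{code:java}
// Hedged sketch: iterate all existing keys of an old state and rewrite them
// into a new state variable (signature of getKeys is assumed).
static void migrateState(
        KeyedStateBackend<String> backend,
        ValueState<Long> oldState,
        ValueState<Long> newState) throws Exception {

    try (Stream<String> keys = backend.getKeys("old-state", VoidNamespace.INSTANCE)) {
        for (String key : (Iterable<String>) keys::iterator) {
            backend.setCurrentKey(key);  // scope the state accesses below to this key
            Long value = oldState.value();
            if (value != null) {
                newState.update(value);  // rewrite under the new state definition
            }
        }
    }
}
{code}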
[jira] [Updated] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend
[ https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-7683: -- Description: This is required to make possible preserving backward compatibility while changing state definition of a keyed state operator. (was: This is required to make possible backward compatibility while changing state definition of a keyed state operator.) > Add method to iterate over all of the existing keys in a statebackend > - > > Key: FLINK-7683 > URL: https://issues.apache.org/jira/browse/FLINK-7683 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > This is required to make possible preserving backward compatibility while > changing state definition of a keyed state operator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179089#comment-16179089 ] Kostas Kloudas commented on FLINK-7606: --- Perfect! Thanks a lot! > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows continuously without freeing the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set to 5 minutes), indicating that two > incoming messages match the pattern only if they arrive within a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also, the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows (I suppose that the state > of the NFA is updated) but it is not cleaned up even after the 5-minute time > window defined in the "within" clause. > If needed, I can provide more details about the job we've implemented and > also screenshots of the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
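For context, a hedged sketch of the kind of job the reporter describes (messages keyed by a field, a two-step pattern, and a 5-minute "within" clause); the event class and field accessors are made up for illustration.

{code:java}
// Illustrative only: keyed CEP pattern with a 5-minute "within" clause,
// matching the setup described in the report. Event/getKey/getType are
// placeholder names, not types from the reporter's job.
static PatternStream<Event> buildPattern(DataStream<Event> messages) {
    Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "start".equals(e.getType());
            }
        })
        .next("second")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return "end".equals(e.getType());
            }
        })
        .within(Time.minutes(5));

    return CEP.pattern(messages.keyBy(Event::getKey), pattern);
}
{code}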
[jira] [Created] (FLINK-7685) CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly"
Gabor Gevay created FLINK-7685: -- Summary: CompilerException: "Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly" Key: FLINK-7685 URL: https://issues.apache.org/jira/browse/FLINK-7685 Project: Flink Issue Type: Bug Components: Optimizer Reporter: Gabor Gevay Priority: Minor A Flink program which reads an input DataSet, creates 64 new DataSets from it, and writes these to separate files throws the following exception: {code:java} Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: Logic for branching plans (non-tree plans) has an error, and does not track the re-joining of branches correctly. at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:491) at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:921) at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:86) {code} Here is some code that reproduces it: https://github.com/ggevay/flink/tree/compiler-exception-new Note that it works with less than 64 DataSets. Also note that with more than 64 DataSets it throws {{CompilerException: Cannot currently handle nodes with more than 64 outputs}}, which is at least a clear error msg that helps the user to find a workaround. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
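A hedged sketch of the program shape described above (one input DataSet, 64 derived DataSets, each written to its own file); paths and the map function are placeholders, and the actual reproducer is in the linked branch.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BranchingPlanRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input = env.readTextFile("/tmp/input");  // placeholder path

        // Create 64 DataSets from the same input and write each to a separate file.
        for (int i = 0; i < 64; i++) {
            final int id = i;
            DataSet<String> derived = input.map(new MapFunction<String, String>() {
                @Override
                public String map(String line) {
                    return id + ":" + line;
                }
            });
            derived.writeAsText("/tmp/out-" + id);  // placeholder paths
        }

        // With 64 sinks this is reported to trigger the CompilerException above.
        env.execute("branching-plan-repro");
    }
}
{code}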
[jira] [Updated] (FLINK-7684) Avoid multiple data copies in MergingWindowSet
[ https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-7684: -- Issue Type: Improvement (was: New Feature) > Avoid multiple data copies in MergingWindowSet > -- > > Key: FLINK-7684 > URL: https://issues.apache.org/jira/browse/FLINK-7684 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Currently MergingWindowSet uses ListState of tuples to persists it's mapping. > This is inefficient because this ListState of tuples must be converted to a > HashMap on each access. > Furthermore, for some cases it might be inefficient to check whether mapping > has changed before saving it on state. > Those two issues are causing multiple data copies and constructing multiple > Lists/Maps per each processed element, which is a reason for noticeable > performance issues. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7684) Avoid multiple data copies in MergingWindowSet
[ https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-7684: - Assignee: Piotr Nowojski Affects Version/s: 1.2.0 1.3.0 1.2.1 1.3.1 1.3.2 Component/s: Streaming > Avoid multiple data copies in MergingWindowSet > -- > > Key: FLINK-7684 > URL: https://issues.apache.org/jira/browse/FLINK-7684 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Currently MergingWindowSet uses ListState of tuples to persists it's mapping. > This is inefficient because this ListState of tuples must be converted to a > HashMap on each access. > Furthermore, for some cases it might be inefficient to check whether mapping > has changed before saving it on state. > Those two issues are causing multiple data copies and constructing multiple > Lists/Maps per each processed element, which is a reason for noticeable > performance issues. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179072#comment-16179072 ] ASF GitHub Bot commented on FLINK-7446: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4710 @haohui We only cast ROWTIME / PROCTIME directly to LONG during runtime; the special types are needed during the pre-flight phase and validation. We could not come up with a better solution that ensures that watermarks stay aligned with the rowtime. > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we could also support defining an > existing field as the rowtime field. Just like when registering a DataStream, the > rowtime field could either be appended or replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7684) Avoid multiple data copies in MergingWindowSet
Piotr Nowojski created FLINK-7684: - Summary: Avoid multiple data copies in MergingWindowSet Key: FLINK-7684 URL: https://issues.apache.org/jira/browse/FLINK-7684 Project: Flink Issue Type: New Feature Reporter: Piotr Nowojski Currently MergingWindowSet uses a ListState of tuples to persist its mapping. This is inefficient because this ListState of tuples must be converted to a HashMap on each access. Furthermore, in some cases it can be inefficient to check whether the mapping has changed before saving it to state. Those two issues cause multiple data copies and construct multiple Lists/Maps per processed element, which leads to noticeable performance issues. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4710: [FLINK-7446] [table] Change DefinedRowtimeAttribute to wo...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4710 @haohui We only cast ROWTIME / PROCTIME directly to LONG during runtime; the special types are needed during the pre-flight phase and validation. We could not come up with a better solution that ensures that watermarks stay aligned with the rowtime. ---
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179063#comment-16179063 ] ASF GitHub Bot commented on FLINK-7446: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140774888 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -255,11 +255,30 @@ abstract class CodeGenerator( generateRowtimeAccess() case TimeIndicatorTypeInfo.PROCTIME_MARKER => // attribute is proctime indicator. -// We use a null literal and generate a timestamp when we need it. +// we use a null literal and generate a timestamp when we need it. generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR) case idx => -// regular attribute. Access attribute in input data type. -generateInputAccess(input1, input1Term, idx) +// get type of result field +val outIdx = input1Mapping.indexOf(idx) +val outType = returnType match { + case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx)) + case ct: CompositeType[_] => ct.getTypeAt(outIdx) + case t: TypeInformation[_] => t +} +val inputAccess = generateInputAccess(input1, input1Term, idx) +// Change output type to rowtime indicator +if (FlinkTypeFactory.isRowtimeIndicatorType(outType) && + (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) { + // Hard cast possible because LONG, TIMESTAMP, and ROW_TIMEINDICATOR are internally --- End diff -- `ROWTIME_INDICATOR` > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179065#comment-16179065 ] ASF GitHub Bot commented on FLINK-7446: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140778116 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala --- @@ -30,10 +30,16 @@ package org.apache.flink.table.sources trait DefinedRowtimeAttribute { /** -* Defines a name of the event-time attribute that represents Flink's -* event-time. Null if no rowtime should be available. +* Defines a name of the event-time attribute that represents Flink's event-time, i.e., an +* attribute that is aligned with the watermarks of the table. --- End diff -- `that is aligned with the watermarks of the underlying DataStream`? > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179070#comment-16179070 ] ASF GitHub Bot commented on FLINK-7446: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140778313 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala --- @@ -30,10 +30,16 @@ package org.apache.flink.table.sources trait DefinedRowtimeAttribute { /** -* Defines a name of the event-time attribute that represents Flink's --- End diff -- Maybe also update the docs of the class? > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179071#comment-16179071 ] ASF GitHub Bot commented on FLINK-7446: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140774786 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -255,11 +255,30 @@ abstract class CodeGenerator( generateRowtimeAccess() case TimeIndicatorTypeInfo.PROCTIME_MARKER => // attribute is proctime indicator. -// We use a null literal and generate a timestamp when we need it. +// we use a null literal and generate a timestamp when we need it. generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR) case idx => -// regular attribute. Access attribute in input data type. -generateInputAccess(input1, input1Term, idx) +// get type of result field --- End diff -- add a comment that this is needed for `TableSource`? > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179066#comment-16179066 ] ASF GitHub Bot commented on FLINK-7446: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140771682 --- Diff: docs/dev/table/streaming.md --- @@ -336,27 +336,24 @@ val windowedTable = tEnv ### Event time -Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage. +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage. Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment. In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)). An event time attribute can be defined either during DataStream-to-Table conversion or by using a TableSource. -The Table API & SQL assume that in both cases timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a `TableSource` with knowledge about the incoming data's characteristics and is hidden from the end user of the API. - - During DataStream-to-Table Conversion -The event time attribute is defined with the `.rowtime` property during schema definition. +The event time attribute is defined with the `.rowtime` property during schema definition. Timestamps and watermarks must have been assigned in the `DataStream` that is converted. --- End diff -- Add a link to `dev/event_timestamps_watermarks` again? > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
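To make the documentation change above concrete, a hedged example of the DataStream-to-Table path: timestamps and watermarks are assigned on the DataStream first, and the event-time attribute is then declared with the `.rowtime` property during schema definition. Field names and the exact field-expression syntax are illustrative, not taken from this PR.

{code:java}
// Hedged sketch: watermarks assigned on the DataStream, rowtime declared
// with the ".rowtime" suffix when converting to a Table.
static Table toTableWithRowtime(
        StreamTableEnvironment tableEnv,
        DataStream<Tuple2<String, Long>> stream) {

    DataStream<Tuple2<String, Long>> withTimestamps = stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                return element.f1;  // event time carried in the second field
            }
        });

    // "ts.rowtime" declares the event-time attribute during schema definition.
    return tableEnv.fromDataStream(withTimestamps, "user, ts.rowtime");
}
{code}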