[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144836#comment-16144836 ] Xingcan Cui commented on FLINK-7548: Thanks for opening the issue, [~jark]. I'd like to share some ideas about the problem.

1. In principle, each rowtime field should be "guarded" by its own set of watermarks. Although we now support multiple rowtime fields, once two streams are connected their watermarks are forcibly merged. As a consequence, the initial watermarks may not be usable in subsequent calculations. Should we consider re-generating them?

2. The current periodic watermark assigner is based on machine time. I'm not sure it is applicable to rowtime, since rowtime and machine time may not be synchronized. For example, if the stream is sourced from a historical queue, it may be fed into the system at maximum speed, and a machine-time-based watermark assigner may then misbehave (e.g., a single 5-second watermark interval may advance the rowtime by an hour). How about using a rowtime-based periodic assigner with the following framework?

{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Emits a watermark whenever the rowtime has advanced by at least `interval`
// since the last emitted watermark, aligned to a multiple of `interval`.
class WatermarksAssigner(interval: Long) extends AssignerWithPunctuatedWatermarks[Order] {
  // rowtime of the most recently emitted watermark
  var currentWatermark: Long = 0L

  override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
    element.rt
  }

  override def checkAndGetNextWatermark(lastElement: Order, extractedTimestamp: Long): Watermark = {
    if (extractedTimestamp >= currentWatermark + interval) {
      currentWatermark += ((extractedTimestamp - currentWatermark) / interval) * interval
      new Watermark(currentWatermark)
    } else {
      null // no watermark for this element
    }
  }
}
{code}

BTW, I'm quite interested in this issue. Can I take it?
Thanks, Xingcan > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu > > As discussed in FLINK-7446, currently a TableSource only supports defining a > rowtime field, but not extracting watermarks from that field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods: {{getRowtimeAttribute}} (which can only be an existing field) and > {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked as > deprecated. > How to support periodic and punctuated watermarks, as well as some built-in > strategies, needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
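To see how the rowtime-driven rule in the assigner proposed above behaves without a running Flink job, here is a dependency-free Scala sketch of the same decision logic. The class name `RowtimeAlignedWatermarks` and its `Option`-returning `onElement` method are illustrative assumptions, not Flink API; the sketch assumes rowtimes arrive roughly in order and applies no out-of-orderness bound.

```scala
// Dependency-free sketch of the rowtime-driven rule: emit a watermark only once the
// rowtime has advanced by at least `interval` past the last emitted watermark, and
// align the new watermark down to a multiple of `interval` (relative to `initial`).
final class RowtimeAlignedWatermarks(interval: Long, initial: Long = 0L) {
  require(interval > 0, "interval must be positive")

  // rowtime of the most recently emitted watermark
  private var currentWatermark: Long = initial

  /** Returns Some(watermark) when one should be emitted for this rowtime, else None. */
  def onElement(rowtime: Long): Option[Long] =
    if (rowtime >= currentWatermark + interval) {
      currentWatermark += ((rowtime - currentWatermark) / interval) * interval
      Some(currentWatermark)
    } else None
}

// Replay of rowtimes in milliseconds; the wall-clock speed of the replay plays no role.
val assigner = new RowtimeAlignedWatermarks(interval = 1000L)
val emitted = Seq(200L, 999L, 1000L, 1500L, 3700L, 3800L).flatMap(t => assigner.onElement(t))
// emitted == Seq(1000L, 3000L)
```

Because the decision depends only on the rowtimes seen, replaying historical data at full speed still yields at most one watermark per `interval` of rowtime progress, regardless of machine time.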
[jira] [Commented] (FLINK-7454) update 'Monitoring Current Event Time' section of Flink doc
[ https://issues.apache.org/jira/browse/FLINK-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144797#comment-16144797 ] ASF GitHub Bot commented on FLINK-7454: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4547 @zentol does it look good to you? > update 'Monitoring Current Event Time' section of Flink doc > --- > > Key: FLINK-7454 > URL: https://issues.apache.org/jira/browse/FLINK-7454 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > Since FLINK-3427 is done, there's no need to have the following doc in > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html#monitoring-current-event-time > "There are plans (see FLINK-3427) to show the current low watermark for each > operator in the Flink web interface. > Until this feature is implemented the current low watermark for each task can > be accessed through the metrics system." > We can replace it with something like "Low watermarks of each task can be > accessed either from Flink web interface or Flink metric system."
[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144795#comment-16144795 ] ASF GitHub Bot commented on FLINK-7366: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4522 Can anyone from data Artisans take a look at this PR, please? > Upgrade kinesis producer library in flink-connector-kinesis > --- > > Key: FLINK-7366 > URL: https://issues.apache.org/jira/browse/FLINK-7366 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > We need to upgrade KPL and KCL to pick up their performance and > stability improvements, so that Flink works better with Kinesis. Upgrading KPL is especially > necessary, because the KPL version Flink uses is old and lacks good > retry and error-handling logic. > *KPL:* > flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which > AWS released in Nov 2015. It's old: only the fourth release, and thus > problematic. It doesn't even have good retry logic, so Flink fails > very frequently (about every 10 minutes, as we observed) when it writes too > fast to Kinesis and receives a RateLimitExceededException. > Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): > "*With the newer version of the KPL it uses the AWS C++ SDK which should > offer additional retries.*" 0.12.5, the version we are upgrading > to, was released in May 2017 and should have the enhanced retry logic. > *KCL:* > Upgrade KCL from 1.6.2 to 1.8.1 > *AWS SDK:* > Upgrade from 1.10.71 to 1.11.171
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144796#comment-16144796 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 Can anyone from data Artisans take a look at this PR, please? > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only exposes two configs of the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > However, according to the [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > GitHub|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more of them to make the most of KinesisProducer and to make it > more fault-tolerant (e.g., by increasing timeouts). > I selected a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink currently uses KPL's default values, which make Flink write to > Kinesis too fast and cause Flink jobs to fail too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params in order to slow down > Flink's write rate to Kinesis.
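As a rough illustration of the settings listed above, the sketch below collects them under the KPL property keys used in the KPL sample `default_config.properties` (`MaxConnections`, `RateLimit`, `RecordMaxBufferedTime`, `RecordTtl`, `RequestTimeout`); mapping the issue's constant names onto these keys is our reading. `throttledKplConfig` is a hypothetical helper, the values are placeholders chosen to slow writes down rather than recommendations, and how FlinkKinesisProducer would accept them is exactly what this issue proposes. A `Properties` object like this could, for example, be passed to KPL's `KinesisProducerConfiguration.fromProperties`.

```scala
import java.util.Properties

// Hypothetical helper: overlays write-throttling KPL settings on a base config
// without mutating it. Keys follow the KPL sample default_config.properties;
// the values are placeholders, not tuned recommendations.
def throttledKplConfig(base: Properties): Properties = {
  val props = new Properties()
  props.putAll(base)
  props.setProperty("MaxConnections", "4")           // fewer parallel connections to the backend
  props.setProperty("RateLimit", "50")               // per-shard put rate, as % of the backend limit
  props.setProperty("RecordMaxBufferedTime", "1000") // ms a record may be buffered before sending
  props.setProperty("RecordTtl", "60000")            // ms before an unsent record is failed
  props.setProperty("RequestTimeout", "20000")       // ms allowed for a single HTTP request
  props
}

val base = new Properties()
base.setProperty("AggregationMaxCount", "100")
val kplConfig = throttledKplConfig(base)
```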
[GitHub] flink issue #4547: [FLINK-7454][docs] update 'Monitoring Current Event Time'...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4547 @zentol does it look good to you? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 Can anyone from data Artisans take a look at this PR, please?
[GitHub] flink issue #4522: [FLINK-7366][kinesis connector] Upgrade kinesis producer ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4522 Can anyone from data Artisans take a look at this PR, please?
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144771#comment-16144771 ] ASF GitHub Bot commented on FLINK-7206: --- Github user kaibozhou commented on the issue: https://github.com/apache/flink/pull/4355 All comments addressed. Do you have time to merge it @wuchong @fhueske ? Thanks. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...
Github user kaibozhou commented on the issue: https://github.com/apache/flink/pull/4355 All comments addressed. Do you have time to merge it @wuchong @fhueske ? Thanks.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135697894 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * if it is backed by a state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + * public ListView<String> list; + * public long count; + * } + * + * public class MyAgg extends AggregateFunction<Long, MyAccum> { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T]( +@transient private[flink] val elementTypeInfo: TypeInformation[T]) + extends DataView { + + def this() = this(null) + + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff -- The same modifications should also be applied to the MapView. Very good suggestion.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144769#comment-16144769 ] ASF GitHub Bot commented on FLINK-7206: --- Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135697894 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff -- The same modifications should also be applied to the MapView. Very good suggestion.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144662#comment-16144662 ] Jark Wu commented on FLINK-7446: [~fhueske] Yes, I created FLINK-7548 to discuss this. > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently {{DefinedRowtimeAttribute}} can only define an appended rowtime field > for a {{TableSource}}. It would be helpful to also support defining an > existing field as the rowtime field. Just as when registering a DataStream, the > rowtime field could then either be appended or replace an existing field.
[jira] [Created] (FLINK-7548) Support watermark generation for TableSource
Jark Wu created FLINK-7548: -- Summary: Support watermark generation for TableSource Key: FLINK-7548 URL: https://issues.apache.org/jira/browse/FLINK-7548 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Jark Wu As discussed in FLINK-7446, currently a TableSource only supports defining a rowtime field, but not extracting watermarks from that field. We can provide a new interface called {{DefinedWatermark}}, which has two methods: {{getRowtimeAttribute}} (which can only be an existing field) and {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} will be marked as deprecated. How to support periodic and punctuated watermarks, as well as some built-in strategies, needs further discussion.
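The proposed interface could take roughly the following shape. This is a hypothetical sketch only: the issue names `DefinedWatermark`, `getRowtimeAttribute` and `getWatermarkGenerator` but deliberately leaves the generator type open, so the `WatermarkGenerator` trait, `BoundedDelayGenerator` (loosely modeled on the DataStream API's bounded-out-of-orderness idea) and `OrdersSource` are all illustrative assumptions, written without Flink dependencies.

```scala
// Hypothetical generator contract; the issue leaves its real shape to further discussion.
trait WatermarkGenerator {
  /** Given the rowtime of the latest row, optionally returns a watermark to emit. */
  def nextWatermark(rowtime: Long): Option[Long]
}

// Hypothetical interface following the two methods named in FLINK-7548.
trait DefinedWatermark {
  /** Name of an existing field of the TableSource schema that holds the rowtime. */
  def getRowtimeAttribute: String
  def getWatermarkGenerator: WatermarkGenerator
}

// Illustrative strategy: trail the maximum rowtime seen so far by a fixed delay.
final class BoundedDelayGenerator(maxDelay: Long) extends WatermarkGenerator {
  private var maxSeen = Long.MinValue
  def nextWatermark(rowtime: Long): Option[Long] =
    if (rowtime > maxSeen) { maxSeen = rowtime; Some(maxSeen - maxDelay) }
    else None // rowtime did not advance, so the watermark does not either
}

// Illustrative source whose rowtime must be an existing field, as the issue requires.
class OrdersSource(fieldNames: Seq[String]) extends DefinedWatermark {
  require(fieldNames.contains("orderTime"), "rowtime must be an existing field")
  def getRowtimeAttribute: String = "orderTime"
  def getWatermarkGenerator: WatermarkGenerator = new BoundedDelayGenerator(5000L)
}
```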
[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala
[ https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144622#comment-16144622 ] ASF GitHub Bot commented on FLINK-7438: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4570 lgtm > Some classes are eclipsed by classes in package scala > - > > Key: FLINK-7438 > URL: https://issues.apache.org/jira/browse/FLINK-7438 > Project: Flink > Issue Type: Bug > Components: Build System, DataStream API >Reporter: Ted Yu >Priority: Minor > > Noticed the following during compilation: > {code} > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > object OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > [WARNING] ^ > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > class OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > {code} > We should avoid the warning w.r.t. OutputTag. > There may be other occurrences of similar warnings.
[GitHub] flink issue #4570: [FLINK-7438][DataStream API]Remove useless import, avoid ...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4570 lgtm
[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
[ https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7488: -- Description: {code} compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest) Time elapsed: 0.239 sec <<< FAILURE! org.junit.ComparisonFailure: Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81) compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest) Time elapsed: 0.16 sec <<< FAILURE! 
org.junit.ComparisonFailure: Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110) {code} $HADOOP_CONF_DIR was not set prior to running the test.
[jira] [Created] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable
Elias Levy created FLINK-7547: - Summary: o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable Key: FLINK-7547 URL: https://issues.apache.org/jira/browse/FLINK-7547 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor {{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared {{Serializable}}, whereas {{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is. This prevents the job from starting, because the async function can't be serialized during initialization.
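The failure mode can be reproduced outside Flink with plain Java serialization, which Flink relies on when shipping user functions. The two traits below are hypothetical, simplified stand-ins for the Scala and Java `AsyncFunction` declarations (the real signatures take a result collector/future, which is omitted here); the only difference that matters is whether the trait extends `Serializable`, since implementations inherit that marker from it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Simplified stand-in for a trait that is NOT declared Serializable.
trait PlainAsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN): OUT
}

// Simplified stand-in for a trait that IS declared Serializable.
trait SerializableAsyncFunction[IN, OUT] extends Serializable {
  def asyncInvoke(input: IN): OUT
}

class UpperCasePlain extends PlainAsyncFunction[String, String] {
  def asyncInvoke(input: String): String = input.toUpperCase
}

class UpperCaseSerializable extends SerializableAsyncFunction[String, String] {
  def asyncInvoke(input: String): String = input.toUpperCase
}

/** Attempts Java serialization; returns false if the object is not Serializable. */
def javaSerializable(obj: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj)
    out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }
```

Declaring `Serializable` on the trait itself, as the Java interface does, spares every user implementation from having to add it.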
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1610#comment-1610 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r13411 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff -- We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
    @transient private[flink] val elementTypeInfo: TypeInformation[T],
    private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) {
    this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
    this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListViewSerializer` with `null` for the type information.
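The constructor split suggested above can be exercised outside Flink with the following sketch. `SketchListView` and `fromSerializer` are hypothetical names, and a plain `Option[String]` stands in for `TypeInformation[T]`; in the actual suggestion the serializer would call the `private[flink]` primary constructor directly instead of going through a companion factory. The point it shows is that user-facing constructors allocate a fresh `ArrayList`, while the serializer path hands over the list it has already built.

```scala
import java.util

// Hypothetical, Flink-free stand-in for ListView; `elementType` replaces
// TypeInformation[T] so that the sketch runs without Flink on the classpath.
class SketchListView[T] private (
    val elementType: Option[String],
    backing: util.List[T]) {

  // For user code: allocates a fresh, empty ArrayList.
  def this(elementType: String) = this(Some(elementType), new util.ArrayList[T]())

  // Mirrors ListView's no-arg constructor.
  def this() = this(None, new util.ArrayList[T]())

  def add(value: T): Unit = { backing.add(value); () }

  def get: java.lang.Iterable[T] = backing
}

object SketchListView {
  // What a serializer's deserialize()/copy() could do: wrap the list it has
  // already filled, so no throw-away ArrayList is allocated.
  def fromSerializer[T](list: util.List[T]): SketchListView[T] =
    new SketchListView[T](None, list)
}
```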
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1613#comment-1613 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135550350

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+ * ListView provides List functionality for accumulators used by user-defined aggregate functions
+ * {{AggregateFunction}}.
+ *
+ * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+ * which the function is used.
+ *
+ * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+ * if it is backed by a state backend.
+ *
+ * Example:
+ * {{{
+ *
+ *  public class MyAccum {
+ *    public ListView list;
+ *    public long count;
+ *  }
+ *
+ *  public class MyAgg extends AggregateFunction {
+ *
+ *    @Override
+ *    public MyAccum createAccumulator() {
+ *      MyAccum accum = new MyAccum();
+ *      accum.list = new ListView<>(Types.STRING);
+ *      accum.count = 0L;
+ *      return accum;
+ *    }
+ *
+ *    public void accumulate(MyAccum accumulator, String id) {
+ *      accumulator.list.add(id);
+ *      ... ...
+ *      accumulator.get()
+ *      ... ...
+ *    }
+ *
+ *    @Override
+ *    public Long getValue(MyAccum accumulator) {
+ *      accumulator.list.add(id);
+ *      ... ...
+ *      accumulator.get()
+ *      ... ...
+ *      return accumulator.count;
+ *    }
+ *  }
+ *
+ * }}}
+ *
+ * @param elementTypeInfo element type information
+ * @tparam T element type
+ */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

Right now, an empty `ArrayList` is always created when a `ListView` is instantiated. This is unnecessary overhead when the `ListView` is copied or deserialized by the `ListViewSerializer`, because the empty instance is immediately replaced. We should add an option to create a `ListView` without an `ArrayList` instance, which means moving the creation of the `ArrayList` out of the primary constructor. The same applies to `MapView`.

> Implementation of DataView to support state access for UDAGG
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
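The overhead described above can be illustrated with a minimal Java sketch. `SketchListView` and `fromDeserialized` are hypothetical names, not Flink API: a non-public constructor accepts an existing backing list, so a copy or deserialization path can hand over the list it has already built instead of replacing a freshly allocated empty one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a list-backed data view; not Flink's ListView.
class SketchListView<T> {
    // Backing list, supplied by the caller instead of always being allocated.
    final List<T> list;

    // "Primary" constructor: wraps an existing list without allocating a new one.
    private SketchListView(List<T> list) {
        this.list = list;
    }

    // Constructor for user code: starts with an empty ArrayList.
    SketchListView() {
        this(new ArrayList<>());
    }

    // Factory for serializer/copy paths that already hold a populated list.
    static <T> SketchListView<T> fromDeserialized(List<T> deserialized) {
        return new SketchListView<>(deserialized);
    }

    void add(T value) {
        list.add(value);
    }

    Iterable<T> get() {
        return list;
    }
}
```

With this split, only user code pays for the empty `ArrayList`; a deserializer calls `fromDeserialized` once with the list it just read.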
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1618#comment-1618 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135524948

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -42,6 +46,18 @@ class AggregationCodeGenerator(
     input: TypeInformation[_ <: Any])
   extends CodeGenerator(config, nullableInput, input) {
 
+  // set of statements for cleanup dataview that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
+
+  /**
+    * @return code block of statements that need to be placed in the cleanup() method of
--- End diff --

`RichFunction` does not have a `cleanup()` method. The `cleanup()` method is a method of `GeneratedAggregations`.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144439#comment-16144439 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528903

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
       }
     }
+  /**
+    * Create DataView Term, for example, acc1_map_dataview.
+    *
+    * @param aggIndex index of aggregate function
+    * @param fieldName field name of DataView
+    * @return term to access [[MapView]] or [[ListView]]
+    */
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+    s"acc${aggIndex}_${fieldName}_dataview"
+  }
+
+  /**
+    * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+    * close and member area of the generated function.
+    *
+    */
+  def addReusableDataViews: Unit = {
+    if (accConfig.isDefined) {
+      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+        .toMap[String, StateDescriptor[_, _]]
+
+      for (i <- aggs.indices) yield {
+        for (spec <- accConfig.get(i)) yield {
+          val dataViewField = spec.field
+          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+          val desc = descMapping.getOrElse(spec.id,
+            throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+          // define the DataView variables
+          val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Move the `serialize` method to this class and rename it to `serializeStateDescriptor`.
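A minimal Java sketch of what a `serializeStateDescriptor` helper could look like, assuming the descriptor is Java-serialized and Base64-encoded so that it can be embedded as a string literal in generated source. That encoding is an assumption: the diff does not show what `AggregateUtil.serialize` returns. `DescriptorCodec` and `literalFor` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical helper, not Flink API.
final class DescriptorCodec {
    private DescriptorCodec() {}

    // Java-serializes any Serializable object (e.g. a state descriptor) and
    // Base64-encodes the bytes; the Base64 alphabet is safe inside a Java string literal.
    static String serializeStateDescriptor(Serializable descriptor) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(descriptor);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Renders a Java statement a code generator could splice into generated code.
    static String literalFor(String varName, Serializable descriptor) {
        return "String " + varName + " = \"" + serializeStateDescriptor(descriptor) + "\";";
    }
}
```

Keeping this helper next to the code generator, as the review suggests, puts the encoding and the generated decoding code in one place.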
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1614#comment-1614 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135648347

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -307,6 +312,92 @@ object UserDefinedFunctionUtils {
   // --
   /**
+    * Remove StateView fields from accumulator type information.
+    *
+    * @param index index of aggregate function
+    * @param aggFun aggregate function
+    * @param accType accumulator type information, only support pojo type
+    * @param isStateBackedDataViews is data views use state backend
+    * @return mapping of accumulator type information and data view config which contains id,
+    *         field name and state descriptor
+    */
+  def removeStateViewFieldsFromAccTypeInfo(
+    index: Int,
+    aggFun: AggregateFunction[_, _],
+    accType: TypeInformation[_],
+    isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+    var hasDataView = false
+    val acc = aggFun.createAccumulator()
+    accType match {
+      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+        val arity = pojoType.getArity
+        val newPojoFields = new util.ArrayList[PojoField]()
+        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+        for (i <- 0 until arity) {
+          val pojoField = pojoType.getPojoFieldAt(i)
+          val field = pojoField.getField
+          val fieldName = field.getName
+          field.setAccessible(true)
+
+          pojoField.getTypeInformation match {
+            case map: MapViewTypeInfo[Any, Any] =>
+              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+              if (mapView != null) {
+                val keyTypeInfo = mapView.keyTypeInfo
+                val valueTypeInfo = mapView.valueTypeInfo
+                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
+                  hasDataView = true
+                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+                } else {
+                  map
+                }
+
+                var spec = MapViewSpec(
+                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+                  field,
+                  newTypeInfo)
+
+                accumulatorSpecs += spec
+                if (!isStateBackedDataViews) { // add data view field which not use state backend
+                  newPojoFields.add(new PojoField(field, newTypeInfo))
+                }
+              }
+
+            case list: ListViewTypeInfo[Any] =>
+              val listView = field.get(acc).asInstanceOf[ListView[_]]
+              if (listView != null) {
+                val elementTypeInfo = listView.elementTypeInfo
+                val newTypeInfo = if (elementTypeInfo != null) {
+                  hasDataView = true
+                  new ListViewTypeInfo(elementTypeInfo)
+                } else {
+                  list
+                }
+
+                var spec = ListViewSpec(
+                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+                  field,
+                  newTypeInfo)
+
+                accumulatorSpecs += spec
+                if (!isStateBackedDataViews) { // add data view field which not use state backend
+                  newPojoFields.add(new PojoField(field, newTypeInfo))
+                }
+              }
+
+            case _ => newPojoFields.add(pojoField)
+          }
+        }
+        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
+
+      case _ if !hasDataView => (accType, None)
+      case _ => throw new TableException("MapView and ListView only support in PoJo class")
--- End diff --

This case will never be reached. `hasDataView` is only set to `true` in the `case pojoType: PojoTypeInfo[_]` case. Hence, it will always be `false` when we come to this point.
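One possible way to make the non-POJO error branch meaningful is to inspect the accumulator instance itself instead of relying on a flag that is only set inside the POJO branch. The Java sketch below is illustrative only and is not what the PR does: `SketchDataView` is a hypothetical marker standing in for Flink's `DataView`, and `AccumulatorCheck` with its methods does not exist in the PR.

```java
import java.lang.reflect.Field;

// Hypothetical marker interface standing in for Flink's DataView.
interface SketchDataView {}

final class AccumulatorCheck {
    private AccumulatorCheck() {}

    // Returns true if any declared field of the accumulator holds a SketchDataView.
    // The check runs on the instance, so it works for any accumulator type.
    static boolean containsDataView(Object accumulator) {
        for (Field f : accumulator.getClass().getDeclaredFields()) {
            try {
                f.setAccessible(true);
                if (f.get(accumulator) instanceof SketchDataView) {
                    return true;
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot inspect field " + f.getName(), e);
            }
        }
        return false;
    }

    // Rejects data views in accumulators whose type information is not a POJO type.
    static void validate(boolean isPojo, Object accumulator) {
        if (!isPojo && containsDataView(accumulator)) {
            throw new IllegalStateException(
                "MapView and ListView are only supported in POJO accumulators");
        }
    }
}
```

Because the decision no longer depends on mutable state from another `match` branch, the error path becomes reachable for non-POJO accumulators that contain views.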
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1615#comment-1615 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135534660

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
       }
     }
+  /**
+    * Create DataView Term, for example, acc1_map_dataview.
+    *
+    * @param aggIndex index of aggregate function
+    * @param fieldName field name of DataView
+    * @return term to access [[MapView]] or [[ListView]]
+    */
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+    s"acc${aggIndex}_${fieldName}_dataview"
+  }
+
+  /**
+    * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+    * close and member area of the generated function.
+    *
+    */
+  def addReusableDataViews: Unit = {
--- End diff --

Add parentheses to the method. Only methods without side effects should be declared without parentheses.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1617#comment-1617 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135541895

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+ * A serializer for [[ListView]]. The serializer relies on an element
+ * serializer for the serialization of the list's elements.
+ *
+ * The serialization format for the list is as follows: four bytes for the length of the list,
+ * followed by the serialized representation of each element.
+ *
+ * @param listSerializer List serializer.
+ * @tparam T The type of element in the list.
+ */
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+    new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+    new ListView[T]
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+    val listview = new ListView[T]
+    listview.list = from.list
--- End diff --

We should create a copy of `from.list` using the `ListSerializer`. Otherwise we share the instance.
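The difference between sharing and copying the backing list can be shown with a small Java sketch. `CopyingListView` is hypothetical, and the `elementCopier` function plays the role that the review assigns to `ListSerializer` in Flink: producing a new list with each element copied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical simplified view; Flink's ListView and ListSerializer are not used here.
final class CopyingListView<T> {
    final List<T> list;

    CopyingListView(List<T> list) {
        this.list = list;
    }

    // Shallow "copy" as in the reviewed diff: both views share one list instance.
    static <T> CopyingListView<T> shallowCopy(CopyingListView<T> from) {
        return new CopyingListView<>(from.list);
    }

    // Deep copy in the spirit of the review: a fresh list, with every element
    // passed through an element copier.
    static <T> CopyingListView<T> deepCopy(CopyingListView<T> from, UnaryOperator<T> elementCopier) {
        List<T> copied = new ArrayList<>(from.list.size());
        for (T element : from.list) {
            copied.add(elementCopier.apply(element));
        }
        return new CopyingListView<>(copied);
    }
}
```

With the shallow version, any later `add` on the original is visible through the "copy", which breaks the independence that a serializer's `copy()` is expected to provide.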
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1619#comment-1619 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135539685

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -249,7 +258,8 @@ object AggregateUtil {
       outputArity,
       needRetract,
       needMerge = false,
-      needReset = true
+      needReset = true,
--- End diff --

`needReset` can be `false`. `resetAccumulator()` is not called by any of the window operators. Not sure why this was `true` before...
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1611#comment-1611 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528733

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
       }
     }
+  /**
+    * Create DataView Term, for example, acc1_map_dataview.
+    *
+    * @param aggIndex index of aggregate function
+    * @param fieldName field name of DataView
+    * @return term to access [[MapView]] or [[ListView]]
+    */
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+    s"acc${aggIndex}_${fieldName}_dataview"
+  }
+
+  /**
+    * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+    * close and member area of the generated function.
+    *
+    */
+  def addReusableDataViews: Unit = {
+    if (accConfig.isDefined) {
+      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+        .toMap[String, StateDescriptor[_, _]]
+
+      for (i <- aggs.indices) yield {
+        for (spec <- accConfig.get(i)) yield {
+          val dataViewField = spec.field
+          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+          val desc = descMapping.getOrElse(spec.id,
+            throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+          // define the DataView variables
+          val serializedData = AggregateUtil.serialize(desc)
+          val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName)
+          val field =
+            s"""
+               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+               """.stripMargin
+          reusableMemberStatements.add(field)
+
+          // create DataViews
+          val descFieldTerm = s"${dataViewFieldTerm}_desc"
+          val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+          val descDeserializeCode =
+            s"""
+               |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+               |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
--- End diff --

Implement the deserialization directly in the generated code. Moreover, we should use the user-code classloader for the deserialization, which is accessible via the `RuntimeContext`.
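A hedged Java sketch of deserializing with an explicitly supplied class loader. In Flink the loader would come from `RuntimeContext#getUserCodeClassLoader()`; here it is simply a parameter, and `ClassLoaderAwareDeserializer` is a hypothetical helper, not Flink API. The explicit loader matters because user-defined types referenced by a state descriptor may only be visible to the user-code class loader, not to the loader that `ObjectInputStream` would pick by default.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical helper, not Flink API.
final class ClassLoaderAwareDeserializer {
    private ClassLoaderAwareDeserializer() {}

    // ObjectInputStream that resolves classes through the given loader first.
    private static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to default resolution (covers primitive type names).
                return super.resolveClass(desc);
            }
        }
    }

    // Decodes a Base64 string (as it might be embedded in generated code) and deserializes it.
    static Object deserialize(String base64, ClassLoader loader) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        try (ObjectInputStream in =
                 new LoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize state descriptor", e);
        }
    }

    // Encoding side, included only so the sketch can be exercised end to end.
    static String serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }
}
```

Generated code would then call such a routine from its `open()` logic, where the runtime context and therefore the user-code class loader are available.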
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1612#comment-1612 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135561193

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
@@ -237,9 +248,13 @@ public void resetAccumulator(CountDistinctAccum acc) {
         //Overloaded retract method
         public void retract(CountDistinctAccum accumulator, long id) {
             try {
-                if (!accumulator.map.contains(String.valueOf(id))) {
-                    accumulator.map.remove(String.valueOf(id));
-                    accumulator.count -= 1;
+                Integer cnt = accumulator.map.get(String.valueOf(id));
+                if (cnt != null) {
+                    cnt -= 1;
+                    if (cnt <= 0) {
+                        accumulator.map.remove(String.valueOf(id));
+                        accumulator.count -= 1;
+                    }
--- End diff --

We should write the updated count back to the map if it is still > 0:

```
else {
    accumulator.map.put(String.valueOf(id), cnt);
}
```
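The write-back is needed because `Integer` is immutable: `cnt -= 1` only rebinds the local variable, so the map keeps the old count. The Java sketch below uses a plain `HashMap` instead of the PR's `MapView`, and its `accumulate` method is an assumption about how occurrences are counted (the diff only shows `retract`). `CountDistinctSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified accumulator; a HashMap stands in for the MapView.
final class CountDistinctSketch {
    // value -> number of times it is currently present
    final Map<String, Integer> map = new HashMap<>();
    // number of distinct values currently present
    long count = 0;

    void accumulate(long id) {
        String key = String.valueOf(id);
        Integer cnt = map.get(key);
        if (cnt == null) {
            map.put(key, 1);
            count += 1;
        } else {
            map.put(key, cnt + 1);
        }
    }

    void retract(long id) {
        String key = String.valueOf(id);
        Integer cnt = map.get(key);
        if (cnt != null) {
            cnt -= 1;
            if (cnt <= 0) {
                map.remove(key);
                count -= 1;
            } else {
                // The fix from the review: store the decremented counter.
                map.put(key, cnt);
            }
        }
    }
}
```

Without the `else` branch, retracting one of two occurrences leaves the stored counter at 2, so a later retraction never drops the distinct count.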
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1616#comment-1616 ]

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135587110

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -1398,14 +1412,29 @@ object AggregateUtil {
       }
     }
+    val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
     // create accumulator type information for every aggregate function
     aggregates.zipWithIndex.foreach { case (agg, index) =>
-      if (null == accTypes(index)) {
+      if (accTypes(index) != null) {
+        val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+          agg,
+          accTypes(index),
+          isStateBackedDataViews)
+        if (specs.isDefined) {
+          accSpecs(index) = specs.get
+          accTypes(index) = accType
+        } else {
+          accSpecs(index) = Seq()
+          accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

No need to override `accTypes(index)`
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r13411 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * if it is backed by a state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T]( +@transient private[flink] val elementTypeInfo: TypeInformation[T]) + extends DataView { + + def this() = this(null) + + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff --

We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
    @transient private[flink] val elementTypeInfo: TypeInformation[T],
    private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) {
    this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
    this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListSerializer` with `null` for the type information.
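For comparison, the same constructor layout written in Java might look like the sketch below. `SketchListView` is a hypothetical, simplified stand-in for `ListView`, and `Class<T>` replaces `TypeInformation<T>` so that the snippet compiles on its own. All public constructors delegate to a single package-private primary constructor (playing the role of Scala's `private[flink]`), which a serializer can call with `null` type information and an already deserialized list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for ListView; Class<T> replaces TypeInformation<T>. */
class SketchListView<T> {
    // Element type; may be null when the view is created by a serializer (mirrors @transient in Scala).
    final transient Class<T> elementType;
    // Backing list; a serializer can hand in the list it has just deserialized.
    final List<T> list;

    /** Primary constructor, package-private like Scala's private[flink]. */
    SketchListView(Class<T> elementType, List<T> list) {
        this.elementType = elementType;
        this.list = list;
    }

    /** Public constructor for users who know the element type. */
    public SketchListView(Class<T> elementType) {
        this(elementType, new ArrayList<>());
    }

    /** Public no-arg constructor, e.g. for POJO instantiation. */
    public SketchListView() {
        this(null, new ArrayList<>());
    }

    public void add(T value) {
        list.add(value);
    }

    public Iterable<T> get() {
        return list;
    }
}
```

A serializer would call `new SketchListView<>(null, restoredList)`, while user code only ever sees the two public constructors.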
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135539685 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -249,7 +258,8 @@ object AggregateUtil { outputArity, needRetract, needMerge = false, - needReset = true + needReset = true, --- End diff -- `needReset` can be `false`. `resetAccumulator()` is not called by any of the window operators. Not sure why this was `true` before...
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135528733 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,119 @@ class AggregationCodeGenerator( } } +/** + * Create DataView Term, for example, acc1_map_dataview. + * + * @param aggIndex index of aggregate function + * @param fieldName field name of DataView + * @return term to access [[MapView]] or [[ListView]] + */ +def createDataViewTerm(aggIndex: Int, fieldName: String): String = { + s"acc${aggIndex}_${fieldName}_dataview" +} + +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig.isDefined) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.get(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) +val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName) +val field = + s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + """.stripMargin +reusableMemberStatements.add(field) + +// create DataViews +val descFieldTerm = s"${dataViewFieldTerm}_desc" +val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName +val descDeserializeCode = + s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} --- End diff -- implement deserialization directly in generated code. 
Moreover, we should use the user code classloader for the deserialization which is accessible via the `RuntimeContext`.
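On the deserialization side, the key point is resolving classes through an explicit class loader rather than the loader of the framework classes. The sketch below assumes that the descriptor was Java-serialized and Base64-encoded with the URL-safe alphabet; in a Flink function the loader would presumably come from `getRuntimeContext().getUserCodeClassLoader()`. Flink's `org.apache.flink.util.InstantiationUtil.deserializeObject(byte[], ClassLoader)` addresses the same need and may be the simpler call to emit from generated code. The class `ClassLoaderAwareDeserializer` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Base64;

/** Hypothetical helper: decodes Base64 and resolves classes through a given class loader. */
final class ClassLoaderAwareDeserializer {
    private ClassLoaderAwareDeserializer() {}

    /** ObjectInputStream that looks up classes in an explicit (e.g. user-code) class loader. */
    private static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in); // reads the stream header only; resolveClass is not called yet
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types such as "int".
                return super.resolveClass(desc);
            }
        }
    }

    /** Decodes the Base64 string and deserializes the object with the given class loader. */
    static Object deserialize(String encoded, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getUrlDecoder().decode(encoded);
        try (ObjectInputStream in =
                new LoaderObjectInputStream(new ByteArrayInputStream(bytes), userCodeClassLoader)) {
            return in.readObject();
        }
    }
}
```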
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135648347 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,92 @@ object UserDefinedFunctionUtils { // -- /** +* Remove StateView fields from accumulator type information. +* +* @param index index of aggregate function +* @param aggFun aggregate function +* @param accType accumulator type information, only support pojo type +* @param isStateBackedDataViews is data views use state backend +* @return mapping of accumulator type information and data view config which contains id, +* field name and state descriptor +*/ + def removeStateViewFieldsFromAccTypeInfo( +index: Int, +aggFun: AggregateFunction[_, _], +accType: TypeInformation[_], +isStateBackedDataViews: Boolean) + : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = { + +var hasDataView = false +val acc = aggFun.createAccumulator() +accType match { + case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 => +val arity = pojoType.getArity +val newPojoFields = new util.ArrayList[PojoField]() +val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]] +for (i <- 0 until arity) { + val pojoField = pojoType.getPojoFieldAt(i) + val field = pojoField.getField + val fieldName = field.getName + field.setAccessible(true) + + pojoField.getTypeInformation match { +case map: MapViewTypeInfo[Any, Any] => + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { +val keyTypeInfo = mapView.keyTypeInfo +val valueTypeInfo = mapView.valueTypeInfo +val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) { + hasDataView = true + new MapViewTypeInfo(keyTypeInfo, valueTypeInfo) +} else { + map +} + +var spec = MapViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + +accumulatorSpecs += spec +if 
(!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) +} + } + +case list: ListViewTypeInfo[Any] => + val listView = field.get(acc).asInstanceOf[ListView[_]] + if (listView != null) { +val elementTypeInfo = listView.elementTypeInfo +val newTypeInfo = if (elementTypeInfo != null) { + hasDataView = true + new ListViewTypeInfo(elementTypeInfo) +} else { + list +} + +var spec = ListViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + +accumulatorSpecs += spec +if (!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) +} + } + +case _ => newPojoFields.add(pojoField) + } +} +(new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs)) + + case _ if !hasDataView => (accType, None) + case _ => throw new TableException("MapView and ListView only support in PoJo class") --- End diff -- This case will never be reached. `hasDataView` is only set to `true` in the `case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we come to this point.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135587110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1398,14 +1412,29 @@ object AggregateUtil { } } +val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + // create accumulator type information for every aggregate function aggregates.zipWithIndex.foreach { case (agg, index) => - if (null == accTypes(index)) { + if (accTypes(index) != null) { +val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index, + agg, + accTypes(index), + isStateBackedDataViews) +if (specs.isDefined) { + accSpecs(index) = specs.get + accTypes(index) = accType +} else { + accSpecs(index) = Seq() + accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg) --- End diff -- No need to overwrite `accTypes(index)` here.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135561193 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -237,9 +248,13 @@ public void resetAccumulator(CountDistinctAccum acc) { //Overloaded retract method public void retract(CountDistinctAccum accumulator, long id) { try { - if (!accumulator.map.contains(String.valueOf(id))) { - accumulator.map.remove(String.valueOf(id)); - accumulator.count -= 1; + Integer cnt = accumulator.map.get(String.valueOf(id)); + if (cnt != null) { + cnt -= 1; + if (cnt <= 0) { + accumulator.map.remove(String.valueOf(id)); + accumulator.count -= 1; + } --- End diff --

We should write the decremented count back to the map if it is still > 0:

```
else {
  accumulator.map.put(String.valueOf(id), cnt);
}
```
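To see why the `else` branch matters: without it, the decremented count is computed but never written back, so an id that was accumulated twice still has a count of 2 after one retraction and is never removed. The following minimal sketch shows the complete accumulate/retract bookkeeping with the suggested fix. It is a simplified, hypothetical version of the test function: a `HashMap` stands in for the `MapView` (whose methods may throw checked exceptions in Flink, hence the `try` block in the original), and the names `DistinctAccum` and `CountDistinctSketch` are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical accumulator: a HashMap replaces the MapView. */
class DistinctAccum {
    final Map<String, Integer> map = new HashMap<>(); // id -> number of occurrences
    long count = 0L;                                  // number of distinct ids
}

/** Hypothetical count-distinct aggregate showing the accumulate/retract bookkeeping. */
class CountDistinctSketch {
    void accumulate(DistinctAccum acc, long id) {
        String key = String.valueOf(id);
        Integer cnt = acc.map.get(key);
        if (cnt == null) {
            acc.map.put(key, 1);
            acc.count += 1; // first occurrence of this id
        } else {
            acc.map.put(key, cnt + 1);
        }
    }

    void retract(DistinctAccum acc, long id) {
        String key = String.valueOf(id);
        Integer cnt = acc.map.get(key);
        if (cnt != null) {
            cnt -= 1;
            if (cnt <= 0) {
                acc.map.remove(key);
                acc.count -= 1; // last occurrence of this id was retracted
            } else {
                acc.map.put(key, cnt); // the fix from the review: persist the decremented count
            }
        }
    }
}
```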
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135524948 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -42,6 +46,18 @@ class AggregationCodeGenerator( input: TypeInformation[_ <: Any]) extends CodeGenerator(config, nullableInput, input) { + // set of statements for cleanup dataview that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableCleanupStatements = mutable.LinkedHashSet[String]() + + /** +* @return code block of statements that need to be placed in the cleanup() method of --- End diff -- `RichFunction` does not have a `cleanup()` method. The `cleanup()` method is a method of `GeneratedAggregations`.
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144282#comment-16144282 ] Shuyi Chen commented on FLINK-7491: --- Hi [~jark], thanks for the response. However, I am worried that with Array as the runtime type, multiset-specific operations will be slow. For example, the MEMBER OF operator is O(1) for a multiset data structure but O(n) for an array, and the SUBMULTISET OF operator is O(m+n) for an array but O(m) for a multiset when testing whether M is a submultiset of N. Also, the actual type I am using is HashMultiset, which is backed by a Java HashMap and should, I think, perform reasonably well. > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
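The complexity argument above can be illustrated with a minimal hash-backed multiset that stores each element together with its multiplicity. This is a hypothetical sketch, neither Guava's `HashMultiset` nor Flink's runtime representation: MEMBER OF becomes a single expected O(1) hash lookup, and testing whether M is a submultiset of N only iterates over the distinct elements of M, each with one expected O(1) lookup in N. On a plain array, MEMBER OF needs a linear scan instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical hash-backed multiset: element -> multiplicity. */
final class CountingMultiset<T> {
    private final Map<T, Integer> counts = new HashMap<>();

    void add(T element) {
        counts.merge(element, 1, Integer::sum);
    }

    /** Multiplicity of the element, 0 if absent. */
    int count(Object element) {
        return counts.getOrDefault(element, 0);
    }

    /** MEMBER OF: one expected O(1) hash lookup. */
    boolean contains(Object element) {
        return counts.containsKey(element);
    }

    /** SUBMULTISET OF: every element of this multiset occurs at least as often in {@code other}. */
    boolean isSubmultisetOf(CountingMultiset<?> other) {
        for (Map.Entry<T, Integer> e : counts.entrySet()) {
            if (other.count(e.getKey()) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```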
[jira] [Created] (FLINK-7546) Support SUBMULTISET_OF Operator for Multiset SQL type
Shuyi Chen created FLINK-7546: - Summary: Support SUBMULTISET_OF Operator for Multiset SQL type Key: FLINK-7546 URL: https://issues.apache.org/jira/browse/FLINK-7546 Project: Flink Issue Type: New Feature Reporter: Shuyi Chen Assignee: Shuyi Chen
[jira] [Created] (FLINK-7545) Support MEMBER OF Operator for Multiset SQL type
Shuyi Chen created FLINK-7545: - Summary: Support MEMBER OF Operator for Multiset SQL type Key: FLINK-7545 URL: https://issues.apache.org/jira/browse/FLINK-7545 Project: Flink Issue Type: New Feature Reporter: Shuyi Chen Assignee: Shuyi Chen
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144102#comment-16144102 ] Xingcan Cui commented on FLINK-6233: Hi [~fhueske], the [document|https://goo.gl/VW5Gpd] has been roughly finished. I hope it helps. :) > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-joins on row-time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only supports inner joins. > * The ON clause should include an equi-join condition. > * The time condition, e.g. {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}, can only use rowtime, which is a system attribute. It only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded ones like {{o.rowtime < s.rowtime}}, and it must reference the rowtime attributes of both streams; {{o.rowtime between rowtime () and rowtime () + 1}} should also not be supported. > A row-time stream join will not be able to handle late data, because inserting a row into a sorted order would shift all other computations. This would be too expensive to maintain. Therefore, we will throw an error if a user tries to use a row-time stream join with late data handling.
> This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin).
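The bounded time condition can be made concrete with a small sketch. `IntervalJoinSketch` is hypothetical and shows only the join semantics (a naive nested loop over finite inputs), not the streaming operator proposed in the issue. For the example query, `lowerMs = 0` and `upperMs` is one hour. The `canEvictShipment` helper illustrates why the range must be bounded: assuming Flink's convention that no element with a timestamp at or below the current watermark arrives later (i.e., ignoring late data, as the issue does), a buffered shipment can no longer match any future order once the watermark reaches `s.rowtime + upperMs`, so its state can be dropped. With an unbounded condition such as `o.rowtime < s.rowtime`, a buffered order could match arbitrarily late shipments, so its state could never be dropped.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the bounded row-time equi-join predicate (batch semantics only). */
final class IntervalJoinSketch {
    /** A row with an equi-join key and a row-time timestamp in milliseconds. */
    static final class Row {
        final String key;
        final long rowtime;

        Row(String key, long rowtime) {
            this.key = key;
            this.rowtime = rowtime;
        }
    }

    /** o.key = s.key AND o.rowtime BETWEEN s.rowtime + lowerMs AND s.rowtime + upperMs (inclusive). */
    static boolean matches(Row o, Row s, long lowerMs, long upperMs) {
        return o.key.equals(s.key)
                && o.rowtime >= s.rowtime + lowerMs
                && o.rowtime <= s.rowtime + upperMs;
    }

    /** Naive nested-loop join; returns index pairs {orderIndex, shipmentIndex} of matching rows. */
    static List<int[]> join(List<Row> orders, List<Row> shipments, long lowerMs, long upperMs) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < orders.size(); i++) {
            for (int j = 0; j < shipments.size(); j++) {
                if (matches(orders.get(i), shipments.get(j), lowerMs, upperMs)) {
                    result.add(new int[] {i, j});
                }
            }
        }
        return result;
    }

    /**
     * Future orders have rowtime > watermark; they can only match s if rowtime <= s.rowtime + upperMs.
     * Once watermark >= s.rowtime + upperMs, no future order can match, so s may be evicted.
     */
    static boolean canEvictShipment(Row s, long watermark, long upperMs) {
        return watermark >= s.rowtime + upperMs;
    }
}
```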
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143938#comment-16143938 ] ASF GitHub Bot commented on FLINK-7452: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4612 CC @alpinegizmo > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually when type extraction fails or is not available. {{TypeHint}}s should be the preferred way, but these methods can ensure correct types. > I propose to add all built-in Flink types to {{Types}}, such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested type and, especially in the case of POJOs, should help with creating a {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}.
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/4612 [FLINK-7452] [types] Add helper methods for all built-in Flink types ## What is the purpose of the change This PR provides a unified, detailed, and easy-to-use utility to work with Flink's built-in Java types. This class helps users to get an overview of built-in Flink types and their features/limitations (esp. regarding null support). ## Brief change log Improved `org.apache.flink.api.common.typeinfo.Types`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-7452 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4612.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4612 commit 2e027ed3c4671363f7023dd9faf7ba0d0c827312 Author: twalthr Date: 2017-08-28T12:13:07Z [FLINK-7452] [types] Add helper methods for all built-in Flink types to Types
[jira] [Commented] (FLINK-7543) Simplify REST parameter access.
[ https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143915#comment-16143915 ] ASF GitHub Bot commented on FLINK-7543: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4611 Changes LGTM @zentol ! When Travis gives a green light, feel free to merge! > Simplify REST parameter access. > --- > > Key: FLINK-7543 > URL: https://issues.apache.org/jira/browse/FLINK-7543 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > Currently you have to do: > {code} > final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); > JobID jobID = jobId.getValue(); > {code} > This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters) without exposing them to the user.
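The proposed simplification can be sketched as a generic accessor that looks up the parameter by its class, checks that it was resolved, and returns the contained value. All types here (`PathParam`, `JobIdParam`, `SketchHandlerRequest`) are hypothetical stand-ins, with a `String` in place of Flink's `JobID`; Flink's actual `HandlerRequest` may differ in names and details. Since the PR makes all path parameters mandatory, the sketch reports a missing value with an `IllegalStateException` instead of returning `null`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base class for a path parameter that carries a resolved value of type X. */
abstract class PathParam<X> {
    private X value;

    /** Parses the raw path segment into the typed value. */
    protected abstract X convert(String raw);

    final void resolve(String raw) {
        this.value = convert(raw);
    }

    final boolean isResolved() {
        return value != null;
    }

    final X getValue() {
        return value;
    }
}

/** Hypothetical job-id parameter; a String stands in for Flink's JobID. */
final class JobIdParam extends PathParam<String> {
    @Override
    protected String convert(String raw) {
        return raw;
    }
}

/** Hypothetical request: handlers receive the value directly, not the parameter object. */
final class SketchHandlerRequest {
    private final Map<Class<?>, PathParam<?>> pathParams = new HashMap<>();

    void addPathParameter(PathParam<?> param) {
        pathParams.put(param.getClass(), param);
    }

    <X, P extends PathParam<X>> X getPathParameter(Class<P> parameterClass) {
        PathParam<?> param = pathParams.get(parameterClass);
        // Path parameters are mandatory, so a missing or unresolved one is a programming error.
        if (param == null || !param.isResolved()) {
            throw new IllegalStateException("No value for path parameter " + parameterClass.getSimpleName());
        }
        return parameterClass.cast(param).getValue();
    }
}
```

With this shape, a handler needs a single line such as `String jobId = request.getPathParameter(JobIdParam.class);`, and the compiler infers the value type from the parameter class.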
[jira] [Commented] (FLINK-7310) always use HybridMemorySegment
[ https://issues.apache.org/jira/browse/FLINK-7310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143910#comment-16143910 ] ASF GitHub Bot commented on FLINK-7310: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4445 Thanks! I am currently trying to pinpoint what part of the code exactly suffers most from the regression. If that is for example specific to the microbenchmark, we can merge this without concern... > always use HybridMemorySegment > -- > > Key: FLINK-7310 > URL: https://issues.apache.org/jira/browse/FLINK-7310 > Project: Flink > Issue Type: Sub-task > Components: Core >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > For future changes to the network buffers (sending our own off-heap buffers > through to netty), we cannot use {{HeapMemorySegment}} anymore and need to > rely on {{HybridMemorySegment}} instead. > We should thus drop any code that loads the {{HeapMemorySegment}} (it is > still available if needed) in favour of the {{HybridMemorySegment}} which is > able to work on both heap and off-heap memory. > FYI: For the performance penalty of this change compared to using > {{HeapMemorySegment}} alone, see this interesting blog article (from 2015): > https://flink.apache.org/news/2015/09/16/off-heap-memory.html
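The idea of a single segment type that works on both heap and off-heap memory can be illustrated with `java.nio.ByteBuffer`, whose heap (`allocate`) and direct (`allocateDirect`) variants share one API. `SketchSegment` is hypothetical and deliberately simplified: Flink's `HybridMemorySegment` accesses memory through `sun.misc.Unsafe` with a base object and an address rather than through `ByteBuffer`, so this sketch says nothing about the performance characteristics discussed above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical segment that wraps either heap or off-heap (direct) memory behind one API. */
final class SketchSegment {
    private final ByteBuffer buffer;

    private SketchSegment(ByteBuffer buffer) {
        // Use the platform's native byte order for both kinds of memory.
        this.buffer = buffer.order(ByteOrder.nativeOrder());
    }

    /** Segment backed by a byte[] on the Java heap. */
    static SketchSegment onHeap(int size) {
        return new SketchSegment(ByteBuffer.allocate(size));
    }

    /** Segment backed by memory outside the Java heap. */
    static SketchSegment offHeap(int size) {
        return new SketchSegment(ByteBuffer.allocateDirect(size));
    }

    boolean isOffHeap() {
        return buffer.isDirect();
    }

    int size() {
        return buffer.capacity();
    }

    /** Absolute write; callers use the same method regardless of where the memory lives. */
    void putInt(int index, int value) {
        buffer.putInt(index, value);
    }

    int getInt(int index) {
        return buffer.getInt(index);
    }
}
```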
[jira] [Commented] (FLINK-7543) Simplify REST parameter access.
[ https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143904#comment-16143904 ] ASF GitHub Bot commented on FLINK-7543: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4611 [FLINK-7543] [REST] Simplify handler access to path/query parameters ## What is the purpose of the change This PR simplifies the access to path/query parameters by directly returning the value contained in the parameter instead of the parameter itself. ## Brief change log * make all path parameters mandatory * simplify access in `HandlerRequest` * simplify test code in `RestEndpointITCase` ## Verifying this change This change is already covered by existing tests, such as `RestEndpointITCase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 7544 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4611.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4611 commit dcce0b7631bf65ea66dbe0d64b368c7143815f9e Author: zentol Date: 2017-08-28T14:46:04Z [FLINK-7544] [REST] Make all path parameters mandatory commit 42486981c4310076a99a3cb6983131c2ae14725a Author: zentol Date: 2017-08-28T15:26:01Z [FLINK-7543] [REST] Simplify handler access to path/query parameters
[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems
[ https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143881#comment-16143881 ] ASF GitHub Bot commented on FLINK-6306: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4607 We cannot restart Travis ourselves. Only the contributor can schedule another run by adding another commit (even an empty one). However, please don't do that for the sake of getting a picture-perfect build; we are aware of some unstable tests and account for that in the review. > Sink for eventually consistent file systems > --- > > Key: FLINK-6306 > URL: https://issues.apache.org/jira/browse/FLINK-6306 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Seth Wiesman >Assignee: Seth Wiesman > Attachments: eventually-consistent-sink > > > Currently Flink provides the BucketingSink as an exactly-once method for > writing out to a file system. It provides these guarantees by moving files > through several stages and deleting or truncating files that get into a bad > state. While this is a powerful abstraction, it causes issues with eventually > consistent file systems such as Amazon's S3, where most operations (i.e., rename, > delete, truncate) are not guaranteed to become consistent within a reasonable > amount of time. Flink should provide a sink that provides exactly-once writes > to a file system where only PUT operations are considered consistent.
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4607 We cannot restart Travis ourselves. Only the contributor can schedule another run by adding another commit (even an empty one). However, please don't do that for the sake of getting a picture-perfect build; we are aware of some unstable tests and account for that in the review.
issues@flink.apache.org
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143844#comment-16143844 ] Jacob Park commented on FLINK-7465: --- [~fhueske] [~sunjincheng121] Thanks for the context. :) If HyperLogLogs are out, then how about Cuckoo Filters? They are similar to Bloom Filters but are designed differently (inspired by cuckoo hashing), support deletion, and take approximately the same space. https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf See https://bdupras.github.io/filter-tutorial/ for an interactive summary. Unlike standard Bloom Filters, Cuckoo Filters can also be used to estimate a count. {noformat} ...for reasonably large sized sets, for the same false positive rate as a corresponding Bloom filter, cuckoo filters use less space than Bloom filters, are faster on lookups (but slower on insertions/to construct), and amazingly also allow deletions of keys (which Bloom filters cannot do). -Michael Mitzenmacher (2014) {noformat} > Add build-in BloomFilterCount on TableAPI&SQL > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, use a BloomFilter to implement counting functions. > BloomFilter algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions.
Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)
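The add and query steps quoted above can be sketched in Java. This is a minimal illustration, not the implementation proposed in FLINK-7465 or Hive's BloomFilter: the class name `SimpleBloomFilter`, the FNV-1a base hash, and deriving the k positions by double hashing (h1 + i * h2) instead of k independent hash functions are all assumptions made to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/**
 * Minimal Bloom filter sketch: m bits, k positions per element.
 * Assumption: the k positions come from one 64-bit hash split into
 * h1/h2 and combined as h1 + i * h2 (double hashing).
 */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int m; // number of bits in the array
    private final int k; // number of positions set/checked per element

    SimpleBloomFilter(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.bits = new BitSet(m); // all bits start at 0, i.e. the empty filter
        this.m = m;
        this.k = k;
    }

    // 64-bit FNV-1a over the UTF-8 bytes of the element.
    private static long fnv1a64(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }

    // i-th array position in [0, m); int overflow wraps, floorMod keeps it non-negative.
    private int position(long h, int i) {
        int h1 = (int) h;
        int h2 = (int) (h >>> 32);
        return Math.floorMod(h1 + i * h2, m);
    }

    /** Add: set the bits at all k positions to 1. */
    void add(String element) {
        long h = fnv1a64(element);
        for (int i = 0; i < k; i++) {
            bits.set(position(h, i));
        }
    }

    /** Query: false means definitely absent; true means present or a false positive. */
    boolean mightContain(String element) {
        long h = fnv1a64(element);
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(h, i))) {
                return false;
            }
        }
        return true;
    }
}
```

As in the description, a `false` from `mightContain` is definitive while `true` may be a false positive; this sketch has no delete operation, which is the gap the Cuckoo Filter suggestion addresses.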
[jira] [Created] (FLINK-7544) Make all PathParameters mandatory
Chesnay Schepler created FLINK-7544: --- Summary: Make all PathParameters mandatory Key: FLINK-7544 URL: https://issues.apache.org/jira/browse/FLINK-7544 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.4.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.4.0 In the current REST architecture all path parameters are mandatory, so we should mark them as such in {{MessagePathParameter}}.
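To illustrate what marking path parameters as mandatory could look like, here is a simplified Java sketch. The names `SketchMessageParameter`, `SketchPathParameter`, `SketchJobIdPathParameter`, and the `Requisiteness` enum are hypothetical stand-ins, not Flink's actual REST classes. The point is only that the path-parameter base class fixes the requisiteness, so no subclass can declare itself optional.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified stand-in for a REST message parameter. */
abstract class SketchMessageParameter<X> {
    enum Requisiteness { MANDATORY, OPTIONAL }

    private final String key;
    private final Requisiteness requisiteness;
    private X value; // null until resolved from the request URL

    protected SketchMessageParameter(String key, Requisiteness requisiteness) {
        this.key = Objects.requireNonNull(key);
        this.requisiteness = Objects.requireNonNull(requisiteness);
    }

    /** Converts the raw string taken from the URL into a typed value. */
    protected abstract X convertFromString(String raw);

    final void resolve(String raw) { this.value = convertFromString(raw); }

    final boolean isMandatory() { return requisiteness == Requisiteness.MANDATORY; }
    final boolean isResolved() { return value != null; }
    final String getKey() { return key; }
    final X getValue() { return value; }

    /** A request is only valid if every mandatory parameter has been resolved. */
    static boolean allMandatoryResolved(List<? extends SketchMessageParameter<?>> parameters) {
        for (SketchMessageParameter<?> p : parameters) {
            if (p.isMandatory() && !p.isResolved()) {
                return false;
            }
        }
        return true;
    }
}

/** Path parameters cannot opt out: requisiteness is fixed to MANDATORY here. */
abstract class SketchPathParameter<X> extends SketchMessageParameter<X> {
    protected SketchPathParameter(String key) {
        super(key, Requisiteness.MANDATORY);
    }
}

/** Hypothetical ":jobid" path parameter; a plain String instead of a JobID. */
final class SketchJobIdPathParameter extends SketchPathParameter<String> {
    SketchJobIdPathParameter() { super("jobid"); }

    @Override
    protected String convertFromString(String raw) { return raw; }
}
```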
[jira] [Updated] (FLINK-7543) Simplify REST parameter access.
[ https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-7543: -- Description: Currently you have to do: {{ final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); }} This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user. was: Currently you have to do: { final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); } This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user. > Simplify REST parameter access. > --- > > Key: FLINK-7543 > URL: https://issues.apache.org/jira/browse/FLINK-7543 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > Currently you have to do: > {{ > final ParameterTypes.JobIdPathParam jobId = > request.getPathParameter(ParameterTypes.JobIdPathParam.class); > JobID jobID = jobId.getValue(); > }} > This issue proposes to remove the second step and return the value directly, > while performing the necessary checks internally (different for query and > path parameters), without exposing it to the user.
[jira] [Updated] (FLINK-7543) Simplify REST parameter access.
[ https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-7543: -- Description: Currently you have to do: { final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); } This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user. was: Currently you have to do: {{ final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); }} This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user. > Simplify REST parameter access. > --- > > Key: FLINK-7543 > URL: https://issues.apache.org/jira/browse/FLINK-7543 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > Currently you have to do: > { > final ParameterTypes.JobIdPathParam jobId = > request.getPathParameter(ParameterTypes.JobIdPathParam.class); > JobID jobID = jobId.getValue(); > } > This issue proposes to remove the second step and return the value directly, > while performing the necessary checks internally (different for query and > path parameters), without exposing it to the user.
[jira] [Created] (FLINK-7543) Simplify REST parameter access.
Kostas Kloudas created FLINK-7543: - Summary: Simplify REST parameter access. Key: FLINK-7543 URL: https://issues.apache.org/jira/browse/FLINK-7543 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.4.0 Reporter: Kostas Kloudas Assignee: Chesnay Schepler Fix For: 1.4.0 Currently you have to do: {{{ final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); }}} This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user.
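The proposed one-step access can be sketched with a generic accessor that infers the value type from the parameter class. All names here (`TypedParam`, `JobIdParam`, `SketchHandlerRequest`) are hypothetical, and the job id is a plain `String` rather than Flink's `JobID`. This illustrates the idea only; it is not the API introduced by the fix.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical typed parameter wrapping a resolved value of type X. */
abstract class TypedParam<X> {
    private X value;

    final void setValue(X value) { this.value = value; }
    final X getValue() { return value; }
}

/** Hypothetical job-id path parameter (String value for simplicity). */
final class JobIdParam extends TypedParam<String> {}

/** Hypothetical request object: handlers get the value, not the wrapper. */
final class SketchHandlerRequest {
    private final Map<Class<?>, TypedParam<?>> pathParameters = new HashMap<>();

    <P extends TypedParam<?>> void putPathParameter(Class<P> type, P parameter) {
        pathParameters.put(type, parameter);
    }

    /**
     * Returns the resolved value directly; X is inferred from P's type argument.
     * Path parameters are assumed mandatory, so a missing value is an internal
     * error here rather than something each handler has to check.
     */
    <X, P extends TypedParam<X>> X getPathParameter(Class<P> type) {
        TypedParam<?> stored = pathParameters.get(type);
        if (stored == null || stored.getValue() == null) {
            throw new IllegalStateException(
                "No value for mandatory path parameter " + type.getSimpleName());
        }
        P parameter = type.cast(stored); // checked cast, no unchecked warning
        return parameter.getValue();
    }
}
```

With this shape, the two-step snippet from the description becomes a single call such as `String jobId = request.getPathParameter(JobIdParam.class);`.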
[jira] [Updated] (FLINK-7543) Simplify REST parameter access.
[ https://issues.apache.org/jira/browse/FLINK-7543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-7543: -- Description: Currently you have to do: {{ final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); }} This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user. was: Currently you have to do: {{{ final ParameterTypes.JobIdPathParam jobId = request.getPathParameter(ParameterTypes.JobIdPathParam.class); JobID jobID = jobId.getValue(); }}} This issue proposes to remove the second step and return the value directly, while performing the necessary checks internally (different for query and path parameters), without exposing it to the user. > Simplify REST parameter access. > --- > > Key: FLINK-7543 > URL: https://issues.apache.org/jira/browse/FLINK-7543 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > Currently you have to do: > {{ > final ParameterTypes.JobIdPathParam jobId = > request.getPathParameter(ParameterTypes.JobIdPathParam.class); > JobID jobID = jobId.getValue(); > }} > This issue proposes to remove the second step and return the value directly, > while performing the necessary checks internally (different for query and > path parameters), without exposing it to the user.
[GitHub] flink issue #4609: Assigner
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4609 @StefanRRichter please take a look
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143814#comment-16143814 ] ASF GitHub Bot commented on FLINK-7378: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 Yes, this approach also has some advantages, and recycling the exclusive buffers will be covered in the next PR, along with additional tests. I will follow your suggestions to add some tests in this PR and submit the modifications based on all of the above comments. > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fixed-size > buffer pool used to manage the floating buffers for {{SingleInputGate}}. > The exclusive buffers are assigned to {{InputChannel}}s directly.
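A fixed-size (non-rebalancing) pool for the floating buffers could look roughly like the Java sketch below. It is a simplified illustration under stated assumptions: buffers are plain `byte[]`, `FixedSizeBufferPool` and `requiredBuffers` are hypothetical names, and the per-gate total is read as a exclusive buffers per channel times the number of channels, plus b floating buffers shared by all channels.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical fixed-size pool for floating buffers: the capacity is set once
 * and never rebalanced; requests return null when all buffers are in use.
 */
final class FixedSizeBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private final int numBuffers;

    FixedSizeBufferPool(int numBuffers, int bufferSize) {
        if (numBuffers < 0 || bufferSize <= 0) {
            throw new IllegalArgumentException("invalid pool dimensions");
        }
        this.numBuffers = numBuffers;
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Returns a free floating buffer, or null if none is available. */
    synchronized byte[] requestBuffer() {
        return available.poll();
    }

    /** Hands a buffer back; more recycles than the pool owns is a bug. */
    synchronized void recycle(byte[] buffer) {
        if (available.size() >= numBuffers) {
            throw new IllegalStateException("more buffers recycled than the pool owns");
        }
        available.add(buffer);
    }

    synchronized int availableBuffers() {
        return available.size();
    }

    /** Buffers per gate: a exclusive per channel times channels, plus b floating. */
    static int requiredBuffers(int exclusivePerChannel, int numChannels, int floating) {
        return exclusivePerChannel * numChannels + floating;
    }
}
```

Because the pool's size is fixed, a caller that receives `null` has to wait for a buffer to be recycled instead of the pool being resized when other pools are created.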
[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 Yes, this approach also has some advantages, and recycling the exclusive buffers will be covered in the next PR, along with additional tests. I will follow your suggestions to add some tests in this PR and submit the modifications based on all of the above comments.
[jira] [Updated] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones
[ https://issues.apache.org/jira/browse/FLINK-7542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Usman Younas updated FLINK-7542: Affects Version/s: 1.3.2 > Some tests in AggregateITCase fail for some Time Zones > -- > > Key: FLINK-7542 > URL: https://issues.apache.org/jira/browse/FLINK-7542 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Usman Younas > > In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}}, two tests > 1. testTumbleWindowAggregate and > 2. testHopWindowAggregate > fail for some time zones. > The bug can be reproduced by changing the machine's time zone to > Time Zone: Central Daylight Time > Closest City: Houston, United States > I think the problem is with Timestamp.
[jira] [Assigned] (FLINK-6860) update Apache Beam in page Ecosystem
[ https://issues.apache.org/jira/browse/FLINK-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou reassigned FLINK-6860: --- Assignee: Hai Zhou > update Apache Beam in page Ecosystem > > > Key: FLINK-6860 > URL: https://issues.apache.org/jira/browse/FLINK-6860 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Xu Mingmin >Assignee: Hai Zhou >Priority: Minor > > Remove the word {{incubating}} and update the link; Apache Beam has > graduated to a top-level project.
[jira] [Commented] (FLINK-6306) Sink for eventually consistent file systems
[ https://issues.apache.org/jira/browse/FLINK-6306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143792#comment-16143792 ] ASF GitHub Bot commented on FLINK-6306: --- Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/4607 Would you be able to rerun Travis? The test failed on a single configuration during the Kafka09ITTest due to a TaskManager failure. I do not believe any of my code changes touched the code paths exercised by that test. > Sink for eventually consistent file systems > --- > > Key: FLINK-6306 > URL: https://issues.apache.org/jira/browse/FLINK-6306 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Seth Wiesman >Assignee: Seth Wiesman > Attachments: eventually-consistent-sink > > > Currently Flink provides the BucketingSink as an exactly-once method for > writing out to a file system. It provides these guarantees by moving files > through several stages and deleting or truncating files that get into a bad > state. While this is a powerful abstraction, it causes issues with eventually > consistent file systems such as Amazon's S3, where most operations (i.e., rename, > delete, truncate) are not guaranteed to become consistent within a reasonable > amount of time. Flink should provide a sink that provides exactly-once writes > to a file system where only PUT operations are considered consistent.
[jira] [Created] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones
Usman Younas created FLINK-7542: --- Summary: Some tests in AggregateITCase fail for some Time Zones Key: FLINK-7542 URL: https://issues.apache.org/jira/browse/FLINK-7542 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Usman Younas In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}}, two tests 1. testTumbleWindowAggregate and 2. testHopWindowAggregate fail for some time zones. The bug can be reproduced by changing the machine's time zone to Time Zone: Central Daylight Time Closest City: Houston, United States I think the problem is with Timestamp.
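The report only suspects Timestamp, so the sketch below does not diagnose the failing tests. It demonstrates one plausible mechanism: `java.sql.Timestamp.valueOf` interprets its literal in the JVM's default time zone, so the same wall-clock string maps to different epoch milliseconds on machines in different zones. The class `TimestampZoneDemo` and the sample date are illustrative assumptions.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

/**
 * Shows that Timestamp.valueOf depends on the JVM default time zone:
 * the same literal yields different epoch millis in different zones.
 */
final class TimestampZoneDemo {
    /** Parses the literal with the given zone as the default, then restores the old default. */
    static long epochMillis(String literal, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            return Timestamp.valueOf(literal).getTime();
        } finally {
            TimeZone.setDefault(previous); // avoid leaking the change to other code
        }
    }
}
```

For "2017-08-28 00:00:00", the UTC result is 1503878400000, while under America/Chicago (CDT, UTC-5 in August) it is five hours later. Pinning the test JVM's zone (for example with `-Duser.timezone=UTC`) is one common mitigation, though whether it is the right fix for FLINK-7542 is not established here.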
[GitHub] flink issue #4607: [FLINK-6306][connectors] Sink for eventually consistent f...
Github user sjwiesman commented on the issue: https://github.com/apache/flink/pull/4607 Would you be able to rerun Travis? The test failed on a single configuration during the Kafka09ITTest due to a TaskManager failure. I do not believe any of my code changes touched the code paths exercised by that test.
[jira] [Commented] (FLINK-7130) Remove eventSerializer from NFA
[ https://issues.apache.org/jira/browse/FLINK-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143787#comment-16143787 ] ASF GitHub Bot commented on FLINK-7130: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/4610 [FLINK-7130] Removed event serializer from NFA and SharedBuffer ## What is the purpose of the change * The purpose is to remove the usage of the event serializer from the `NFA` and `SharedBuffer` classes, as it should not be used in the logic part. ## Brief change log - Removed NonDuplicatingTypeSerializer - Removed eventSerializer from `NFA` - Removed eventSerializer from `SharedBuffer` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink cep-remove-event-serializer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4610.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4610 commit 1f61b2f0858506a9ff7bde47a33c8ab2b6ddbb50 Author: mtunique Date: 2017-04-13T12:22:41Z [FLINK-7130] Removed event serializer from NFA and SharedBuffer > Remove eventSerializer from NFA > --- > > Key: FLINK-7130 > URL: https://issues.apache.org/jira/browse/FLINK-7130 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > > Right now eventSerializer is serialized within NFA. It should be present only > in NFASerializer.
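The separation the issue asks for, where the NFA holds only matching logic and its serializer alone knows how to encode events, can be sketched as follows. `SketchElementSerializer`, `SketchNfa`, and `SketchNfaSerializer` are hypothetical simplifications (Flink's real classes use `TypeSerializer` and far richer state); the sketch only shows where the event serializer lives.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal element serializer (stand-in for a TypeSerializer). */
interface SketchElementSerializer<T> {
    void serialize(T element, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
}

/** Logic-only NFA stand-in: it buffers events but holds no serializer field. */
final class SketchNfa<T> {
    private final List<T> bufferedEvents = new ArrayList<>();

    void process(T event) { bufferedEvents.add(event); }

    List<T> bufferedEvents() { return bufferedEvents; }
}

/** The only place that knows how events are written and read. */
final class SketchNfaSerializer<T> {
    private final SketchElementSerializer<T> eventSerializer;

    SketchNfaSerializer(SketchElementSerializer<T> eventSerializer) {
        this.eventSerializer = eventSerializer;
    }

    byte[] serialize(SketchNfa<T> nfa) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            List<T> events = nfa.bufferedEvents();
            out.writeInt(events.size()); // length prefix, then each event
            for (T event : events) {
                eventSerializer.serialize(event, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    SketchNfa<T> deserialize(byte[] data) {
        SketchNfa<T> nfa = new SketchNfa<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                nfa.bufferedEvents().add(eventSerializer.deserialize(in)); // restore, do not re-process
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return nfa;
    }
}
```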
[GitHub] flink pull request #4610: [FLINK-7130] Removed event serializer from NFA and...
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/4610 [FLINK-7130] Removed event serializer from NFA and SharedBuffer ## What is the purpose of the change * The purpose is to remove the usage of the event serializer from the `NFA` and `SharedBuffer` classes, as it should not be used in the logic part. ## Brief change log - Removed NonDuplicatingTypeSerializer - Removed eventSerializer from `NFA` - Removed eventSerializer from `SharedBuffer` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink cep-remove-event-serializer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4610.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4610 commit 1f61b2f0858506a9ff7bde47a33c8ab2b6ddbb50 Author: mtunique Date: 2017-04-13T12:22:41Z [FLINK-7130] Removed event serializer from NFA and SharedBuffer
[GitHub] flink pull request #4609: Assigner
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4609 Assigner This PR is a pure refactor and shouldn't change any functionality. It should be covered by existing tests like `CheckpointCoordinatorTest`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink assigner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4609.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4609 commit 06c1367c0a9a05c13bc5abf4bae96241350a276e Author: Piotr Nowojski Date: 2017-08-25T11:59:36Z [hotfix][runtime] Checkstyle changes in TaskStateSnapshot commit efc46f7cc1508499e421946767b326ba91118c38 Author: Piotr Nowojski Date: 2017-08-25T13:23:15Z [FLINK-7541][runtime] Refactor StateAssignmentOperation and use OperatorID This is not a complete refactor; some methods still rely on the order of the new and old operators.
[jira] [Created] (FLINK-7541) Redistribute operator state using OperatorID
Piotr Nowojski created FLINK-7541: - Summary: Redistribute operator state using OperatorID Key: FLINK-7541 URL: https://issues.apache.org/jira/browse/FLINK-7541 Project: Flink Issue Type: Improvement Reporter: Piotr Nowojski Assignee: Piotr Nowojski Currently StateAssignmentOperation relies heavily on the order of the new and old operators in the task. It should be changed to rely on OperatorID instead.
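The difference between order-based and ID-based redistribution can be shown with a small Java sketch. Operator IDs are plain strings here and states are opaque values; `StateAssignmentSketch` and its methods are hypothetical and far simpler than `StateAssignmentOperation`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical contrast of order-based vs. ID-based state assignment. */
final class StateAssignmentSketch {
    /** Order-based: the i-th new operator gets the i-th old state (or null). */
    static <S> Map<String, S> assignByPosition(List<String> newOperatorIds, List<S> oldStatesInOrder) {
        Map<String, S> assignment = new LinkedHashMap<>();
        for (int i = 0; i < newOperatorIds.size(); i++) {
            S state = i < oldStatesInOrder.size() ? oldStatesInOrder.get(i) : null;
            assignment.put(newOperatorIds.get(i), state);
        }
        return assignment;
    }

    /** ID-based: each new operator gets the old state recorded under its own ID (or null). */
    static <S> Map<String, S> assignByOperatorId(List<String> newOperatorIds, Map<String, S> oldStatesById) {
        Map<String, S> assignment = new LinkedHashMap<>();
        for (String id : newOperatorIds) {
            assignment.put(id, oldStatesById.get(id));
        }
        return assignment;
    }

    /** Old operator IDs whose state no new operator would pick up. */
    static List<String> unmatchedOldIds(List<String> newOperatorIds, Map<String, ?> oldStatesById) {
        List<String> unmatched = new ArrayList<>();
        for (String id : oldStatesById.keySet()) {
            if (!newOperatorIds.contains(id)) {
                unmatched.add(id);
            }
        }
        return unmatched;
    }
}
```

For example, if a new chain [source, map, filter] replaces an old chain [map, filter], `assignByPosition` gives the old map state to `source`, while `assignByOperatorId` leaves `source` without state and keeps `map` and `filter` matched to their own state.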
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143722#comment-16143722 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4355 Hi @wuchong, I think we don't need to call `open()` and `close()` in `AggregateAggFunction`. `GeneratedAggregations` is an internal class which is not exposed to users. It would be a bug in the translation logic if a `GeneratedAggregations` that requires `open()` or `close()` were passed to an `AggregateAggFunction`. A user couldn't do anything to prevent this. +1 for refactoring `AggregateCodeGenerator`. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
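For readers unfamiliar with the DataView idea, the sketch below shows an accumulator that keeps its per-value data behind a map-like view, so the backing store can be swapped. `SketchMapView`, `HeapMapView`, and `DistinctCountAcc` are hypothetical names. In the actual feature the views are MapView and ListView, and a runtime implementation would presumably be backed by Flink state (for example keyed `MapState`), which this heap-only sketch does not attempt.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical map-like view an accumulator can hold; the backing store is pluggable. */
interface SketchMapView<K, V> {
    V get(K key);
    void put(K key, V value);
    boolean contains(K key);
}

/** Heap-backed view, e.g. for tests; a state-backed variant would delegate to a state handle. */
final class HeapMapView<K, V> implements SketchMapView<K, V> {
    private final Map<K, V> map = new HashMap<>();

    @Override public V get(K key) { return map.get(key); }
    @Override public void put(K key, V value) { map.put(key, value); }
    @Override public boolean contains(K key) { return map.containsKey(key); }
}

/** Accumulator for a COUNT(DISTINCT x)-style UDAGG that stores seen values in a view. */
final class DistinctCountAcc {
    final SketchMapView<String, Integer> seen; // value -> number of occurrences
    long count;                                // number of distinct values so far

    DistinctCountAcc(SketchMapView<String, Integer> seen) {
        this.seen = seen;
    }

    void accumulate(String value) {
        Integer occurrences = seen.get(value);
        if (occurrences == null) {
            seen.put(value, 1);
            count++; // first time this value appears
        } else {
            seen.put(value, occurrences + 1);
        }
    }
}
```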
[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4355 Hi @wuchong, I think we don't need to call `open()` and `close()` in `AggregateAggFunction`. `GeneratedAggregations` is an internal class which is not exposed to users. It would be a bug in the translation logic if a `GeneratedAggregations` that requires `open()` or `close()` were passed to an `AggregateAggFunction`. A user couldn't do anything to prevent this. +1 for refactoring `AggregateCodeGenerator`.
[jira] [Commented] (FLINK-7227) OR expression with more than 2 predicates is not pushed into a TableSource
[ https://issues.apache.org/jira/browse/FLINK-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143686#comment-16143686 ] ASF GitHub Bot commented on FLINK-7227: --- GitHub user uybhatti opened a pull request: https://github.com/apache/flink/pull/4608 [FLINK-7227][Table API & SQL] Fix the TableSource predicate push down issue for OR and AND expressions with more than 2 predicates *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request.
That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature
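The PR title suggests that push-down failed when an OR or AND had more than two operands. One plausible way such a bug arises, assumed here rather than taken from the PR, is a planner that represents OR/AND as n-ary calls while the target expression classes are binary, combined with a converter that only matches the two-operand case. The hypothetical Java sketch below (`Expr`, `Leaf`, `Binary`, `NaryConverter`) folds any number of operands into a left-deep binary tree instead.

```java
import java.util.List;

/** Hypothetical target expression: leaves are predicates, inner nodes binary AND/OR. */
abstract class Expr {
    abstract String render();
}

final class Leaf extends Expr {
    private final String predicate;

    Leaf(String predicate) { this.predicate = predicate; }

    @Override String render() { return predicate; }
}

final class Binary extends Expr {
    private final String op; // "AND" or "OR"
    private final Expr left;
    private final Expr right;

    Binary(String op, Expr left, Expr right) {
        this.op = op;
        this.left = left;
        this.right = right;
    }

    @Override String render() {
        return "(" + left.render() + " " + op + " " + right.render() + ")";
    }
}

final class NaryConverter {
    /**
     * Converts an n-ary call (op, [o1, ..., on]) into ((o1 op o2) op o3) ...
     * A converter that only handled exactly two operands would reject n >= 3,
     * and the predicate would then not be offered to the TableSource.
     */
    static Expr foldLeft(String op, List<Expr> operands) {
        if (operands.isEmpty()) {
            throw new IllegalArgumentException(op + " needs at least one operand");
        }
        Expr result = operands.get(0);
        for (int i = 1; i < operands.size(); i++) {
            result = new Binary(op, result, operands.get(i));
        }
        return result;
    }
}
```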
[GitHub] flink pull request #4608: [FLINK-7227][Table API & SQL]Fix the TableSour...
GitHub user uybhatti opened a pull request: https://github.com/apache/flink/pull/4608 [FLINK-7227][Table API & SQL] Fix the TableSource predicate push-down issue for OR and AND expressions with more than 2 predicates
You can merge this pull request into a Git repository by running: $ git pull https://github.com/uybhatti/flink FLINK-7227 Alternatively you can review and apply these changes
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135504282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.reflect.Field + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} + +/** + * Data view specification. + * + * @tparam ACC type extends [[DataView]] + */ +trait DataViewSpec[ACC <: DataView] { + def id: String + def field: Field + def toStateDescriptor: StateDescriptor[_, _] --- End diff -- Very good point, you are right! We need to generate the state descriptors here, serialize them and ship them. Thanks for the clarification. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
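fhueske's conclusion, that the spec has to generate the state descriptors, serialize them and ship them, can be illustrated with a minimal sketch that does not depend on Flink. `StateDescriptorSketch`, `ListViewSpecSketch` and `DescriptorShipping` are hypothetical stand-ins, not Flink's `StateDescriptor` or `DataViewSpec`, and plain Java serialization is assumed as the transport.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a state descriptor: a state name plus the element type it describes. */
class StateDescriptorSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final String elementType;

    StateDescriptorSketch(String name, String elementType) {
        this.name = name;
        this.elementType = elementType;
    }
}

/** Hypothetical data view spec that generates its descriptor up front, on the client side. */
class ListViewSpecSketch {
    final String id;
    final String elementType;

    ListViewSpecSketch(String id, String elementType) {
        this.id = id;
        this.elementType = elementType;
    }

    StateDescriptorSketch toStateDescriptor() {
        return new StateDescriptorSketch(id, elementType);
    }
}

class DescriptorShipping {
    /** Serializes the descriptor so it can travel with the generated function. */
    static byte[] ship(StateDescriptorSketch descriptor) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(descriptor);
        }
        return bytes.toByteArray();
    }

    /** Restores the descriptor on the receiving side, where the state would be registered. */
    static StateDescriptorSketch receive(byte[] payload) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (StateDescriptorSketch) in.readObject();
        }
    }
}
```

In this sketch only the descriptor (name and type) travels; the state contents themselves are never part of the payload.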
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143682#comment-16143682 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135504282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.reflect.Field + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} + +/** + * Data view specification. + * + * @tparam ACC type extends [[DataView]] + */ +trait DataViewSpec[ACC <: DataView] { + def id: String + def field: Field + def toStateDescriptor: StateDescriptor[_, _] --- End diff -- Very good point, you are right! We need to generate the state descriptors here, serialize them and ship them. Thanks for the clarification. 
> Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135503957 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function { * aggregated results */ def resetAccumulator(accumulators: Row) + + /** +* Cleanup for the accumulators. +*/ + def cleanup() + + /** +* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]]. +* It can be used for clean up work. By default, this method does nothing. +*/ + def close() --- End diff -- Sorry, I overlooked the `close()` calls. If the method is used, we should keep it of course.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143681#comment-16143681 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135503957 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function { * aggregated results */ def resetAccumulator(accumulators: Row) + + /** +* Cleanup for the accumulators. +*/ + def cleanup() + + /** +* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]]. +* It can be used for clean up work. By default, this method does nothing. +*/ + def close() --- End diff -- Sorry, I overlooked the `close()` calls. If the method is used, we should keep it of course. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
[jira] [Updated] (FLINK-7525) Add config option to disable Cancel functionality on UI
[ https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7525: Component/s: Webfrontend Web Client > Add config option to disable Cancel functionality on UI > --- > > Key: FLINK-7525 > URL: https://issues.apache.org/jira/browse/FLINK-7525 > Project: Flink > Issue Type: Improvement > Components: Web Client, Webfrontend >Reporter: Ted Yu > > In this email thread > http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI > , Raja was asking for a way to control how users cancel Job(s). > Robert proposed adding a config option which disables the Cancel > functionality.
issues@flink.apache.org
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143673#comment-16143673 ] Fabian Hueske commented on FLINK-7465: -- HyperLogLog does not support retraction, i.e., removal of formerly added values. Bloom filters could be modified to support retraction by using int or long arrays instead of bit arrays. However, this would increase the space requirements by 32x or 64x. I think it's fine to use HyperLogLog and ignore the retraction case. At the moment, retraction is only mandatory for OVER windows and FLINK-7471 proposes to support non-retractable aggregation functions in OVER windows. So, this limitation might be resolved in the future. > Add build-in BloomFilterCount on TableAPI&SQL > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, we use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions.
If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I would appreciate it if you could give me some advice. :-)
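The add and query steps from the issue, combined with Fabian's remark that int or long arrays instead of bit arrays would allow retraction, describe what is usually called a counting Bloom filter. The following is a minimal sketch under stated assumptions: the class name is hypothetical, counters are `int` (so 32x the space of a bit array), and the k positions are derived from `hashCode()` by double hashing rather than by k independent hash functions. It is not the design proposed in FLINK-7465.

```java
/** Counting Bloom filter sketch: m int counters instead of m bits, k positions per element. */
class CountingBloomFilterSketch {
    private final int[] counters; // all 0 initially, i.e. an empty filter
    private final int k;          // number of (derived) hash functions

    CountingBloomFilterSketch(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.counters = new int[m];
        this.k = k;
    }

    /** i-th array position via double hashing: (h1 + i * h2) mod m. */
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = ((h1 * 0x9E3779B9) >>> 16) | 1; // scrambled, odd second hash
        return Math.floorMod(h1 + i * h2, counters.length);
    }

    /** Add: increment the k counters (a plain Bloom filter would set k bits to 1). */
    void add(Object element) {
        for (int i = 0; i < k; i++) {
            counters[position(element, i)]++;
        }
    }

    /** Retract: decrement the k counters. Only valid for elements that were added before. */
    void remove(Object element) {
        for (int i = 0; i < k; i++) {
            counters[position(element, i)]--;
        }
    }

    /** Query: false means "definitely not in the set", true means "possibly in the set". */
    boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (counters[position(element, i)] == 0) {
                return false;
            }
        }
        return true;
    }
}
```

Removing an element that was never added can drive counters down for other elements and cause false negatives, so retraction is only safe if the caller retracts exactly what it added earlier.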
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137425#comment-16137425 ] Fabian Hueske edited comment on FLINK-7465 at 8/28/17 10:20 AM: I'm sorry, I confused count-min sketches (for approximate group counts) and HyperLogLog (for approximate distinct counts). I assume the goal of the BloomFilterCount function is to (approximately) count the number of distinct values. In contrast to HyperLogLog, Bloom filters are not specifically designed for approximate distinct counting but for approximate membership testing. AFAIK, Bloom filters should be more precise for low distinct cardinalities but HyperLogLog should provide much better results for larger cardinalities. IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty nice. OTOH, multiple RocksDB point lookups might also be more expensive than a single lookup with larger serialization payload (the deserialization logic for byte arrays shouldn't be very costy). was (Author: fhueske): I'm sorry, I confused count-min sketches (for approximate group counts) and HyperLogLog (for approximate distinct counts). I assume the goal of the BloomFilterCount function is to (approximately) count the number of distinct values. In contrast to HyperLogLog, Bloom filters are not specifically designed for approximate distinct counting but for approximate membership testing. AFAIK, Bloom filters should be more precise for low distinct cardinalities but HyperLogLog should provide much better results for larger cardinalities. IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty nice. OTOH, point multiple RocksDB lookups might also be more expensive than a single lookup with larger serialization payload (the deserialization logic for byte arrays shouldn't be very costy).
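Because a Bloom filter answers membership queries, a distinct count has to be derived from it. The Wikipedia article referenced in the issue describes an estimate based on the number of set bits X: n ≈ -(m/k) · ln(1 - X/m). The sketch below keeps the m bits in `long` words, loosely in the spirit of splitting the bitmask into multiple long values; whether each word would become its own RocksDB entry is left open. The class name and the double-hashing scheme are assumptions for illustration.

```java
/** Bloom filter sketch whose m bits live in long words; estimates the number of distinct adds. */
class LongWordBloomFilterSketch {
    private final long[] words; // bit b is stored in words[b / 64] at position b % 64
    private final int m;        // number of bits
    private final int k;        // number of (derived) hash functions

    LongWordBloomFilterSketch(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.m = m;
        this.k = k;
        this.words = new long[(m + 63) / 64];
    }

    /** i-th bit position via double hashing: (h1 + i * h2) mod m. */
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = ((h1 * 0x9E3779B9) >>> 16) | 1;
        return Math.floorMod(h1 + i * h2, m);
    }

    void add(Object element) {
        for (int i = 0; i < k; i++) {
            int bit = position(element, i);
            words[bit >>> 6] |= 1L << (bit & 63);
        }
    }

    /** Estimated distinct count: -(m / k) * ln(1 - X / m), where X is the number of set bits. */
    double estimateDistinct() {
        int setBits = 0;
        for (long word : words) {
            setBits += Long.bitCount(word);
        }
        if (setBits == m) {
            return Double.POSITIVE_INFINITY; // saturated filter: the estimate diverges
        }
        return -((double) m / k) * Math.log(1.0 - (double) setBits / m);
    }
}
```

Adding the same value twice sets no new bits, so duplicates do not change the estimate. As the filter fills up, X/m approaches 1 and the estimate becomes unstable, which is consistent with the remark that HyperLogLog should do better for larger cardinalities.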
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137425#comment-16137425 ] Fabian Hueske edited comment on FLINK-7465 at 8/28/17 10:20 AM: I'm sorry, I confused count-min sketches (for approximate group counts) and HyperLogLog (for approximate distinct counts). I assume the goal of the BloomFilterCount function is to (approximately) count the number of distinct values. In contrast to HyperLogLog, Bloom filters are not specifically designed for approximate distinct counting but for approximate membership testing. AFAIK, Bloom filters should be more precise for low distinct cardinalities but HyperLogLog should provide much better results for larger cardinalities. IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty nice. OTOH, multiple RocksDB point lookups might also be more expensive than a single lookup with larger serialization payload (the deserialization logic for byte arrays shouldn't be very costly). was (Author: fhueske): I'm sorry, I confused count-min sketches (for approximate group counts) and HyperLogLog (for approximate distinct counts). I assume the goal of the BloomFilterCount function is to (approximately) count the number of distinct values. In contrast to HyperLogLog, Bloom filters are not specifically designed for approximate distinct counting but for approximate membership testing. AFAIK, Bloom filters should be more precise for low distinct cardinalities but HyperLogLog should provide much better results for larger cardinalities. IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty nice. OTOH, multiple RocksDB point lookups might also be more expensive than a single lookup with larger serialization payload (the deserialization logic for byte arrays shouldn't be very costy).
[jira] [Created] (FLINK-7540) submit a job on yarn-cluster mode or start a yarn-session failed, in hadoop cluster with capitalized hostname
Tong Yan Ou created FLINK-7540: -- Summary: submit a job on yarn-cluster mode or start a yarn-session failed, in hadoop cluster with capitalized hostname Key: FLINK-7540 URL: https://issues.apache.org/jira/browse/FLINK-7540 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.3.2, 1.3.1, 1.4.0 Reporter: Tong Yan Ou Fix For: 1.3.3
Hostnames in my Hadoop cluster look like these: “DSJ-RTB-4T-177”, “DSJ-signal-900G-71”. When I use the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 ~/flink-1.3.1/examples/batch/WordCount.jar --input /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result
or
./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
the command-line interface reports exceptions like these:
java.lang.RuntimeException: Unable to get ClusterClient status from Application Client at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) … Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
Then the job fails; starting a yarn-session fails in the same way. The exceptions in the application log:
2017-08-10 17:36:10,334 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port. akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)] …
2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)]] after [1 ms]
I also found a difference in the actor system addresses:
2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager - Starting JobManager at akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager - JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership with leader session ID Some(----).
2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend listening at 0:0:0:0:0:0:0:0:54921
2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 54921
2017-08-10 17:36:00,313 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
The JobManager is at “akka.tcp://flink@DSJ-signal-4T-248:65082”, but the JobManagerRetriever uses “akka.tcp://flink@dsj-signal-4t-248:65082”: the hostname in the JobManagerRetriever’s actor address is lowercase. Reading the source code, I found the method unresolvedHostToNormalizedString(String host) in class NetUtils (line 127):
public static String unresolvedHostToNormalizedString(String host) {
    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName } and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        host = host.trim().toLowerCase();
    }
    ...
}
It converts the hostname to lowercase. Therefore, the JobManagerRetriever cannot find the JobManager's actor system. After I removed the call to toLowerCase() from the source code, I could submit a job in yarn-cluster mode and start a yarn-session.
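The logs above suggest the failure comes down to normalizing the hostname on only one side: the JobManager binds under the original spelling while the retriever looks up the lowercased one, and the two actor addresses are then treated as different even though DNS hostnames are case-insensitive (RFC 4343). The sketch below reproduces that mismatch with hypothetical helpers; `normalizeHost` only mimics the lowercasing step quoted from `NetUtils` (using `Locale.ROOT`, without the null handling), and it is not the fix that was eventually applied in Flink.

```java
import java.util.Locale;

/** Hypothetical helpers reproducing the hostname-case mismatch described in FLINK-7540. */
class HostCaseSketch {
    /** Mimics the lowercasing step quoted from NetUtils (simplified: no null handling). */
    static String normalizeHost(String host) {
        return host.trim().toLowerCase(Locale.ROOT);
    }

    /** Builds an actor URL in the format seen in the logs. */
    static String actorUrl(String host, int port) {
        return "akka.tcp://flink@" + host + ":" + port + "/user/jobmanager";
    }
}
```

Normalizing only the lookup side yields `akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager`, which differs from the bound `akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager`; applying the same normalization on both sides (or none, as in the reporter's workaround) makes the strings match again.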
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143560#comment-16143560 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4485 Hi @zhijiangW, regarding the buffer pool implementation, I was just curious about why it was done that way. But it is fine to keep the logic in `RemoteInputChannel` if you make sure that a recycler puts these buffers right (back) into the buffer queue (I guess that's in one of the follow-up PRs). This way, we avoid an additional intermediate component (and the need to interact with it). To conclude, on second thought, it is fine as it is. The thing with `ResultPartitionType` is that without an (intermediate) way to set `isCreditBased` to `true`, we are not really able to test this code path on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in the follow-up PRs as well). Speaking of tests: I understand that with the switch to credit-based flow control, some parts will be covered by existing tests, but we also change the behaviour at some points and the current tests are already a bit sparse.
Can you also add tests for - the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`), - `NetworkBufferPool#requestMemorySegments`, `NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest`, which is currently a bit sparse) - the changes in `SingleInputGate` (into `SingleInputGateTest`) > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}}s directly.
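Taking the issue's definitions at face value (a exclusive buffers for each channel, b floating buffers shared by all channels of the gate), the buffer count for one input gate works out to a per-channel share times the number of channels, plus one fixed-size floating pool. The helper below is hypothetical and only makes that arithmetic concrete; its names are not Flink configuration keys or methods.

```java
/** Hypothetical helper: buffers needed by one input gate under the exclusive + floating split. */
class GateBufferBudget {
    static int requiredBuffers(int numChannels, int exclusivePerChannel, int floatingPerGate) {
        if (numChannels < 0 || exclusivePerChannel < 0 || floatingPerGate < 0) {
            throw new IllegalArgumentException("buffer counts must be non-negative");
        }
        // Exclusive buffers: assigned to each input channel directly.
        int exclusive = Math.multiplyExact(exclusivePerChannel, numChannels);
        // Floating buffers: one fixed-size pool shared by all channels of the gate.
        return Math.addExact(exclusive, floatingPerGate);
    }
}
```

For example, `requiredBuffers(100, 2, 8)` returns 208: 200 exclusive buffers for 100 channels plus 8 floating ones. The overflow-checked arithmetic is a defensive choice for the sketch.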
[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4485 Hi @zhijiangW, regarding the buffer pool implementation, I was just curious about why it was done that way. But it is fine to keep the logic in `RemoteInputChannel` if you make sure that a recycler puts these buffers right (back) into the buffer queue (I guess that's in one of the follow-up PRs). This way, we avoid an additional intermediate component (and the need to interact with it). To conclude, on second thought, it is fine as it is. The thing with `ResultPartitionType` is that without an (intermediate) way to set `isCreditBased` to `true`, we are not really able to test this code path on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in the follow-up PRs as well). Speaking of tests: I understand that with the switch to credit-based flow control, some parts will be covered by existing tests, but we also change the behaviour at some points and the current tests are already a bit sparse. Can you also add tests for - the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`), - `NetworkBufferPool#requestMemorySegments`, `NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest`, which is currently a bit sparse) - the changes in `SingleInputGate` (into `SingleInputGateTest`)
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143556#comment-16143556 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r135481583
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
 public void setBufferPool(BufferPool bufferPool) {
 // Sanity checks
- checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
+ if (!getConsumedPartitionType().isCreditBased()) {
+ checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
 "Bug in input gate setup logic: buffer pool has not enough guaranteed buffers " +
- "for this input gate. Input gates require at least as many buffers as " +
+ "for this input gate. Input gates require at least as many buffers as " +
 "there are input channels.");
+ }
 checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" +
- "already been set for this input gate.");
+ "already been set for this input gate.");
 this.bufferPool = checkNotNull(bufferPool);
 }
+ /**
+* Assign the exclusive buffers to all remote input channels directly for credit-based mode.
+*
+* @param networkBufferPool The global pool to request and recycle exclusive buffers
+* @param networkBuffersPerChannel The number of exclusive buffers for each channel
+*/
+ public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
+ this.networkBufferPool = checkNotNull(networkBufferPool);
--- End diff --
please guard against calling this method multiple times (like in `setBufferPool`) as a sanity check > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * (number of channels) + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fixed-size > buffer pool to manage the floating buffers for {{SingleInputGate}}, > while the exclusive buffers are assigned to {{InputChannel}}s directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
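A minimal sketch of the requested sanity check, assuming heavily simplified stand-in types: only the method names `setBufferPool` and `assignExclusiveSegments` come from the diff, while `GlobalPoolStub`, `LocalPoolStub` and `InputGateSetupSketch` are hypothetical. It mirrors the diff's conditional check (the one-buffer-per-channel requirement only applies outside credit-based mode) and adds a guard against a second call, analogous to the existing `checkState(this.bufferPool == null, ...)`.

```java
import java.util.Objects;

// Hypothetical placeholders for the global and local buffer pools.
final class GlobalPoolStub { }

final class LocalPoolStub {
    final int requiredSegments;

    LocalPoolStub(int requiredSegments) {
        this.requiredSegments = requiredSegments;
    }
}

// Simplified model of the input gate setup; only the two method names come from the diff.
final class InputGateSetupSketch {
    private final int numberOfInputChannels;
    private final boolean creditBased;
    private LocalPoolStub bufferPool;
    private GlobalPoolStub networkBufferPool;
    private int buffersPerChannel;

    InputGateSetupSketch(int numberOfInputChannels, boolean creditBased) {
        this.numberOfInputChannels = numberOfInputChannels;
        this.creditBased = creditBased;
    }

    void setBufferPool(LocalPoolStub pool) {
        Objects.requireNonNull(pool);
        // Only the non-credit-based mode needs one guaranteed buffer per input channel.
        if (!creditBased && numberOfInputChannels != pool.requiredSegments) {
            throw new IllegalArgumentException("buffer pool has not enough guaranteed buffers for this input gate");
        }
        if (this.bufferPool != null) {
            throw new IllegalStateException("buffer pool has already been set for this input gate");
        }
        this.bufferPool = pool;
    }

    void assignExclusiveSegments(GlobalPoolStub pool, int networkBuffersPerChannel) {
        Objects.requireNonNull(pool);
        // The guard requested in the review: a second call indicates a setup bug.
        if (this.networkBufferPool != null) {
            throw new IllegalStateException("exclusive buffers have already been assigned for this input gate");
        }
        this.networkBufferPool = pool;
        this.buffersPerChannel = networkBuffersPerChannel;
    }

    int buffersPerChannel() {
        return buffersPerChannel;
    }
}
```

In Flink itself, the hand-written `if` checks would presumably be `Preconditions.checkArgument` / `checkState` calls, as in the surrounding code.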
[jira] [Commented] (FLINK-7129) Dynamically changing patterns
[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143558#comment-16143558 ] Aljoscha Krettek commented on FLINK-7129: - I think it should work. > Dynamically changing patterns > - > > Key: FLINK-7129 > URL: https://issues.apache.org/jira/browse/FLINK-7129 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > > An umbrella task for introducing a mechanism for injecting patterns through > coStream
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143553#comment-16143553 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r135480975
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -131,6 +133,50 @@ public void recycle(MemorySegment segment) {
 availableMemorySegments.add(segment);
 }
+ public List requestMemorySegments(int numRequiredBuffers) throws IOException {
+ synchronized (factoryLock) {
--- End diff --
should we add a `Preconditions.checkArgument(numRequiredBuffers > 0)`? > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * (number of channels) + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fixed-size > buffer pool to manage the floating buffers for {{SingleInputGate}}, > while the exclusive buffers are assigned to {{InputChannel}}s directly.
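The suggested precondition could look like this in a self-contained sketch. `GlobalSegmentPoolSketch` is hypothetical and uses plain `byte[]` arrays instead of `MemorySegment`s; the all-or-nothing behaviour when too few segments are available is an assumption of this sketch, not necessarily what the PR implements.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified global pool; segments are plain byte arrays.
final class GlobalSegmentPoolSketch {
    private final Object factoryLock = new Object();
    private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();

    GlobalSegmentPoolSketch(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            availableMemorySegments.add(new byte[segmentSize]);
        }
    }

    List<byte[]> requestMemorySegments(int numRequiredBuffers) throws IOException {
        // The precondition suggested in the review: reject non-positive requests early.
        if (numRequiredBuffers <= 0) {
            throw new IllegalArgumentException("numRequiredBuffers must be positive: " + numRequiredBuffers);
        }
        synchronized (factoryLock) {
            // All-or-nothing (assumption): fail without handing out a partial set.
            if (availableMemorySegments.size() < numRequiredBuffers) {
                throw new IOException("Insufficient network buffers: required " + numRequiredBuffers
                    + ", available " + availableMemorySegments.size());
            }
            List<byte[]> segments = new ArrayList<>(numRequiredBuffers);
            for (int i = 0; i < numRequiredBuffers; i++) {
                segments.add(availableMemorySegments.poll());
            }
            return segments;
        }
    }

    void recycleMemorySegments(List<byte[]> segments) {
        synchronized (factoryLock) {
            availableMemorySegments.addAll(segments);
        }
    }

    int available() {
        synchronized (factoryLock) {
            return availableMemorySegments.size();
        }
    }
}
```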
[jira] [Commented] (FLINK-7509) Refactorings to AggregateCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143526#comment-16143526 ] Fabian Hueske commented on FLINK-7509: -- Yes, with more special cases being added it might make sense to split the code for the different aggregation types:
- batch group aggregation
- stream non-windowed group aggregation
- stream group windowed aggregation
- stream over windowed aggregation
But maybe another separation is more meaningful. Do you have concrete plans for the split [~jark]? > Refactorings to AggregateCodeGenerator > -- > > Key: FLINK-7509 > URL: https://issues.apache.org/jira/browse/FLINK-7509 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu > > I think the `AggregateCodeGenerator#generateAggregations` is currently too long (500+ LOC) and hard to extend. I would like to refactor it if you > have no objection.
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143522#comment-16143522 ] Fabian Hueske commented on FLINK-6233: -- Thanks [~xccui], that would be great! :-) > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-joins on rowtime > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only supports inner joins > * The ON clause should include an equi-join condition > * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}} can only use rowtime as a system attribute. The time > condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not > unbounded ones like {{o.rowtime < s.rowtime}}, and it must reference the rowtime attributes of both > streams; {{o.rowtime between rowtime () and rowtime () + 1}} should also not be supported. > A row-time stream join will not be able to handle late data, because inserting a row > into a sorted order would shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use a row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin).
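The bounded time condition from the example query can be expressed as a predicate plus two state-cleanup checks. This is a hedged sketch with hypothetical names, not the operator design the issue asks for; it assumes watermark semantics where a watermark `w` means no further rows with a rowtime at or below `w` are expected. It also suggests one way to read the restriction on unbounded predicates such as `o.rowtime < s.rowtime`: with one bound infinite, one of the two eviction checks could never return true, so buffered rows on that side could never be cleaned up.

```java
// Hypothetical helper for a bounded rowtime join predicate of the form
//   s.rowtime + lowerBound <= o.rowtime <= s.rowtime + upperBound
// (the example query uses lowerBound = 0 and upperBound = 1 hour).
final class BoundedRowtimeJoinSketch {
    private final long lowerBoundMs;
    private final long upperBoundMs;

    BoundedRowtimeJoinSketch(long lowerBoundMs, long upperBoundMs) {
        if (lowerBoundMs > upperBoundMs) {
            throw new IllegalArgumentException("lower bound must not exceed upper bound");
        }
        this.lowerBoundMs = lowerBoundMs;
        this.upperBoundMs = upperBoundMs;
    }

    // The time part of the ON clause, evaluated for one order/shipment pair.
    boolean matches(long orderRowtime, long shipmentRowtime) {
        return orderRowtime >= shipmentRowtime + lowerBoundMs
            && orderRowtime <= shipmentRowtime + upperBoundMs;
    }

    // A buffered shipment can only match orders up to shipmentRowtime + upperBound,
    // so it can be dropped once the orders watermark has reached that point.
    boolean canEvictShipment(long shipmentRowtime, long ordersWatermark) {
        return ordersWatermark >= shipmentRowtime + upperBoundMs;
    }

    // A buffered order can only match shipments up to orderRowtime - lowerBound.
    boolean canEvictOrder(long orderRowtime, long shipmentsWatermark) {
        return shipmentsWatermark >= orderRowtime - lowerBoundMs;
    }
}
```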
[jira] [Commented] (FLINK-7512) avoid unnecessary buffer copies during network serialization
[ https://issues.apache.org/jira/browse/FLINK-7512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143520#comment-16143520 ] Nico Kruber commented on FLINK-7512: Hi [~phoenixjiangnan], no, the error you are seeing is not related to the network code but rather to timers and state. > avoid unnecessary buffer copies during network serialization > > > Key: FLINK-7512 > URL: https://issues.apache.org/jira/browse/FLINK-7512 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Currently, we have our own {{Buffer}} class backed by a {{MemorySegment}} and > whenever we try to write to or read from Netty, we need to copy to / from > Netty's {{ByteBuf}} instances. > This is an umbrella task for avoiding these buffer copies and some related > code changes
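The difference between copying into a second buffer and handing out a view over the same memory can be sketched with plain `java.nio.ByteBuffer`. This is only an analogy under that assumption: it does not use Netty's `ByteBuf` or Flink's `Buffer`/`MemorySegment`, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of copy vs. zero-copy view, using only java.nio.
final class CopyVsWrapSketch {
    // Copying: allocate a second buffer and move the bytes (one extra copy per transfer).
    static ByteBuffer copyOf(byte[] backing, int length) {
        ByteBuffer target = ByteBuffer.allocate(length);
        target.put(backing, 0, length);
        target.flip(); // make the copied bytes readable from position 0
        return target;
    }

    // Wrapping: create a view over the same memory; no bytes are moved.
    static ByteBuffer wrap(byte[] backing, int length) {
        return ByteBuffer.wrap(backing, 0, length).slice();
    }
}
```

Because the wrapped view shares memory with its source, a later write to the backing array is visible through the view but not through the copy.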
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143517#comment-16143517 ] Fabian Hueske commented on FLINK-7446: -- I think that's a good plan [~jark]. However, we should design it in a way that we can support periodic and punctuated watermarks. > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful to also support defining an > existing field as the rowtime field. Just like when registering a DataStream, the > rowtime field could either be appended or replace an existing field.
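One way to keep both watermark styles open is a single strategy interface with a per-row hook (punctuated) and a timer hook (periodic), similar in spirit to the DataStream API's `AssignerWithPunctuatedWatermarks` and `AssignerWithPeriodicWatermarks`. The sketch below is hypothetical: the interface and both strategies are illustrative names, not a proposed Flink API, and the periodic timer itself is left out.

```java
// Hypothetical strategy interface; names are illustrative, not Flink's API.
interface RowtimeWatermarkStrategy {
    // Called for every row; a punctuated strategy may return a watermark here, otherwise null.
    Long onRow(long rowtime);

    // Called when a periodic timer fires; a periodic strategy may return a watermark here.
    Long onPeriodicTick();
}

// Periodic: remembers the largest rowtime seen and emits (max - delay) on each tick.
final class BoundedDelayPeriodicStrategy implements RowtimeWatermarkStrategy {
    private final long delay;
    private long maxRowtime = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    BoundedDelayPeriodicStrategy(long delay) {
        this.delay = delay;
    }

    @Override
    public Long onRow(long rowtime) {
        maxRowtime = Math.max(maxRowtime, rowtime);
        return null;
    }

    @Override
    public Long onPeriodicTick() {
        if (maxRowtime == Long.MIN_VALUE) {
            return null; // no rows seen yet
        }
        long candidate = maxRowtime - delay;
        if (candidate <= lastEmitted) {
            return null; // never move backwards and skip duplicate watermarks
        }
        lastEmitted = candidate;
        return candidate;
    }
}

// Punctuated: decides per row, emitting (rowtime - delay) once it has advanced
// by at least `interval` of rowtime since the last emitted watermark.
final class RowtimeIntervalPunctuatedStrategy implements RowtimeWatermarkStrategy {
    private final long delay;
    private final long interval;
    private boolean emittedAny = false;
    private long lastEmitted;

    RowtimeIntervalPunctuatedStrategy(long delay, long interval) {
        this.delay = delay;
        this.interval = interval;
    }

    @Override
    public Long onRow(long rowtime) {
        long candidate = rowtime - delay;
        if (!emittedAny || candidate >= lastEmitted + interval) {
            emittedAny = true;
            lastEmitted = candidate;
            return candidate;
        }
        return null;
    }

    @Override
    public Long onPeriodicTick() {
        return null; // purely data-driven
    }
}
```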
[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function
[ https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143500#comment-16143500 ] ASF GitHub Bot commented on FLINK-7358: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4534 The text above `"What is the purpose of the change"` should be removed such that only the relevant info is in the PR description. > Add implicitly converts support for User-defined function > -- > > Key: FLINK-7358 > URL: https://issues.apache.org/jira/browse/FLINK-7358 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently, if a user defines a UDF as follows: > {code} > object Func extends ScalarFunction { > def eval(a: Int, b: Long): String = { > ... > } > } > {code} > and the table schema is (a: Int, b: Int, c: String), then we cannot call > the UDF `Func('a, 'b)`. So > I want to add implicit conversions when calling a UDF. The implicit conversion rule > is: > BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> > FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO > *Note: > This JIRA only covers the Table API; SQL will be fixed in > https://issues.apache.org/jira/browse/CALCITE-1908.* > What do you think? [~fhueske]
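The proposed widening chain can be sketched as an ordered list plus a per-position check of a call's argument types against an `eval` signature. Strings stand in for the `TypeInformation` constants, and all names are hypothetical. For the example in the issue, arguments `(INT, INT)` would then match `eval(a: Int, b: Long)`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: plain strings stand in for the TypeInformation constants named in the issue.
final class ImplicitWideningSketch {
    // BYTE -> SHORT -> INT -> LONG -> FLOAT -> DOUBLE, as proposed in FLINK-7358.
    private static final List<String> WIDENING_CHAIN =
        Arrays.asList("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE");

    // True if an argument of type `from` may be passed to a parameter of type `to`.
    static boolean canConvert(String from, String to) {
        if (from.equals(to)) {
            return true; // identical types always match
        }
        int fromIdx = WIDENING_CHAIN.indexOf(from);
        int toIdx = WIDENING_CHAIN.indexOf(to);
        return fromIdx >= 0 && toIdx >= 0 && fromIdx < toIdx; // only widen, never narrow
    }

    // Checks a call's argument types against one eval() signature, position by position.
    static boolean signatureMatches(List<String> argTypes, List<String> paramTypes) {
        if (argTypes.size() != paramTypes.size()) {
            return false;
        }
        for (int i = 0; i < argTypes.size(); i++) {
            if (!canConvert(argTypes.get(i), paramTypes.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

Note that the chain as proposed includes LONG -> FLOAT, which can lose precision for large values.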
[jira] [Commented] (FLINK-7410) Add getName method to UserDefinedFunction
[ https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143474#comment-16143474 ] Timo Walther commented on FLINK-7410: - No, the {{toString}} method is not used yet. We could use it for this purpose. > Add getName method to UserDefinedFunction > - > > Key: FLINK-7410 > URL: https://issues.apache.org/jira/browse/FLINK-7410 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > *Motivation* > Operator names set in the Table API are used for visualization and logging, so it > is important to make these names simple and readable. Currently, > the name of a UserDefinedFunction contains the canonical class name and an MD5 value, making > the name too long and unfriendly to users. > As shown in the following example, > {quote} > select: (a, b, c, > org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a) > AS _c3, > org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c) > AS _c4) > {quote} > *Changes:* > > Provide a getName method for UserDefinedFunction. The method will return the class > name by default. Users can also override the method to return whatever they > want. > What do you think [~fhueske] ?
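A minimal sketch of the proposal, combined with Timo's suggestion of reusing `toString`: the default name is the simple class name, and subclasses can override it. `UserDefinedFunctionSketch`, `RichFunc1` and `RenamedFunc` are hypothetical stand-ins written in Java, not Flink's actual `UserDefinedFunction` hierarchy.

```java
// Hypothetical, simplified base class; not Flink's actual UserDefinedFunction.
abstract class UserDefinedFunctionSketch {
    // Default display name: the simple class name, without package or MD5 suffix.
    public String getName() {
        return getClass().getSimpleName();
    }

    // Reusing toString() for display keeps a single source for the operator label.
    @Override
    public String toString() {
        return getName();
    }
}

// Uses the default name.
final class RichFunc1 extends UserDefinedFunctionSketch { }

// Overrides the name for a friendlier operator label.
final class RenamedFunc extends UserDefinedFunctionSketch {
    @Override
    public String getName() {
        return "myFunc";
    }
}
```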