[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974124#comment-15974124 ]

mingleizhang commented on FLINK-6311:
-------------------------------------

[~tzulitai] Thanks for the useful information, I really appreciate it. Yes, I would like to work on this and will start on it soon.

> NPE in FlinkKinesisConsumer if source was closed before run
> -----------------------------------------------------------
>
>                 Key: FLINK-6311
>                 URL: https://issues.apache.org/jira/browse/FLINK-6311
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: mingleizhang
>
> This was reported by a user:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected
> against the condition when the source was closed before it started running.
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
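The null checks asked for in FLINK-6311 could look roughly like the sketch below. It is not the actual KinesisDataFetcher code: `FetcherSketch`, `runFetcher`, `isRunning` and `executorIsShutdown` are hypothetical names, and only `shutdownFetcher`, `mainThread` and `shardConsumersExecutor` are taken from the issue text. The assumption is that both fields stay null until the fetcher starts running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for KinesisDataFetcher. Only shutdownFetcher,
// mainThread and shardConsumersExecutor are names from the issue.
class FetcherSketch {
    // Both fields are assumed to stay null until runFetcher() is called.
    private volatile Thread mainThread;
    private volatile ExecutorService shardConsumersExecutor;
    private volatile boolean running = true;

    void runFetcher() {
        mainThread = Thread.currentThread();
        shardConsumersExecutor = Executors.newCachedThreadPool();
    }

    // Safe to call even if runFetcher() was never invoked.
    void shutdownFetcher() {
        running = false;
        if (mainThread != null) {
            mainThread.interrupt();
        }
        if (shardConsumersExecutor != null) {
            shardConsumersExecutor.shutdownNow();
        }
    }

    boolean isRunning() {
        return running;
    }

    boolean executorIsShutdown() {
        return shardConsumersExecutor != null && shardConsumersExecutor.isShutdown();
    }

    public static void main(String[] args) {
        // Closing before running: without the null checks this would throw an NPE.
        FetcherSketch fetcher = new FetcherSketch();
        fetcher.shutdownFetcher();
    }
}
```

The fields are only ever assigned once in this sketch, so a plain check-then-use is enough; a real fix may need to consider concurrent start and close.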
[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface
[ https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974103#comment-15974103 ]

Bowen Li commented on FLINK-5095:
---------------------------------

I'll spend time over the next few days looking at how other projects (hadoop, kafka, etc.) handle this situation, and then kick off a discussion.

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---------------------------------------------------------------
>
>                 Key: FLINK-5095
>                 URL: https://issues.apache.org/jira/browse/FLINK-5095
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.1.3
>            Reporter: Chesnay Schepler
>            Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface,
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with
> these methods.
> Since the different metric types have to be handled differently, we thus force
> every reporter to do something like this:
> {code}
> if (metric instanceof Counter) {
>     Counter c = (Counter) metric;
>     // deal with counter
> } else if (metric instanceof Gauge) {
>     // deal with gauge
> } else if (metric instanceof Histogram) {
>     // deal with histogram
> } else if (metric instanceof Meter) {
>     // deal with meter
> } else {
>     // log something or throw an exception
> }
> {code}
> This has a few issues:
> * the instanceof checks and casts are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit
> add/remove methods for every metric type. This would however be a breaking
> change and blow up the interface to 12 methods from the current 4. We could
> also add a RichMetricReporter interface with these methods, which would
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.
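To make the proposal in FLINK-5095 more concrete, here is one possible shape of an interface with explicit per-type methods. Everything in it is an assumption made for illustration: the metric interfaces are minimal local stand-ins rather than Flink's, the method names (`notifyOfAddedCounter` and so on) are guessed from the issue title, the `MetricGroup` parameter and the remove methods are left out for brevity, and `MetricDispatch` and `RecordingReporter` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal local stand-ins for the metric types named in the issue.
interface Metric {}
interface Counter extends Metric { long getCount(); }
interface Gauge<T> extends Metric { T getValue(); }
interface Histogram extends Metric {}
interface Meter extends Metric {}

// Hypothetical reporter interface with one explicit method per metric type.
interface ExplicitReporter {
    void notifyOfAddedCounter(Counter counter, String name);
    void notifyOfAddedGauge(Gauge<?> gauge, String name);
    void notifyOfAddedHistogram(Histogram histogram, String name);
    void notifyOfAddedMeter(Meter meter, String name);
}

// The type dispatch happens once, on the framework side, not in every reporter.
final class MetricDispatch {
    static void notifyAdded(Metric metric, String name, ExplicitReporter reporter) {
        if (metric instanceof Counter) {
            reporter.notifyOfAddedCounter((Counter) metric, name);
        } else if (metric instanceof Gauge) {
            reporter.notifyOfAddedGauge((Gauge<?>) metric, name);
        } else if (metric instanceof Histogram) {
            reporter.notifyOfAddedHistogram((Histogram) metric, name);
        } else if (metric instanceof Meter) {
            reporter.notifyOfAddedMeter((Meter) metric, name);
        } else {
            throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
        }
    }
}

// Example reporter: no instanceof checks or casts needed here.
final class RecordingReporter implements ExplicitReporter {
    final List<String> seen = new ArrayList<>();

    public void notifyOfAddedCounter(Counter counter, String name) { seen.add("counter:" + name); }
    public void notifyOfAddedGauge(Gauge<?> gauge, String name) { seen.add("gauge:" + name); }
    public void notifyOfAddedHistogram(Histogram histogram, String name) { seen.add("histogram:" + name); }
    public void notifyOfAddedMeter(Meter meter, String name) { seen.add("meter:" + name); }
}
```

The trade-off raised in the issue still applies: the reporter gets simpler, but the interface grows with every metric type that is added.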
[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 Addressed @zentol 's comments. Ready for another round. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3736#discussion_r112115930

    --- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
    @@ -0,0 +1,85 @@
    +
    +
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-metrics</artifactId>
    +		<version>1.3-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<groupId>org.apache.flink</groupId>
    +	<artifactId>flink-metrics-datadog</artifactId>
    +	<version>1.3-SNAPSHOT</version>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-metrics-core</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>com.google.guava</groupId>
    --- End diff --

    shaded
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3736#discussion_r112115925

    --- Diff: docs/monitoring/metrics.md ---
    @@ -436,6 +436,38 @@ metrics.reporter.stsd.port: 8125
     {% endhighlight %}

    +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
    +
    +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
    +of your Flink distribution.
    +
    +Parameters:
    +
    +- `apikey` - the Datadog API key
    +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
    +
    +Example configuration:
    +
    +{% highlight yaml %}
    +
    +metrics.reporters: dghttp
    +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    +metrics.reporter.dghttp.apikey: xxx
    +metrics.reporter.dghttp.tags: myflinkapp,prod
    +
    +// , , , , , will be sent to Datadog as tags
    +metrics.scope.jm: .jobmanager
    +metrics.scope.jm.job: ..jobmanager.job
    +metrics.scope.tm: ..taskmanager
    +metrics.scope.tm.job: ...taskmanager.job
    +metrics.scope.task: .task
    +metrics.scope.operator: .operator
    +
    +{% endhighlight %}
    +
    +Such metric reporting implementation is a best practice based on our experience working with Datadog. It helps
    --- End diff --

    removed
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974069#comment-15974069 ]

Tzu-Li (Gordon) Tai commented on FLINK-6311:
--------------------------------------------

The {{flink-connector-kinesis}} module is only included as a separate build profile ("include-kinesis"). We did this because of AWS License issues, and do not build the kinesis connector along with the other code. But this shouldn't be relevant for this JIRA, you can still proceed to fix it if you want to :)
[jira] [Assigned] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mingleizhang reassigned FLINK-6311:
-----------------------------------

    Assignee: mingleizhang
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012 ]

mingleizhang commented on FLINK-6311:
-------------------------------------

[~tzulitai] I just looked at FlinkKinesisConsumer, which is in the package {code}org.apache.flink.streaming.connectors.kinesis{code} in the flink-connector-kinesis module. That module sits under flink-connectors, whose pom.xml does not list {code}flink-connector-kinesis{code} as a module. I think we should add the module {code}flink-connector-kinesis{code} to the flink-connectors pom.xml and then return to this issue. What do you think?

> NPE in FlinkKinesisConsumer if source was closed before run
> -----------------------------------------------------------
>
>                 Key: FLINK-6311
>                 URL: https://issues.apache.org/jira/browse/FLINK-6311
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>
> This was reported by a user:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected
> against the condition when the source was closed before it started running.
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.
[jira] [Closed] (FLINK-6149) add additional flink logical relation nodes
[ https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-6149. - Resolution: Implemented Fix Version/s: 1.3.0 > add additional flink logical relation nodes > --- > > Key: FLINK-6149 > URL: https://issues.apache.org/jira/browse/FLINK-6149 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > Fix For: 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3594
[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes
[ https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974000#comment-15974000 ] ASF GitHub Bot commented on FLINK-6149: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3594 > add additional flink logical relation nodes > --- > > Key: FLINK-6149 > URL: https://issues.apache.org/jira/browse/FLINK-6149 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector
[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973988#comment-15973988 ] ASF GitHub Bot commented on FLINK-4821: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3001 Hi @tzulitai , I have rebased it and updated to the public API. > Implement rescalable non-partitioned state for Kinesis Connector > > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Wei-Che Wei > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks progress for this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973982#comment-15973982 ]

hongyuhong commented on FLINK-6232:
-----------------------------------

Hi [~fhueske], I have implemented it using a normal timestamp attribute rather than a.proctime/a.rowtime, because those are not supported yet. I would appreciate any advice you can give. Thanks very much.

> Support proctime inner equi-join between two streams in the SQL API
> -------------------------------------------------------------------
>
>                 Key: FLINK-6232
>                 URL: https://issues.apache.org/jira/browse/FLINK-6232
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.proctime BETWEEN s.proctime AND s.proctime +
> INTERVAL '1' HOUR}}, may only use proctime, which is a system attribute.
> * The time condition must be a bounded range such as {{o.proctime BETWEEN
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; unbounded
> conditions such as {{o.proctime > s.proctime}} are not supported.
> * The time condition must reference the proctime attributes of both streams;
> {{o.proctime between proctime() and proctime() + 1}} is not supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
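As an illustration of the join semantics described in FLINK-6232, the following sketch evaluates the equi-join condition together with a bounded time range over two small in-memory inputs. It is not the DataStream operator the issue asks for: `BoundedJoinSketch`, `inRange` and `join` are hypothetical names, rows are modelled as `{orderId, timestampMillis}` pairs, and explicit timestamps stand in for processing time, which a real operator would take from the system clock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: nested-loop evaluation of
//   o.orderId = s.orderId
//   AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR
final class BoundedJoinSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // BETWEEN is inclusive on both ends:
    // sTime - lowerMs <= oTime <= sTime + upperMs
    static boolean inRange(long oTime, long sTime, long lowerMs, long upperMs) {
        return oTime >= sTime - lowerMs && oTime <= sTime + upperMs;
    }

    // Each row is {orderId, timestampMillis}.
    static List<String> join(long[][] orders, long[][] shipments) {
        List<String> out = new ArrayList<>();
        for (long[] o : orders) {
            for (long[] s : shipments) {
                boolean sameKey = o[0] == s[0];
                if (sameKey && inRange(o[1], s[1], 0L, HOUR_MS)) {
                    out.add(o[0] + "@" + o[1] + "/" + s[1]);
                }
            }
        }
        return out;
    }
}
```

Because the range is bounded on both sides, a streaming operator only has to keep rows for a limited time before they can no longer match, which is presumably why unbounded conditions are excluded in the restrictions above.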
[GitHub] flink issue #3001: [FLINK-4821] [kinesis] Implement rescalable non-partition...
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3001

    Hi @tony810430,
    The exposure for union list state was just merged to master. Could you rebase this? Once rebased I'll do a full review so that we can finally work towards merging this :)
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973954#comment-15973954 ]

ASF GitHub Bot commented on FLINK-6250:
---------------------------------------

Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112105763

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -1140,6 +1140,202 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }

    +  @Test
    +  def testNonPartitionedProcTimeOverDistinctWindow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val sqlQuery = "SELECT a, " +
    +      " SUM(DIST(e)) OVER (" +
    +      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumE " +
    --- End diff --

    I think we can add some tests for multiple aggregations with distinct.

> Distinct procTime with Rows boundaries
> --------------------------------------
>
>                 Key: FLINK-6250
>                 URL: https://issues.apache.org/jira/browse/FLINK-6250
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: radu
>            Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
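For readers unfamiliar with what query Q1.1 in FLINK-6250 is expected to compute, here is a small sketch of SUM(DISTINCT b) over a ROWS BETWEEN n PRECEDING AND CURRENT ROW window. It is plain Java over an array, not the ProcessFunction from the pull request: `DistinctSumOverRows` and `sumDistinct` are hypothetical names, and the arrival order of the array stands in for processing-time order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of SUM(DISTINCT b) OVER (ROWS BETWEEN n PRECEDING AND CURRENT ROW).
final class DistinctSumOverRows {
    static List<Long> sumDistinct(long[] values, int preceding) {
        Deque<Long> window = new ArrayDeque<>();      // rows currently in the window
        Map<Long, Integer> counts = new HashMap<>();  // occurrences of each value in the window
        List<Long> out = new ArrayList<>();
        long sum = 0L;

        for (long v : values) {
            window.addLast(v);
            // A value contributes to the sum only when it first enters the window.
            if (counts.merge(v, 1, Integer::sum) == 1) {
                sum += v;
            }
            // Keep at most `preceding` earlier rows plus the current row.
            if (window.size() > preceding + 1) {
                long evicted = window.removeFirst();
                // Its contribution is removed only when the last copy leaves.
                if (counts.merge(evicted, -1, Integer::sum) == 0) {
                    counts.remove(evicted);
                    sum -= evicted;
                }
            }
            out.add(sum);
        }
        return out;
    }
}
```

Tracking a per-value count alongside the window is what lets a duplicate leave the window without changing the sum; a stateful streaming version would need equivalent bookkeeping in keyed or operator state.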
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112103318

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    --- End diff --
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973956#comment-15973956 ] ASF GitHub Bot commented on FLINK-6250: --- Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112105507 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class ProcTimeBoundedDistinctRowsOver( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Array[Int]], + private val distinctAggsFlag: Array[Boolean], + private val precedingOffset: Long, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkNotNull(distinctAggsFlag) + Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var smallestTsState: ValueState[Long] = _ + private var distinctValueState: MapState[Any, Row] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) +// We keep 
the elements received in a Map state keyed +// by the ingestion time in the operator. +// we also keep counter of processed elements +// and timestamp of oldest element +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val aggregationStateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) +accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor) + +val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) +counterState = getRuntimeContext.getState(processedCountDescriptor) + +val smallestTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) +smallestTsState =
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112105029 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class ProcTimeBoundedDistinctRowsOver( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Array[Int]], + private val distinctAggsFlag: Array[Boolean], + private val precedingOffset: Long, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkNotNull(distinctAggsFlag) + Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var smallestTsState: ValueState[Long] = _ + private var distinctValueState: MapState[Any, Row] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) +// We keep 
the elements received in a Map state keyed +// by the ingestion time in the operator. +// we also keep counter of processed elements +// and timestamp of oldest element +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val aggregationStateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) +accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor) + +val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) +counterState = getRuntimeContext.getState(processedCountDescriptor) + +val smallestTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) +smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor) + +val distinctValDescriptor : MapStateDescriptor[Any, Row] = + new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row]) +distinctValueState =
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973953#comment-15973953 ] ASF GitHub Bot commented on FLINK-6250: --- Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112105029 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class ProcTimeBoundedDistinctRowsOver( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Array[Int]], + private val distinctAggsFlag: Array[Boolean], + private val precedingOffset: Long, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkNotNull(distinctAggsFlag) + Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var smallestTsState: ValueState[Long] = _ + private var distinctValueState: MapState[Any, Row] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) +// We keep 
the elements received in a Map state keyed +// by the ingestion time in the operator. +// we also keep counter of processed elements +// and timestamp of oldest element +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val aggregationStateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) +accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor) + +val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) +counterState = getRuntimeContext.getState(processedCountDescriptor) + +val smallestTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) +smallestTsState =
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112105507 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class ProcTimeBoundedDistinctRowsOver( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Array[Int]], + private val distinctAggsFlag: Array[Boolean], + private val precedingOffset: Long, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkNotNull(distinctAggsFlag) + Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var smallestTsState: ValueState[Long] = _ + private var distinctValueState: MapState[Any, Row] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) +// We keep 
the elements received in a Map state keyed +// by the ingestion time in the operator. +// we also keep counter of processed elements +// and timestamp of oldest element +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val aggregationStateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) +accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor) + +val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) +counterState = getRuntimeContext.getState(processedCountDescriptor) + +val smallestTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) +smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor) + +val distinctValDescriptor : MapStateDescriptor[Any, Row] = + new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row]) +distinctValueState =
[jira] [Resolved] (FLINK-6324) Refine state access methods in OperatorStateStore
[ https://issues.apache.org/jira/browse/FLINK-6324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-6324. Resolution: Fixed Fix Version/s: 1.3.0 Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/a1aab64 > Refine state access methods in OperatorStateStore > - > > Key: FLINK-6324 > URL: https://issues.apache.org/jira/browse/FLINK-6324 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > This proposes to refine the OperatorStateStore interface by, > - deprecating Java serialization shortcuts > - rename getOperatorState to getListState > The Java serialization shortcuts can be deprecated because they were > previously introduced to provide a smoother migration path from older > savepoints. However, its usage should definitely be discouraged. > Renaming to {{getListState}} is a preparation of making the names of state > access methods contain information about both its redistribution pattern > on restore and the shape of its data structure, since the combination of > these two is orthogonal. This convention will also provide a better > naming pattern for more state access methods in the future, for example > {{getUnionListState}}. If the method name does not contain its > redistribution pattern (e.g., {{getListState}}), then it simply implies the > default repartitioning scheme (SPLIT_DISTRIBUTE). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-5991. Resolution: Fixed Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/2ef4900 > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973949#comment-15973949 ] ASF GitHub Bot commented on FLINK-5991: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3508 > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3508: [FLINK-5991] [state-backend, streaming] Expose Bro...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3508 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937 ] mingleizhang edited comment on FLINK-6130 at 4/19/17 2:36 AM: -- [~Zentol] [~till.rohrmann] That makes sense to me now. So, I just have decided that the previous practice {code}Object result = future.value().get();{code} is meaningless as I can not get any useful message from it. Thanks and appreciate it. was (Author: mingleizhang): [~Zentol] [~till.rohrmann] That makes sense to me now. So, I just have decided that the previous practice {code}Object result = future.value().get();{code} is meaningless as I can not any useful message from it. Thanks and appreciate it. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6130) Consider calling resourceManager#getTerminationFuture() with lock held
[ https://issues.apache.org/jira/browse/FLINK-6130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973937#comment-15973937 ] mingleizhang commented on FLINK-6130: - [~Zentol] [~till.rohrmann] That makes sense to me now. So, I just have decided that the previous practice {code}Object result = future.value().get();{code} is meaningless as I can not any useful message from it. Thanks and appreciate it. > Consider calling resourceManager#getTerminationFuture() with lock held > -- > > Key: FLINK-6130 > URL: https://issues.apache.org/jira/browse/FLINK-6130 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > In YarnFlinkApplicationMasterRunner#runApplicationMaster() : > {code} > synchronized (lock) { > LOG.info("Starting High Availability Services"); > ... > } > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > resourceManager#getTerminationFuture() is called without holding lock. > We should store the value returned from > resourceManager#getTerminationFuture() inside the synchronized block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
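The suggestion in FLINK-6130 (read the termination future while the lock is held, then wait on it outside the lock) can be sketched as follows. This is a minimal illustration, not Flink's code: `TerminationFutureSketch` and `ResourceManagerStub` are hypothetical stand-ins, and the future is assumed to be a plain `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;

final class TerminationFutureSketch {

    // Hypothetical stand-in for the resource manager; not Flink's class.
    static final class ResourceManagerStub {
        private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

        CompletableFuture<String> getTerminationFuture() {
            return terminationFuture;
        }

        void shutDown() {
            terminationFuture.complete("terminated");
        }
    }

    private final Object lock = new Object();
    private ResourceManagerStub resourceManager; // guarded by lock

    String runAndAwait() {
        final CompletableFuture<String> terminationFuture;
        synchronized (lock) {
            resourceManager = new ResourceManagerStub();
            // Capture the future while holding the lock, so a concurrent
            // shutdown() cannot null the field between start-up and the read.
            terminationFuture = resourceManager.getTerminationFuture();
        }

        // Simulate a shutdown request arriving from another thread.
        new Thread(this::shutdown).start();

        // Wait outside the lock so shutdown() is free to acquire it.
        return terminationFuture.join();
    }

    void shutdown() {
        synchronized (lock) {
            if (resourceManager != null) {
                resourceManager.shutDown();
                resourceManager = null;
            }
        }
    }
}
```

Waiting on the captured local rather than on the field keeps the blocking call outside the synchronized block, which is what lets the shutdown path take the lock.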
[jira] [Created] (FLINK-6324) Refine state access methods in OperatorStateStore
Tzu-Li (Gordon) Tai created FLINK-6324: -- Summary: Refine state access methods in OperatorStateStore Key: FLINK-6324 URL: https://issues.apache.org/jira/browse/FLINK-6324 Project: Flink Issue Type: Improvement Components: DataStream API, State Backends, Checkpointing Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai This proposes to refine the OperatorStateStore interface by, - deprecating Java serialization shortcuts - rename getOperatorState to getListState The Java serialization shortcuts can be deprecated because they were previously introduced to provide a smoother migration path from older savepoints. However, its usage should definitely be discouraged. Renaming to {{getListState}} is a preparation of making the names of state access methods contain information about both its redistribution pattern on restore and the shape of its data structure, since the combination of these two is orthogonal. This convention will also provide a better naming pattern for more state access methods in the future, for example {{getUnionListState}}. If the method name does not contain its redistribution pattern (e.g., {{getListState}}), then it simply implies the default repartitioning scheme (SPLIT_DISTRIBUTE). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6149) add additional flink logical relation nodes
[ https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973910#comment-15973910 ] ASF GitHub Bot commented on FLINK-6149: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3594 Rebased to master and will merge after build check > add additional flink logical relation nodes > --- > > Key: FLINK-6149 > URL: https://issues.apache.org/jira/browse/FLINK-6149 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3594 Rebased to master and will merge after build check
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973846#comment-15973846 ] ASF GitHub Bot commented on FLINK-6242: --- Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3735

@fhueske thanks for your feedback. Yes, we could keep the GeneratedAggregations interface very clean, as:

```
abstract class GeneratedAggregations extends Function {
  def setAggregationResults(accumulators: Row, output: Row)
  def setForwardedFields(input: Row, output: Row)
  def accumulate(accumulators: Row, input: Row)
  def retract(accumulators: Row, input: Row)
  def createAccumulators(): Row
  def mergeAccumulatorsPair(a: Row, b: Row): Row
  def resetAccumulator(accumulators: Row)
}
```

But I feel it might not be a good idea to add more parameters to the code generation function, as the caller will usually have to construct unnecessary empty parameters. I think we can break the code generation function into 2-3 functions (these are just the interfaces that process the code-gen parameters; the fundamental implementation of each function will be shared). Let me prototype the changes, and we can continue the discussion from there.

Regarding your other comments: I did not look into the logic of the previous implementations, as I focused only on the code-gen. I will take a look and optimize them.

> codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3735

@fhueske thanks for your feedback. Yes, we could keep the GeneratedAggregations interface very clean, as:

```
abstract class GeneratedAggregations extends Function {
  def setAggregationResults(accumulators: Row, output: Row)
  def setForwardedFields(input: Row, output: Row)
  def accumulate(accumulators: Row, input: Row)
  def retract(accumulators: Row, input: Row)
  def createAccumulators(): Row
  def mergeAccumulatorsPair(a: Row, b: Row): Row
  def resetAccumulator(accumulators: Row)
}
```

But I feel it might not be a good idea to add more parameters to the code generation function, as the caller will usually have to construct unnecessary empty parameters. I think we can break the code generation function into 2-3 functions (these are just the interfaces that process the code-gen parameters; the fundamental implementation of each function will be shared). Let me prototype the changes, and we can continue the discussion from there.

Regarding your other comments: I did not look into the logic of the previous implementations, as I focused only on the code-gen. I will take a look and optimize them.
[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency
[ https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973832#comment-15973832 ] ASF GitHub Bot commented on FLINK-6295: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3709 Ok, I think I've got your point. Currently, with WeakHashMap, we add entries when the map doesn't contain the requested EG id, and invalid entries are removed when GC happens. By adding the `small 2-line branch` you suggest, we add entries the same way as before, but check whether an entry is valid when it's accessed by a handler, and update/remove it if it's invalid. Is that right? > use LoadingCache instead of WeakHashMap to lower latency > > > Key: FLINK-6295 > URL: https://issues.apache.org/jira/browse/FLINK-6295 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Tao Wang >Assignee: Tao Wang > > Now in ExecutionGraphHolder, which is used in many handlers, we use a > WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage > collection. > The latency is too high when JVM do GC rarely, which will make status of jobs > or its tasks unmatched with the real ones. > LoadingCache is a common used cache implementation from guava lib, we can use > its time based eviction to lower latency of status update. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3709 Ok, I think I've got your point. Currently, with WeakHashMap, we add entries when the map doesn't contain the requested EG id, and invalid entries are removed when GC happens. By adding the `small 2-line branch` you suggest, we add entries the same way as before, but check whether an entry is valid when it's accessed by a handler, and update/remove it if it's invalid. Is that right?
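The check-on-access idea described in this comment could look roughly like the sketch below. It is only an illustration under stated assumptions: `ValidatingCacheSketch`, `Graph`, the `valid` flag and the `loader` function are all hypothetical, a plain `HashMap` replaces the `WeakHashMap` for simplicity, and the thread does not say what makes an entry invalid in the actual ExecutionGraphHolder.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ValidatingCacheSketch {

    // Hypothetical stand-in for a cached ExecutionGraph.
    static final class Graph {
        final String jobId;
        final int version;
        boolean valid = true; // assumed validity criterion, for illustration only

        Graph(String jobId, int version) {
            this.jobId = jobId;
            this.version = version;
        }
    }

    private final Map<String, Graph> cache = new HashMap<>();
    private final Function<String, Graph> loader; // hypothetical lookup of the current graph

    ValidatingCacheSketch(Function<String, Graph> loader) {
        this.loader = loader;
    }

    Graph get(String jobId) {
        Graph cached = cache.get(jobId);
        // The small branch: serve the cached entry only while it is still valid.
        if (cached != null && cached.valid) {
            return cached;
        }
        // Missing or invalid: reload, then update or remove the entry.
        Graph fresh = loader.apply(jobId);
        if (fresh == null) {
            cache.remove(jobId);
        } else {
            cache.put(jobId, fresh);
        }
        return fresh;
    }
}
```

With this shape, staleness is detected on the next access by a handler instead of whenever the garbage collector happens to run.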
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112083224

--- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/metric/DGauge.java ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.metric;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
--- End diff --

Their `MetricType`s are different: one is `MetricType.counter` and the other is `MetricType.gauge`, and this type is serialized as a JSON field.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973662#comment-15973662 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080186

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
     }

     // Set aggregate result to the final output
-    i = 0
-    while (i < aggregates.length) {
-      val index = groupings.length + i
-      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-      output.setField(index, aggregates(i).getValue(accumulator))
-      i += 1
+    if (input.command == Command.Delete) {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+    } else {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
     }
-    state.update(accumulators)
+    // if previous is not null, do retraction process
+    if (null != previous) {
+      if (previous.equals(output)) {
+        // ignore same output
+        return
--- End diff --

We still need to update the state even if we do not emit new values. Suppose we have a max aggregation with an accumulator that holds a map of `10->1, 5->2`, and we add `8`. The new accumulator will be `10->1, 8->1, 5->2`, but the aggregation result will still be 10. If the state is not updated and we later retract `10`, the new max would be `5`, but it should be `8`.
> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for
> different aggregates.
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is
> working under unbounded and processing time mode. Hence, retraction is only
> supported for unbounded and processing time aggregations so far. We can add
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
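The max example from the review comment above (`10->1, 5->2`, add `8`, then retract `10`) can be traced with a small sketch. It assumes a hypothetical `RetractableMax` accumulator that keeps a value-to-count map; the class and method names are made up for illustration, and this is not Flink's actual aggregate implementation.

```java
import java.util.TreeMap;

/** Hypothetical accumulator for a MAX aggregate that supports retraction. */
class RetractableMax {

    // value -> number of times the value is currently contained in the group
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    /** Adds one occurrence of the value. */
    void accumulate(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    /** Removes one occurrence of the value; the key is dropped when its count reaches zero. */
    void retract(int value) {
        counts.computeIfPresent(value, (key, count) -> count == 1 ? null : count - 1);
    }

    /** Returns the current maximum, or null if no value is present. */
    Integer getValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }
}
```

Accumulating `8` changes the map but not the result of `getValue()`. If the changed accumulator were not written back to state because the output did not change, the `8` would be lost and a later retraction of `10` would yield `5` instead of `8`.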
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973658#comment-15973658 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065233

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
                 fieldSerializers[i].serialize(o, target);
             }
         }
+        commandSerializer.serialize(record.command, target);
--- End diff --

Also, by adding the command to `Row`, we add serialization overhead to all jobs that use `Row` (including batch Table API / SQL).
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973656#comment-15973656 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065725

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = false,
-            isRowsClause = overWindow.isRows)
+            isRowsClause = overWindow.isRows,
+            consumeRetraction)
--- End diff --

I think it would be better to enable retraction for all types of OVER aggregates at the same time. Supporting just one specific type causes more confusion than it resolves, in my opinion.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973659#comment-15973659 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112060500

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
                 fieldSerializers[i].serialize(o, target);
             }
         }
+        commandSerializer.serialize(record.command, target);
--- End diff --

I'm afraid we cannot change the serialization of `Row`. `Row` is a public class in `flink-core` and not an internal `flink-table` class. Hence, it is used in other places and might also be part of user applications. If we change the serialization, users might not be able to restore a job on 1.3 from a savepoint taken with 1.2. This restriction rules out simply adding a field to `Row`, which would have avoided major refactorings.

I see two options to add the command field to the data streams in `flink-table`:

1. Use a regular field in `Row`. This would mean that the physical layout of the `Row` is no longer the same as the logical layout, i.e., the one expected by Calcite. However, we will probably change this anyway for the upcoming changes related to the time indicators. For these, the physical layout will have fewer fields than the logical layout (we will remove time fields which are in the metadata of Flink's records or taken as processing time). By adding the command field, we would add a field which is not in the logical layout. The problem with this approach is that the command field would be at different positions in the `Row` (probably the last one). We could leverage the changes introduced by the time indicator changes (or the other way round). @twalthr is working on this. You can have a look at the current status here: https://github.com/twalthr/flink/tree/FLINK-5884

2. Wrap the rows in a custom data type similar to a `Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator`, which forward most calls to the type info, serializer, and comparator of `Row`. The problem with this approach is that we need to change the return types of all functions. For some functions this might not be a big issue if we can unwrap the `Row` object before passing it to the code-generated functions. The command field could be set when the result `Row` is returned or in a wrapping `Collector`.

My gut feeling is that the second approach is easier to implement because we (hopefully) do not need to touch the generated code and "just" need to wrap all `Row` objects in `CRow` objects.
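The second option from the comment above (a wrapper type whose serializer forwards to the unchanged row serializer) could look roughly like the following sketch. Everything here is an assumption made for illustration: `IntRowCodec` is a stand-in for an existing row serializer whose wire format must not change, an `int[]` stands in for `Row`, and `CRow` / `CRowCodec` are hypothetical names. It uses plain `java.io` streams rather than Flink's `TypeSerializer` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Stand-in for an existing row serializer; its format stays untouched. */
class IntRowCodec {
    void serialize(int[] row, DataOutputStream out) throws IOException {
        out.writeInt(row.length);
        for (int field : row) {
            out.writeInt(field);
        }
    }

    int[] deserialize(DataInputStream in) throws IOException {
        int[] row = new int[in.readInt()];
        for (int i = 0; i < row.length; i++) {
            row[i] = in.readInt();
        }
        return row;
    }
}

/** Hypothetical wrapper that pairs a row with its change flag. */
final class CRow {
    final int[] row;
    final boolean isDelete;

    CRow(int[] row, boolean isDelete) {
        this.row = row;
        this.isDelete = isDelete;
    }
}

/** Serializer for the wrapper: forwards the row part, then appends the flag. */
class CRowCodec {
    private final IntRowCodec rowCodec = new IntRowCodec();

    byte[] toBytes(CRow value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            rowCodec.serialize(value.row, out); // row bytes are unchanged
            out.writeBoolean(value.isDelete);   // extra byte lives only in the wrapper
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    CRow fromBytes(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int[] row = rowCodec.deserialize(in);
            return new CRow(row, in.readBoolean());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the design is that the row codec is never modified, so data written by the plain row serializer stays readable, and only streams that opt into the wrapper pay for the extra flag.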
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973655#comment-15973655 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078358

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
     var accumulators = state.value()

     if (null == accumulators) {
+      previous = null
       accumulators = new Row(aggregates.length)
       i = 0
       while (i < aggregates.length) {
         accumulators.setField(i, aggregates(i).createAccumulator())
         i += 1
       }
+    } else {
+      // get previous row
+      if (generateRetraction) {
+        if (null == previous) {
+          previous = new Row(groupings.length + aggregates.length)
--- End diff --

Can we initialize `previous` in `open()`?
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973665#comment-15973665 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112081457 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( +("Hello", 1), +("word", 1), +("Hello", 1), +("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val 
resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1", "2", "1", "3", "4") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973664#comment-15973664 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078707

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
     private val aggregates: Array[AggregateFunction[_]],
     private val aggFields: Array[Array[Int]],
     private val groupings: Array[Int],
-    private val aggregationStateType: RowTypeInfo)
+    private val aggregationStateType: RowTypeInfo,
+    private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {

   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)

   private var output: Row = _
--- End diff --

Rename to `newRow`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973657#comment-15973657 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112079294

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
     }

     // Set aggregate result to the final output
-    i = 0
-    while (i < aggregates.length) {
-      val index = groupings.length + i
-      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-      output.setField(index, aggregates(i).getValue(accumulator))
-      i += 1
+    if (input.command == Command.Delete) {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+    } else {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
     }
-    state.update(accumulators)
+    // if previous is not null, do retraction process
+    if (null != previous) {
--- End diff --

Check against `generateRetraction` instead. That check can be optimized because `generateRetraction` is a `val` and hence `final`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973663#comment-15973663 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112081928 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( +("Hello", 1), +("word", 1), +("Hello", 1), +("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val 
resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1", "2", "1", "3", "4") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973660#comment-15973660 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112064885

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
     val groupingKeys = grouping.indices.toArray
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on group window is not supported yet. Note: Currently, group window should " +
+          "not follow an unbounded groupby.")
--- End diff --

`unbounded groupBy` -> `non-windowed GroupBy`?
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973661#comment-15973661 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078731

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
     private val aggregates: Array[AggregateFunction[_]],
     private val aggFields: Array[Array[Int]],
     private val groupings: Array[Int],
-    private val aggregationStateType: RowTypeInfo)
+    private val aggregationStateType: RowTypeInfo,
+    private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {

   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)

   private var output: Row = _
+  private var previous: Row = _
--- End diff --

Rename to `previousRow`.
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112080959

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    --- End diff --

    Please use more test data.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112081457 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( +("Hello", 1), +("word", 1), +("Hello", 1), +("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val 
resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1", "2", "1", "3", "4") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + over agg(unbounded, procTime, keyed) + @Test + def
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112078707

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -41,14 +41,16 @@ class GroupAggProcessFunction(
         private val aggregates: Array[AggregateFunction[_]],
         private val aggFields: Array[Array[Int]],
         private val groupings: Array[Int],
    -    private val aggregationStateType: RowTypeInfo)
    +    private val aggregationStateType: RowTypeInfo,
    +    private val generateRetraction: Boolean)
       extends ProcessFunction[Row, Row] {

       Preconditions.checkNotNull(aggregates)
       Preconditions.checkNotNull(aggFields)
       Preconditions.checkArgument(aggregates.length == aggFields.length)

       private var output: Row = _
    --- End diff --

    rename to `newRow`.
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112080186

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }

         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }

    -    state.update(accumulators)
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    +      if (previous.equals(output)) {
    +        // ignore same output
    +        return
    --- End diff --

    We still need to update the state even if we do not emit new values.
    Suppose we have a max aggregation whose accumulator holds the map
    `10->1, 5->2` and we add `8`. The new accumulator will be
    `10->1, 8->1, 5->2`, but the aggregation result will still be 10. If we
    return here without updating the state and later retract `10`, the new
    max would be `5` but should be `8`.
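The max example from the comment above can be replayed with a small sketch. The class `MaxWithRetraction` and its methods are hypothetical stand-ins, not Flink's `AggregateFunction` API; the only assumption carried over from the comment is that the accumulator maps each value to the number of times it is currently present.

```java
import java.util.TreeMap;

// Hypothetical accumulator for a retractable max; not part of Flink.
class MaxWithRetraction {
    // value -> how many times the value is currently present
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    void accumulate(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    void retract(int value) {
        // Decrement the count; returning null removes the entry at zero.
        counts.computeIfPresent(value, (k, c) -> c == 1 ? null : c - 1);
    }

    // Current max, or null if nothing is accumulated.
    Integer getValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }

    public static void main(String[] args) {
        MaxWithRetraction acc = new MaxWithRetraction();
        acc.accumulate(10);
        acc.accumulate(5);
        acc.accumulate(5);   // accumulator is now 10->1, 5->2

        acc.accumulate(8);   // result stays 10, but the accumulator changed
        System.out.println(acc.getValue());

        acc.retract(10);     // 8 is only found if the previous step was stored
        System.out.println(acc.getValue());
    }
}
```

The emitted result is unchanged after adding `8`, yet the accumulator is different. That is why the state has to be written back even on the code path that skips emitting a row.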
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112065725

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -129,11 +131,17 @@ class DataStreamOverAggregate(
                   generator,
                   inputDS,
                   isRowTimeType = false,
    -              isRowsClause = overWindow.isRows)
    +              isRowsClause = overWindow.isRows,
    +              consumeRetraction)
    --- End diff --

    I think it would be better to enable retraction for all types of OVER
    aggregates at the same time. Just supporting one specific type adds more
    confusion than it helps, in my opinion.
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112079294

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }

         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }

    -    state.update(accumulators)
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    --- End diff --

    Check against `generateRetraction` instead. That check can be optimized
    because `generateRetraction` is a `val` and hence `final`.
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112078358

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -68,12 +70,35 @@ class GroupAggProcessFunction(
         var accumulators = state.value()

         if (null == accumulators) {
    +      previous = null
           accumulators = new Row(aggregates.length)
           i = 0
           while (i < aggregates.length) {
             accumulators.setField(i, aggregates(i).createAccumulator())
             i += 1
           }
    +    } else {
    +      // get previous row
    +      if (generateRetraction) {
    +        if (null == previous) {
    +          previous = new Row(groupings.length + aggregates.length)
    --- End diff --

    can we initialize `previous` in `open()`?
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112065233

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
                     fieldSerializers[i].serialize(o, target);
                 }
             }
    +        commandSerializer.serialize(record.command, target);
    --- End diff --

    Also by adding the command to `Row` we add serialization overhead to all
    jobs that use Row (including batch Table API / SQL).
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112081928 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( +("Hello", 1), +("word", 1), +("Hello", 1), +("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val 
resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1", "2", "1", "3", "4") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + over agg(unbounded, procTime, keyed) + @Test + def
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112064885

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
         val groupingKeys = grouping.indices.toArray
         val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

    +    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
    +
    +    if (consumeRetraction) {
    +      throw new TableException(
    +        "Retraction on group window is not supported yet. Note: Currently, group window should " +
    +          "not follow an unbounded groupby.")
    --- End diff --

    `unbounded groupBy` -> `non-windowed GroupBy`?
[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112060500

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
                     fieldSerializers[i].serialize(o, target);
                 }
             }
    +        commandSerializer.serialize(record.command, target);
    --- End diff --

    I'm afraid we cannot change the serialization of `Row`. `Row` is a public
    class in `flink-core` and not an internal `flink-table` class. Hence, it
    is used in other places and might also be part of user applications. If
    we change the serialization, users might not be able to restore a job on
    1.3 from a savepoint taken with 1.2. This restriction rules out simply
    adding a field to `Row`, which would have avoided major refactoring.

    I see two options to add the command field to the data streams in
    `flink-table`:

    1. Use a regular field in `Row`. This would mean that the physical layout
    of the `Row` is no longer the same as the logical layout, i.e., the one
    expected by Calcite. However, we will probably change this anyway for the
    upcoming changes related to the time indicators. For these, the physical
    layout will have fewer fields than the logical layout (we will remove
    time fields which are in the meta data of Flink's records or taken as
    processing time). By adding the command field, we would add a field which
    is not in the logical layout. The problem with this approach is that the
    command field would be at different positions in the Row (probably the
    last one). We could leverage the changes introduced by the time indicator
    changes (or the other way round). @twalthr is working on this. You can
    have a look at the current status here:
    https://github.com/twalthr/flink/tree/FLINK-5884

    2. The other option is to wrap the rows in a custom data type similar to
    a `Tuple2[Row, Command]`. The data type could be named `Change` or `CRow`
    and would have its own `TypeInformation`, `TypeSerializer`, and
    `TypeComparator`, which forward most calls to the type info, serializer,
    and comparator of `Row`. The problem with this approach is that we need
    to change the return types of all functions. For some functions this
    might not be a big issue if we can take the `Row` object before passing
    it to the code gen'd functions. The command field could be set when the
    result Row is returned or in a wrapping `Collector`.

    My gut feeling is that the second approach is easier to implement because
    we (hopefully) do not need to touch the generated code and "just" need to
    wrap all `Row` objects in `CRow` objects.
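A rough sketch of the second option, under loud assumptions: `ChangeRecord` is a hypothetical stand-in for the proposed `CRow`, a `String[]` stands in for `Row`, and plain `java.io` streams stand in for Flink's `TypeSerializer` and `DataOutputView`. It only illustrates the idea that the wrapper writes the change flag itself and forwards the rest to the unchanged row format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical wrapper pairing a row with a change flag; not Flink code.
final class ChangeRecord {
    final String[] row;    // stand-in for Row's fields
    final boolean delete;  // stand-in for the command (add vs. delete)

    ChangeRecord(String[] row, boolean delete) {
        this.row = row;
        this.delete = delete;
    }

    // Existing row format, left untouched: field count, then each field.
    static void writeRow(String[] row, DataOutputStream out) throws IOException {
        out.writeInt(row.length);
        for (String field : row) {
            out.writeUTF(field);
        }
    }

    // Wrapper format: one flag byte, then the call is forwarded to writeRow.
    static void writeChange(ChangeRecord rec, DataOutputStream out) throws IOException {
        out.writeBoolean(rec.delete);
        writeRow(rec.row, out);
    }

    static byte[] rowBytes(String[] row) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            writeRow(row, new DataOutputStream(buf));
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] changeBytes(ChangeRecord rec) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            writeChange(rec, new DataOutputStream(buf));
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the bytes produced for a plain row are identical before and after the wrapper is introduced, so anything that already stores rows in the old format keeps working. Only streams that carry the wrapper pay for the extra flag byte.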
[jira] [Commented] (FLINK-6032) CEP-Clean up the operator state when not needed.
[ https://issues.apache.org/jira/browse/FLINK-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973569#comment-15973569 ]

ASF GitHub Bot commented on FLINK-6032:
---

Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3541#discussion_r112071560

    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java ---
    @@ -31,6 +31,18 @@ public double getVolume() {
         }

         @Override
    +    public boolean equals(Object obj) {
    +        return obj instanceof SubEvent &&
    +                super.equals(obj) &&
    +                ((SubEvent) obj).volume == volume;
    +    }
    +
    +    @Override
    +    public int hashCode() {
    +        return super.hashCode() + (int) volume;
    --- End diff --

    Common practice is to multiply super.hashCode() by a prime (e.g. 37)

> CEP-Clean up the operator state when not needed.
>
>
>                 Key: FLINK-6032
>                 URL: https://issues.apache.org/jira/browse/FLINK-6032
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
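The hashCode suggestion above could look roughly like the following. `DemoEvent` and `DemoSubEvent` are hypothetical classes written for this illustration, not the Flink CEP test classes, and the use of `Double.hashCode` in place of the `(int)` cast is an extra assumption that goes beyond the review comment.

```java
import java.util.Objects;

// Hypothetical base class; stands in for the parent of SubEvent.
class DemoEvent {
    private final int id;
    private final String name;

    DemoEvent(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof DemoEvent
                && ((DemoEvent) obj).id == id
                && Objects.equals(((DemoEvent) obj).name, name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }
}

// Hypothetical subclass adding a double field, like `volume` in the diff.
class DemoSubEvent extends DemoEvent {
    private final double volume;

    DemoSubEvent(int id, String name, double volume) {
        super(id, name);
        this.volume = volume;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof DemoSubEvent
                && super.equals(obj)
                && Double.compare(((DemoSubEvent) obj).volume, volume) == 0;
    }

    @Override
    public int hashCode() {
        // Multiply the parent hash by a prime before mixing in the new field.
        // Double.hashCode keeps the fractional part that an (int) cast drops.
        return 37 * super.hashCode() + Double.hashCode(volume);
    }
}
```

With a plain `(int) volume`, volumes such as 2.1 and 2.7 would contribute the same value to the hash. That is still a legal hashCode, but it produces avoidable collisions.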
[jira] [Commented] (FLINK-6032) CEP-Clean up the operator state when not needed.
[ https://issues.apache.org/jira/browse/FLINK-6032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973566#comment-15973566 ]

ASF GitHub Bot commented on FLINK-6032:
---

Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3541#discussion_r112071274

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -385,4 +393,25 @@ public int hashCode() {
                 return getClass().hashCode();
             }
         }
    +
    +    // Testing Methods //
    +
    +    @VisibleForTesting
    +    public boolean hasNonEmptyNFA(KEY key) throws IOException {
    +        setCurrentKey(key);
    +        return nfaOperatorState.value() != null;
    +    }
    +
    +    @VisibleForTesting
    +    public boolean hasNonEmptyPQ(KEY key) throws IOException {
    --- End diff --

    These 3 methods can be declared package private.

> CEP-Clean up the operator state when not needed.
>
>
>                 Key: FLINK-6032
>                 URL: https://issues.apache.org/jira/browse/FLINK-6032
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112065478 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,85 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + +org.apache.flink +flink-metrics-datadog +1.3-SNAPSHOT + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.google.guava --- End diff -- You should shade this away to make sure there aren't any dependency conflicts.
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112065330 --- Diff: docs/monitoring/metrics.md --- @@ -436,6 +436,38 @@ metrics.reporter.stsd.port: 8125 {% endhighlight %} +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Parameters: + +- `apikey` - the Datadog API key +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only + +Example configuration: + +{% highlight yaml %} + +metrics.reporters: dghttp +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter +metrics.reporter.dghttp.apikey: xxx +metrics.reporter.dghttp.tags: myflinkapp,prod + +// , , , , , will be sent to Datadog as tags +metrics.scope.jm: .jobmanager +metrics.scope.jm.job: ..jobmanager.job +metrics.scope.tm: ..taskmanager +metrics.scope.tm.job: ...taskmanager.job +metrics.scope.task: .task +metrics.scope.operator: .operator + +{% endhighlight %} + +Such metric reporting implementation is a best practice based on our experience working with Datadog. It helps --- End diff -- Please remove this, it doesn't belong in the documentation.
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112065890 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/metric/DGauge.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog.metric; + +import java.util.List; + +/** + * Mapping of gauge between Flink and Datadog + * */ +public class DGauge extends DMetric { --- End diff -- What's the benefit in differentiating between DCounters and DGauges?
[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3736 If you want to enforce a specific scope config, then I would suggest ignoring the configured one (which means not using `MetricGroup#getMetricIdentifier()`) and instead extracting the variables you want from `MetricGroup#getAllVariables()`. The JMXReporter does this as well and is one example of a reporter that completely ignores scope formats. It builds a series of key-value pairs containing the variables for every metric, not unlike this reporter. You should be able to support Meters, as the DCounter works with Numbers. Only exporting the rate is perfectly reasonable. You don't necessarily have to ignore Histograms; you can instead create a number of DCounters, one for every property that is extracted from the HistogramStatistics; it's a bit more work but doable imo. Instead of storing the actual value in the DMetric, why not store the Flink metric itself? When `DMetric#getPoints()` is called you could retrieve the value from the metric, so you wouldn't have to update anything manually.
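The last two suggestions in the comment above (build tags from the group's variables, and read the metric's value only when points are requested) could look roughly like the sketch below. It is an illustration under assumptions, not the reporter's actual code: `LazyDMetric`, `toTags` and `getPoint` are hypothetical names, a `Supplier<Number>` stands in for a Flink `Gauge` or `Counter`, and a plain `Map<String, String>` stands in for the result of `MetricGroup#getAllVariables()`, with keys assumed to have the form `<host>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the PR's DMetric: it holds the metric itself, not a copied value. */
class LazyDMetric {
    private final String name;
    private final Supplier<Number> source; // stand-in for a Flink Gauge or Counter
    private final List<String> tags;

    LazyDMetric(String name, Supplier<Number> source, List<String> tags) {
        this.name = name;
        this.source = source;
        this.tags = tags;
    }

    String getName() {
        return name;
    }

    List<String> getTags() {
        return tags;
    }

    /** Reads the current value at report time, so nothing has to be updated manually. */
    Number getPoint() {
        return source.get();
    }

    /** Turns scope variables (assumed form: "<host>" -> "worker-1") into "host:worker-1" tags. */
    static List<String> toTags(Map<String, String> variables) {
        List<String> tags = new ArrayList<>();
        for (Map.Entry<String, String> e : variables.entrySet()) {
            String key = e.getKey();
            // Strip the surrounding angle brackets if the key has them.
            if (key.startsWith("<") && key.endsWith(">")) {
                key = key.substring(1, key.length() - 1);
            }
            tags.add(key + ":" + e.getValue());
        }
        return tags;
    }
}
```

With this shape the reporter would only create one object per metric at registration time; every later report simply calls `getPoint()` again.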
[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 Great advice. Thanks, @zentol! My questions about your suggestions: 1. If I make DatadogHttpReporter implement the MetricReporter interface, shall I just ignore Histogram and Meter, which are not supported by the Datadog HTTP API, and document that? 2. For creating DGauges/DCounters, I believe you are correct. I can create new objects upon metrics registration and continue updating their values, right? 3. About parsing tags: great to know I can do it via MetricGroup! Thus, shall I just document that the metrics scope configs should be as follows? ``` metrics.scope.jm: jobmanager metrics.scope.jm.job: jobmanager.job metrics.scope.tm: taskmanager metrics.scope.tm.job: taskmanager.job metrics.scope.task: task metrics.scope.operator: operator ```
[GitHub] flink issue #3737: Release 1.2
Github user kimlng commented on the issue: https://github.com/apache/flink/pull/3737 Opened this pull request accidentally. Need to revert this.
[GitHub] flink pull request #3737: Release 1.2
Github user kimlng closed the pull request at: https://github.com/apache/flink/pull/3737
[GitHub] flink pull request #3737: Release 1.2
GitHub user kimlng opened a pull request: https://github.com/apache/flink/pull/3737 Release 1.2 Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/flink release-1.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3737.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3737 commit 119d39b6edac6a0a80f90bb07794eca1f31425f7 Author: mtuniqueDate: 2017-01-12T03:55:57Z [FLINK-5448] [checkpoints] Fix typo in StateAssignmentOperation Exception This closes #3097 commit 30b467f266970e267792411d3148b4379ec23439 Author: mtunique Date: 2017-01-12T04:02:49Z [FLINK-5438] [streaming api] Typo in JobGraph generator Exception This closes #3098 commit 6b3c683450eb7aee1c9c65be75a0fddb06cea2ce Author: Greg Hogan Date: 2017-01-13T18:33:29Z [FLINK-5485] [webfrontend] Mark compiled 
web frontend files as binary when processed by git diff Particularly beneficial now that javascript is minified, we can mark compiled web frontend files as binary when processed by git diff. https://linux.die.net/man/5/gitattributes This does not affect how files are displayed by github. This closes #3122 commit b1be3f5c3c9e7410d92c74422b10a6efb42fd4d5 Author: Stephan Ewen Date: 2017-01-11T20:05:57Z [FLINK-5345] [core] Add a utility to delete directories without failing in the presence of concurrent deletes commit d1b86aab09061627d8b8c8f99b4277cc60e3dc28 Author: Stephan Ewen Date: 2017-01-12T09:49:13Z [FLINK-5345] [core] Migrate various cleanup calls to concurrency-safe directory deletion commit 27c11e1b79bd68cbd2e8275c7938478e2e9532e6 Author: Aleksandr Chermenin Date: 2016-12-16T11:42:50Z [FLINK-3617] [scala apis] Added null value check. commit 2eb926f2bed5723f160620b94f3b67e5dc418387 Author: Stephan Ewen Date: 2017-01-16T19:17:13Z [FLINK-4959] [docs] Add documentation for ProcessFunction commit f3aea01eb6ebecbb58c368cae9006a7831f07f41 Author: twalthr Date: 2017-01-13T14:22:25Z [FLINK-5447] [table] Sync documentation of built-in functions for Table API with SQL This closes #3126. commit 02b0e65034cb74944aebff33d44470483a5f26e7 Author: beyond1920 Date: 2016-12-29T07:52:17Z [FLINK-5394] [table] Fix row count estimation This closes #3058. commit 61ac3605f6e56f49b83891c0f348b3fbddb2e075 Author: Kurt Young Date: 2017-01-05T03:32:04Z [FLINK-5144] [table] Fix error while applying rule AggregateJoinTransposeRule This closes #3062. commit aad8d253c9f7dd33a63310353cbe132cd4900c6b Author: Jakub Havlik Date: 2017-01-17T07:26:07Z [FLINK-5518] [hadoopCompat] Add null check to HadoopInputFormatBase.close(). This closes #3133 commit 5f81d20ba1a7ea789af973c0e215cef382a621a6 Author: Keiji Yoshida Date: 2017-01-18T05:22:17Z [hotfix] [docs] Several fixes on "Basic API Concepts". - Fix a wrong function name `groupBy()` to `keyBy()`. - Add closing parentheses. 
- Fix an invalid return type of sample code. - Remove a duplicate "their". This closes #3145. This closes #3146. This closes #3147. This closes #3148. commit 55483b71f36b84ac57d03a9b83e0e9d9b9b98eab Author: Ufuk Celebi Date: 2017-01-17T18:10:33Z [FLINK-5484] [serialization] Add test for registered Kryo types commit a7644b1716c54dcc9a5535307f7df983c79711cf
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973424#comment-15973424 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala --- @@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions} * it does no final aggregate evaluation. It also includes the logic of * [[DataSetSlideTimeWindowAggFlatMapFunction]]. * - * @param aggregates aggregate functions - * @param groupingKeysLength number of grouping keys - * @param timeFieldPos position of aligned time field + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param windowSize window size of the sliding window * @param windowSlide window slide of the sliding window * @param returnType return type of this function */ class DataSetSlideTimeWindowAggReduceGroupFunction( -private val aggregates: Array[AggregateFunction[_ <: Any]], -private val groupingKeysLength: Int, -private val timeFieldPos: Int, +private val genAggregations: GeneratedAggregationsFunction, +private val keysAndAggregatesArity: Int, private val windowSize: Long, private val windowSlide: Long, @transient private val returnType: TypeInformation[Row]) extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] - with ResultTypeQueryable[Row] { + with ResultTypeQueryable[Row] + with Compiler[GeneratedAggregations] { - Preconditions.checkNotNull(aggregates) + private val timeFieldPos = returnType.getArity - 1 + private val intermediateWindowStartPos = keysAndAggregatesArity protected var intermediateRow: Row = _ - // add one field to store window start - protected val intermediateRowArity: Int = 
groupingKeysLength + aggregates.length + 1 - protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { -new JArrayList[Accumulator](2) - } - private val intermediateWindowStartPos: Int = intermediateRowArity - 1 + private var accumulators: Row = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ override def open(config: Configuration) { -intermediateRow = new Row(intermediateRowArity) - -// init lists with two empty accumulators -var i = 0 -while (i < aggregates.length) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).add(accumulator) - accumulatorList(i).add(accumulator) - i += 1 -} +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +accumulators = function.createAccumulators() +intermediateRow = function.createOutputRow() } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { // reset first accumulator -var i = 0 -while (i < aggregates.length) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).set(0, accumulator) - i += 1 -} +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() // accumulate - i = 0 - while (i < aggregates.length) { -// insert received accumulator into acc list -val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] -accumulatorList(i).set(1, newAcc) -// merge acc list -val retAcc = aggregates(i).merge(accumulatorList(i)) -// insert result into acc list -accumulatorList(i).set(0, retAcc) -i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // trigger tumbling evaluation if (!iterator.hasNext) { --- End diff -- move this behind 
the loop > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL:
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala --- @@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction( override def combine(records: Iterable[Row]): Row = { // reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +function.resetAccumulator(accumulators) val iterator = records.iterator() + while (iterator.hasNext) { val record = iterator.next() - i = 0 - while (i < aggregates.length) { -// insert received accumulator into acc list -val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] -accumulatorList(i).set(1, newAcc) -// merge acc list -val retAcc = aggregates(i).merge(accumulatorList(i)) -// insert result into acc list -accumulatorList(i).set(0, retAcc) -i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // check if this record is the last record if (!iterator.hasNext) { --- End diff -- move this behind the loop
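As an illustration of the review comment "move this behind the loop", the sketch below remembers the most recent record while iterating and does the last-record work once after the loop, rather than testing `hasNext` a second time on every iteration. It is written in Java with toy types: `CombineSketch`, the `long[]` records (field 0 as key, field 1 as a partial sum) and the `null` result for empty input are assumptions for the example, not the operator's real signature.

```java
import java.util.Iterator;

class CombineSketch {
    /**
     * Sums field 1 of every record and takes the key (field 0) from the last record.
     * The "last record" work runs once after the loop instead of being guarded by
     * an extra hasNext() check inside it.
     */
    static long[] combine(Iterable<long[]> records) {
        long sum = 0L;
        long[] last = null;
        Iterator<long[]> iterator = records.iterator();
        while (iterator.hasNext()) {
            last = iterator.next();
            sum += last[1];
        }
        if (last == null) {
            return null; // empty input; whether this case can occur is an assumption
        }
        return new long[] {last[0], sum};
    }
}
```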
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111986866 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -263,33 +263,56 @@ class CodeGenerator( aggFields: Array[Array[Int]], aggMapping: Array[Int], fwdMapping: Array[(Int, Int)], - outputArity: Int) + outputArity: Int, + groupingKeys: Array[Int]) : GeneratedAggregationsFunction = { def genSetAggregationResults( accTypes: Array[String], aggs: Array[String], aggMapping: Array[Int]): String = { - val sig: String = + val sigHelper: String = j""" -| public void setAggregationResults( -|org.apache.flink.types.Row accs, -|org.apache.flink.types.Row output)""".stripMargin + | private final void setAggregationResultsHelper( + |org.apache.flink.types.Row accs, + |org.apache.flink.types.Row output, + |java.lang.Integer offset)""".stripMargin - val setAggs: String = { + val setAggsHelper: String = { for (i <- aggs.indices) yield j""" |org.apache.flink.table.functions.AggregateFunction baseClass$i = | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; | |output.setField( - | ${aggMapping(i)}, + | ${aggMapping(i)} + offset, | baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin }.mkString("\n") - j"""$sig { - |$setAggs + val setAggregationResults: String = +j""" + | public void setAggregationResults( + |org.apache.flink.types.Row accs, + |org.apache.flink.types.Row output) { + |setAggregationResultsHelper(accs, output, 0); --- End diff -- Code generated methods should be as "flat" as possible. Calling other helper methods adds overhead compared to inlining the code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973421#comment-15973421 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111999493 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala --- @@ -110,12 +110,8 @@ class DataSetSessionWindowAggregatePreProcessor( var windowEnd: java.lang.Long = null --- End diff -- Move the implementation to `combine()`; `mapPartition()` can then forward its call to `combine()`. > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
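The forwarding suggested in the comment above might be sketched like this. The example is a toy written in Java: `PreProcessorSketch`, the `Integer` records, the running-sum logic and the use of `java.util.function.Consumer` in place of Flink's `Collector` are all assumptions made to keep it self-contained; only the idea of one implementation in `combine()` with `mapPartition()` delegating to it comes from the review.

```java
import java.util.function.Consumer;

class PreProcessorSketch {
    /** Holds the single implementation: emits one running sum per input value. */
    void combine(Iterable<Integer> records, Consumer<Integer> out) {
        int sum = 0;
        for (int r : records) {
            sum += r;
            out.accept(sum);
        }
    }

    /** No second copy of the logic; the call is simply forwarded. */
    void mapPartition(Iterable<Integer> records, Consumer<Integer> out) {
        combine(records, out);
    }
}
```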
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala --- @@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions} * it does no final aggregate evaluation. It also includes the logic of * [[DataSetSlideTimeWindowAggFlatMapFunction]]. * - * @param aggregates aggregate functions - * @param groupingKeysLength number of grouping keys - * @param timeFieldPos position of aligned time field + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param windowSize window size of the sliding window * @param windowSlide window slide of the sliding window * @param returnType return type of this function */ class DataSetSlideTimeWindowAggReduceGroupFunction( -private val aggregates: Array[AggregateFunction[_ <: Any]], -private val groupingKeysLength: Int, -private val timeFieldPos: Int, +private val genAggregations: GeneratedAggregationsFunction, +private val keysAndAggregatesArity: Int, private val windowSize: Long, private val windowSlide: Long, @transient private val returnType: TypeInformation[Row]) extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] - with ResultTypeQueryable[Row] { + with ResultTypeQueryable[Row] + with Compiler[GeneratedAggregations] { - Preconditions.checkNotNull(aggregates) + private val timeFieldPos = returnType.getArity - 1 + private val intermediateWindowStartPos = keysAndAggregatesArity protected var intermediateRow: Row = _ - // add one field to store window start - protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1 - protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { -new JArrayList[Accumulator](2) - } - private val 
intermediateWindowStartPos: Int = intermediateRowArity - 1 + private var accumulators: Row = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ override def open(config: Configuration) { -intermediateRow = new Row(intermediateRowArity) - -// init lists with two empty accumulators -var i = 0 -while (i < aggregates.length) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).add(accumulator) - accumulatorList(i).add(accumulator) - i += 1 -} +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +accumulators = function.createAccumulators() +intermediateRow = function.createOutputRow() } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { // reset first accumulator -var i = 0 -while (i < aggregates.length) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).set(0, accumulator) - i += 1 -} +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() // accumulate - i = 0 - while (i < aggregates.length) { -// insert received accumulator into acc list -val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] -accumulatorList(i).set(1, newAcc) -// merge acc list -val retAcc = aggregates(i).merge(accumulatorList(i)) -// insert result into acc list -accumulatorList(i).set(0, retAcc) -i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // trigger tumbling evaluation if (!iterator.hasNext) { --- End diff -- move this behind the loop --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973412#comment-15973412 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111986866 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -263,33 +263,56 @@ class CodeGenerator( aggFields: Array[Array[Int]], aggMapping: Array[Int], fwdMapping: Array[(Int, Int)], - outputArity: Int) + outputArity: Int, + groupingKeys: Array[Int]) : GeneratedAggregationsFunction = { def genSetAggregationResults( accTypes: Array[String], aggs: Array[String], aggMapping: Array[Int]): String = { - val sig: String = + val sigHelper: String = j""" -| public void setAggregationResults( -|org.apache.flink.types.Row accs, -|org.apache.flink.types.Row output)""".stripMargin + | private final void setAggregationResultsHelper( + |org.apache.flink.types.Row accs, + |org.apache.flink.types.Row output, + |java.lang.Integer offset)""".stripMargin - val setAggs: String = { + val setAggsHelper: String = { for (i <- aggs.indices) yield j""" |org.apache.flink.table.functions.AggregateFunction baseClass$i = | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; | |output.setField( - | ${aggMapping(i)}, + | ${aggMapping(i)} + offset, | baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin }.mkString("\n") - j"""$sig { - |$setAggs + val setAggregationResults: String = +j""" + | public void setAggregationResults( + |org.apache.flink.types.Row accs, + |org.apache.flink.types.Row output) { + |setAggregationResultsHelper(accs, output, 0); --- End diff -- Code generated methods should be as "flat" as possible. Calling other helper methods adds overhead compared to inlining the code. 
> codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
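One possible way to keep the generated method "flat", as the comment above asks, is to resolve the offset while the source is being generated, so the emitted code contains constant field indexes and no helper call. The sketch below is a simplified Java illustration, not the project's `CodeGenerator`: `FlatCodeGenSketch`, the `agg0`, `agg1` names in the emitted text, and the omission of casts and fully qualified types are assumptions for brevity.

```java
class FlatCodeGenSketch {
    /**
     * Emits the source of a setAggregationResults-style method. The offset is added
     * while generating the source, so the emitted code contains only constant
     * field indexes and no call to a helper method.
     */
    static String genSetAggregationResults(int[] aggMapping, int offset) {
        StringBuilder body = new StringBuilder();
        body.append("public void setAggregationResults(Row accs, Row output) {\n");
        for (int i = 0; i < aggMapping.length; i++) {
            body.append("  output.setField(")
                .append(aggMapping[i] + offset) // index computed at generation time
                .append(", agg").append(i)
                .append(".getValue(accs.getField(").append(i).append(")));\n");
        }
        body.append("}\n");
        return body.toString();
    }
}
```

Generating a second variant with a different offset then costs one more call to the generator, while each emitted method stays a straight sequence of `setField` calls.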
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973425#comment-15973425 ]

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112007816

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)

       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --

Actually, I'm not sure if we really need to implement a different code generation function. I had a look at the code generation code and think that we could just add a few more parameters to the current code gen method.

Right now, the behavior of most generated methods can be exactly defined:

- `createAccumulators()`: generates a `Row` with the accumulators for each provided `AggregateFunction`. Some methods of `GeneratedAggregations` expect a Row of accumulators with exactly this layout as one of their input parameters. In the following, this parameter is called `accs`.
- `accumulate(accs, row)`: The `aggFields` parameter controls which fields of `row` are accumulated into which accumulator. We should rename this parameter to `accFields` though, IMO.
- `retract(accs, row)`: same as for `accumulate`. We should add a separate parameter `retractFields: Array[Int]` though.
- `setForwardedFields(input, output)`: The `fwdMapping` parameter controls which field of the input row is copied to which position of the output row. We could add an optional parameter to copy the `groupSetMapping` to the output as well.
- `setAggregationResults(accs, output)`: The `aggMapping` parameter controls to which output fields the aggregation results are copied. If we add another parameter `partialResults: Boolean`, we can control whether to copy final results (`AggregateFunction.getValue()`) or partial results (the accumulator).
- `createOutputRow()`: the `outputArity` parameter specifies the arity of the output row.
- `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible method**. We could change the behavior of the method as follows: The method expects as first parameter (`accs`) a Row with the same layout as generated by `createAccumulators`. The second parameter can be any row with accumulators at arbitrary positions. To enable the merging, we add a parameter `mergeMapping: Array[Int]` to the code generating function which defines which fields of the `other` parameter are merged with the fields in the `accs` Row. The method returns a Row with the default layout (as generated by `createAccumulators()`).
- `resetAccumulator(accs)`: resets a Row of accumulators of the known layout.

I haven't checked this thoroughly, but I think with these parameters, we can control the generated code sufficiently to support all aggregation operators for DataSet and DataStream, i.e., we can generate the currently existing functions such that they behave as the more specialized ones that you added.

Since all code gen parameters (`accFields`, `retractFields`, `fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, `mergeMapping`) can be independently set for each type of operator, this should give us the flexibility for all types of operators. We only need to parameterize the code generation method appropriately.

In addition, we could make all parameters `Option` and generate empty methods if the parameters for a function are not set. (This could also be a follow-up issue, IMO.)

What do you think, @shaoxuan-wang?

> codeGen DataSet Goupingwindow Aggregates
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
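The `mergeMapping` idea from the comment above can be illustrated with a minimal sketch. It is not the generated code from the PR: the object name is made up, accumulators are simplified to `Long` sums, and plain arrays stand in for `Row`. Only the role of `mergeMapping` (position `i` names the field of `other` that is merged into `accs(i)`) is taken from the comment.

```scala
object MergeMappingSketch {
  // accs uses the default layout: one Long sum per aggregate, starting at index 0.
  // other is any row-like array with accumulators at arbitrary positions.
  // mergeMapping(i) is the position in `other` of the accumulator merged into accs(i).
  def mergeAccumulatorsPair(
      accs: Array[Long],
      other: Array[Any],
      mergeMapping: Array[Int]): Array[Long] = {
    var i = 0
    while (i < accs.length) {
      // "merge" is a plain addition here because the toy accumulators are sums
      accs(i) += other(mergeMapping(i)).asInstanceOf[Long]
      i += 1
    }
    // the result keeps the default layout of accs
    accs
  }
}
```

With `other = Array[Any]("key", 10L, 20L)` and `mergeMapping = Array(1, 2)`, the call skips the leading key field, so a fixed key offset becomes just one possible mapping. Whether this covers everything the specialized `...WithKeyOffset` methods do is an assumption, not something the thread confirms.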
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973423#comment-15973423 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002140 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala --- @@ -30,78 +30,46 @@ import org.apache.flink.types.Row * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. - * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( -aggregates: Array[AggregateFunction[_ <: Any]], -groupKeysMapping: Array[(Int, Int)], -aggregateMapping: Array[(Int, Int)], -finalRowArity: Int, +genAggregations: GeneratedAggregationsFunction, +keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( -aggregates, -groupKeysMapping, -aggregateMapping, -finalRowArity, +genAggregations, +keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, windowSize) with CombineFunction[Row, Row] { - private val intermediateRowArity: Int = groupKeysMapping.length 
+ aggregateMapping.length + 1 - private val intermediateRow: Row = new Row(intermediateRowArity) + private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1) override def combine(records: Iterable[Row]): Row = { -// reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +// reset accumulator +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() --- End diff -- make `record` a `var` and declare it outside of the loop. > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973418#comment-15973418 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112001966 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala --- @@ -30,78 +30,46 @@ import org.apache.flink.types.Row * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. - * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( -aggregates: Array[AggregateFunction[_ <: Any]], -groupKeysMapping: Array[(Int, Int)], -aggregateMapping: Array[(Int, Int)], -finalRowArity: Int, +genAggregations: GeneratedAggregationsFunction, +keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( -aggregates, -groupKeysMapping, -aggregateMapping, -finalRowArity, +genAggregations, +keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, windowSize) with CombineFunction[Row, Row] { - private val intermediateRowArity: Int = groupKeysMapping.length 
+ aggregateMapping.length + 1 - private val intermediateRow: Row = new Row(intermediateRowArity) + private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1) override def combine(records: Iterable[Row]): Row = { -// reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +// reset accumulator +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() - // accumulate - i = 0 - while (i < aggregates.length) { -// insert received accumulator into acc list -val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator] -accumulatorList(i).set(1, newAcc) -// merge acc list -val retAcc = aggregates(i).merge(accumulatorList(i)) -// insert result into acc list -accumulatorList(i).set(0, retAcc) -i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // check if this record is the last record if (!iterator.hasNext) { --- End diff -- move this behind the loop to save the check of the condition in the loop body. > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111990633 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -263,33 +263,56 @@ class CodeGenerator( aggFields: Array[Array[Int]], aggMapping: Array[Int], fwdMapping: Array[(Int, Int)], - outputArity: Int) + outputArity: Int, + groupingKeys: Array[Int]) : GeneratedAggregationsFunction = { def genSetAggregationResults( accTypes: Array[String], aggs: Array[String], aggMapping: Array[Int]): String = { - val sig: String = + val sigHelper: String = j""" -| public void setAggregationResults( -|org.apache.flink.types.Row accs, -|org.apache.flink.types.Row output)""".stripMargin + | private final void setAggregationResultsHelper( + |org.apache.flink.types.Row accs, + |org.apache.flink.types.Row output, + |java.lang.Integer offset)""".stripMargin - val setAggs: String = { + val setAggsHelper: String = { for (i <- aggs.indices) yield j""" |org.apache.flink.table.functions.AggregateFunction baseClass$i = | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; | |output.setField( - | ${aggMapping(i)}, + | ${aggMapping(i)} + offset, --- End diff -- `${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the constant `offset` to the mapping before generating the code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
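The difference between the two interpolations in the comment above can be seen in a small sketch. It uses Scala's standard `s` interpolator instead of the `j"""` interpolator from the diff, and the object name and the values of `aggMapping` and `offset` are made up for illustration. It assumes `offset` is already known while the code is being generated, which is what the comment implies.

```scala
object OffsetInterpolation {
  // Hypothetical values; in the PR these would come from the code generator's parameters.
  val aggMapping: Array[Int] = Array(0, 2)
  val offset: Int = 3

  // The addition is emitted as text and evaluated by the generated Java code at runtime.
  def runtimeAdd(i: Int): String =
    s"output.setField(${aggMapping(i)} + offset, value);"

  // The addition is evaluated while generating, so only the resulting constant is emitted.
  def foldedAdd(i: Int): String =
    s"output.setField(${aggMapping(i) + offset}, value);"

  def main(args: Array[String]): Unit = {
    println(runtimeAdd(1)) // output.setField(2 + offset, value);
    println(foldedAdd(1))  // output.setField(5, value);
  }
}
```

In the second form the generated method no longer needs an `offset` argument for this expression, since the index is a literal in the emitted source.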
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112001966 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala --- @@ -30,78 +30,46 @@ import org.apache.flink.types.Row * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. - * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( -aggregates: Array[AggregateFunction[_ <: Any]], -groupKeysMapping: Array[(Int, Int)], -aggregateMapping: Array[(Int, Int)], -finalRowArity: Int, +genAggregations: GeneratedAggregationsFunction, +keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( -aggregates, -groupKeysMapping, -aggregateMapping, -finalRowArity, +genAggregations, +keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, windowSize) with CombineFunction[Row, Row] { - private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1 - private val intermediateRow: Row = new Row(intermediateRowArity) + private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1) override def 
combine(records: Iterable[Row]): Row = { -// reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +// reset accumulator +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() - // accumulate - i = 0 - while (i < aggregates.length) { -// insert received accumulator into acc list -val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator] -accumulatorList(i).set(1, newAcc) -// merge acc list -val retAcc = aggregates(i).merge(accumulatorList(i)) -// insert result into acc list -accumulatorList(i).set(0, retAcc) -i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // check if this record is the last record if (!iterator.hasNext) { --- End diff -- move this behind the loop to save the check of the condition in the loop body. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973416#comment-15973416 ]

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111995370

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -68,23 +72,16 @@ class DataSetAggFunction(
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()

         val iterator = records.iterator()

         while (iterator.hasNext) {
           val record = iterator.next()
    --- End diff --

We can make `record` a `var` and move its definition outside of the loop. Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.

> codeGen DataSet Goupingwindow Aggregates
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
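The loop restructuring suggested above can be sketched in isolation. The `Rec` case class and the running sum are hypothetical stand-ins for `Row` and the generated accumulate call; only the shape of the loop follows the review comment. The empty-input guard is an addition for the sketch and is not discussed in the thread.

```scala
object LastRecordLoop {
  // Stand-in for Row; hypothetical.
  final case class Rec(key: String, value: Long)

  // Shape criticised in the review: the last-record check runs on every iteration.
  def withCheckInLoop(records: Iterable[Rec]): Option[(String, Long)] = {
    var sum = 0L
    var result: Option[(String, Long)] = None
    val iterator = records.iterator
    while (iterator.hasNext) {
      val record = iterator.next()
      sum += record.value
      if (!iterator.hasNext) {
        result = Some((record.key, sum))
      }
    }
    result
  }

  // Suggested shape: `record` is a var declared outside the loop,
  // and the output is set once after the loop has terminated.
  def withCheckAfterLoop(records: Iterable[Rec]): Option[(String, Long)] = {
    var sum = 0L
    var record: Rec = null
    val iterator = records.iterator
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record.value
    }
    // record is still null only if the input was empty
    if (record == null) None else Some((record.key, sum))
  }
}
```

Both variants return the same result; the second one evaluates `iterator.hasNext` once per iteration instead of twice.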
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112003763 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala --- @@ -25,58 +25,56 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row -import org.apache.flink.util.Preconditions - +import org.slf4j.LoggerFactory /** * This map function only works for windows on batch tables. * It appends an (aligned) rowtime field to the end of the output row. + * + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param timeFieldPos Time field position in input row + * @param tumbleTimeWindowSize The size of tumble time window */ class DataSetWindowAggMapFunction( -private val aggregates: Array[AggregateFunction[_]], -private val aggFields: Array[Array[Int]], -private val groupingKeys: Array[Int], -private val timeFieldPos: Int, // time field position in input row +private val genAggregations: GeneratedAggregationsFunction, +private val timeFieldPos: Int, private val tumbleTimeWindowSize: Option[Long], @transient private val returnType: TypeInformation[Row]) - extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] { - - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(aggFields) - Preconditions.checkArgument(aggregates.length == aggFields.length) + extends RichMapFunction[Row, Row] +with ResultTypeQueryable[Row] +with Compiler[GeneratedAggregations] { private var output: Row = _ - // add one more arity to store rowtime - private val partialRowLength = groupingKeys.length + 
aggregates.length + 1 - // rowtime index in the buffer output row - private val rowtimeIndex: Int = partialRowLength - 1 + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ override def open(config: Configuration) { -output = new Row(partialRowLength) +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +output = function.createOutputRow() } override def map(input: Row): Row = { -var i = 0 -while (i < aggregates.length) { - val agg = aggregates(i) - val fieldValue = input.getField(aggFields(i)(0)) - val accumulator = agg.createAccumulator() - agg.accumulate(accumulator, fieldValue) - output.setField(groupingKeys.length + i, accumulator) - i += 1 -} +function.createAccumulatorsAndSetToOutput(output) --- End diff -- create an accumulator with `function.createAccumulator()` once in `open()`, reset it here, and copy it to `output` with `function.setAggregationResults()`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112007816

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)

       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --

Actually, I'm not sure if we really need to implement a different code generation function. I had a look at the code generation code and think that we could just add a few more parameters to the current code gen method.

Right now, the behavior of most generated methods can be exactly defined:

- `createAccumulators()`: generates a `Row` with the accumulators for each provided `AggregateFunction`. Some methods of `GeneratedAggregations` expect a Row of accumulators with exactly this layout as one of their input parameters. In the following, this parameter is called `accs`.
- `accumulate(accs, row)`: The `aggFields` parameter controls which fields of `row` are accumulated into which accumulator. We should rename this parameter to `accFields` though, IMO.
- `retract(accs, row)`: same as for `accumulate`. We should add a separate parameter `retractFields: Array[Int]` though.
- `setForwardedFields(input, output)`: The `fwdMapping` parameter controls which field of the input row is copied to which position of the output row. We could add an optional parameter to copy the `groupSetMapping` to the output as well.
- `setAggregationResults(accs, output)`: The `aggMapping` parameter controls to which output fields the aggregation results are copied. If we add another parameter `partialResults: Boolean`, we can control whether to copy final results (`AggregateFunction.getValue()`) or partial results (the accumulator).
- `createOutputRow()`: the `outputArity` parameter specifies the arity of the output row.
- `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible method**. We could change the behavior of the method as follows: The method expects as first parameter (`accs`) a Row with the same layout as generated by `createAccumulators`. The second parameter can be any row with accumulators at arbitrary positions. To enable the merging, we add a parameter `mergeMapping: Array[Int]` to the code generating function which defines which fields of the `other` parameter are merged with the fields in the `accs` Row. The method returns a Row with the default layout (as generated by `createAccumulators()`).
- `resetAccumulator(accs)`: resets a Row of accumulators of the known layout.

I haven't checked this thoroughly, but I think with these parameters, we can control the generated code sufficiently to support all aggregation operators for DataSet and DataStream, i.e., we can generate the currently existing functions such that they behave as the more specialized ones that you added.

Since all code gen parameters (`accFields`, `retractFields`, `fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, `mergeMapping`) can be independently set for each type of operator, this should give us the flexibility for all types of operators. We only need to parameterize the code generation method appropriately.

In addition, we could make all parameters `Option` and generate empty methods if the parameters for a function are not set. (This could also be a follow-up issue, IMO.)

What do you think, @shaoxuan-wang?
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002140 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala --- @@ -30,78 +30,46 @@ import org.apache.flink.types.Row * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. - * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( -aggregates: Array[AggregateFunction[_ <: Any]], -groupKeysMapping: Array[(Int, Int)], -aggregateMapping: Array[(Int, Int)], -finalRowArity: Int, +genAggregations: GeneratedAggregationsFunction, +keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( -aggregates, -groupKeysMapping, -aggregateMapping, -finalRowArity, +genAggregations, +keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, windowSize) with CombineFunction[Row, Row] { - private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1 - private val intermediateRow: Row = new Row(intermediateRowArity) + private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1) override def 
combine(records: Iterable[Row]): Row = { -// reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +// reset accumulator +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() --- End diff -- make `record` a `var` and declare it outside of the loop. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111995370

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -68,23 +72,16 @@ class DataSetAggFunction(
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()

         val iterator = records.iterator()

         while (iterator.hasNext) {
           val record = iterator.next()
    --- End diff --

We can make `record` a `var` and move its definition outside of the loop. Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973426#comment-15973426 ]

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111991648

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -451,13 +581,34 @@ class CodeGenerator(
           {
             for (i <- accTypes.indices) yield
               j"""
    -             |accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
    +             |accList$i = new java.util.ArrayList<${accTypes(i)}>();
    --- End diff --

Why not create the `ArrayList` with initial capacity 2?

> codeGen DataSet Goupingwindow Aggregates
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
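For context on the question above, a minimal sketch of what the capacity argument does. The object and method names are hypothetical and `String` stands in for the accumulator type. The two-slot usage (slot 0 for the running accumulator, slot 1 for the incoming one) mirrors the `accumulatorList(i).set(1, newAcc)` pattern visible in the removed code elsewhere in this PR.

```scala
import java.util.{ArrayList => JArrayList}

object AccListCapacity {
  def twoSlotList(first: String, second: String): JArrayList[String] = {
    // The argument is an initial capacity hint, not a size: the list starts empty.
    val accList = new JArrayList[String](2)
    // Both slots must be filled with add() before set(0, ...) or set(1, ...) is legal.
    accList.add(first)
    accList.add(second)
    accList
  }
}
```

Since the list never holds more than two elements in this pattern, the explicit capacity only avoids reserving a larger backing array; it does not change behavior.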
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111995303 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala --- @@ -19,88 +19,71 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import java.util.{ArrayList => JArrayList} import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} +import org.slf4j.LoggerFactory /** * [[RichGroupReduceFunction]] to compute the final result of a pre-aggregated aggregation * for batch (DataSet) queries. * - * @param aggregates The aggregate functions. - * @param aggOutFields The positions of the aggregation results in the output + * @param genAggregations Code-generated [[GeneratedAggregations]] * @param gkeyOutFields The positions of the grouping keys in the output * @param groupingSetsMapping The mapping of grouping set keys between input and output positions. 
- * @param finalRowArity The arity of the final resulting row */ class DataSetFinalAggFunction( -private val aggregates: Array[AggregateFunction[_ <: Any]], -private val aggOutFields: Array[Int], +private val genAggregations: GeneratedAggregationsFunction, private val gkeyOutFields: Array[Int], -private val groupingSetsMapping: Array[(Int, Int)], -private val finalRowArity: Int) - extends RichGroupReduceFunction[Row, Row] { +private val groupingSetsMapping: Array[(Int, Int)]) + extends RichGroupReduceFunction[Row, Row] +with Compiler[GeneratedAggregations] { - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(aggOutFields) Preconditions.checkNotNull(gkeyOutFields) Preconditions.checkNotNull(groupingSetsMapping) private var output: Row = _ + private var accumulators: Row = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ private val intermediateGKeys: Option[Array[Int]] = if (!groupingSetsMapping.isEmpty) { Some(gkeyOutFields) } else { None } - private val numAggs = aggregates.length - private val numGKeys = gkeyOutFields.length - - private val accumulators: Array[JArrayList[Accumulator]] = -Array.fill(numAggs)(new JArrayList[Accumulator](2)) - override def open(config: Configuration) { -output = new Row(finalRowArity) - -// init lists with two empty accumulators -for (i <- aggregates.indices) { - val accumulator = aggregates(i).createAccumulator() - accumulators(i).add(accumulator) - accumulators(i).add(accumulator) -} +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +output = function.createOutputRow() +accumulators = function.createAccumulators() } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { val iterator = records.iterator() // 
reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulators(i).get(0)) - i += 1 -} +function.resetAccumulator(accumulators) +var i = 0 while (iterator.hasNext) { val record = iterator.next() --- End diff -- we can make `record` a `var` and move its definition outside of the loop. Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973415#comment-15973415 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala --- @@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction( override def combine(records: Iterable[Row]): Row = { // reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +function.resetAccumulator(accumulators) val iterator = records.iterator() + while (iterator.hasNext) { val record = iterator.next() - i = 0 - while (i < aggregates.length) { -// insert received accumulator into acc list -val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] -accumulatorList(i).set(1, newAcc) -// merge acc list -val retAcc = aggregates(i).merge(accumulatorList(i)) -// insert result into acc list -accumulatorList(i).set(0, retAcc) -i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // check if this record is the last record if (!iterator.hasNext) { --- End diff -- move this behind the loop > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111991648

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -451,13 +581,34 @@ class CodeGenerator(
 {
   for (i <- accTypes.indices) yield
     j"""
-      |accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
+      |accList$i = new java.util.ArrayList<${accTypes(i)}>();
--- End diff --

Why not create the `ArrayList` with an initial capacity of 2?
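To illustrate what the question above is about: the `int` constructor argument of `java.util.ArrayList` is only an initial capacity, not a size, so both variants in the diff start out empty and behave the same; the hint just sizes the backing array for the two accumulator slots used by pairwise merging. A minimal sketch, in which `String` stands in for the accumulator type and the object and method names are hypothetical:

```scala
import java.util.{ArrayList => JArrayList}

object AccListCapacity {
  // Capacity hint of 2: room for the two slots used when merging a pair.
  def sized(): JArrayList[String] = new JArrayList[String](2)

  // No hint: the JDK picks the backing-array size on first insertion.
  def unsized(): JArrayList[String] = new JArrayList[String]()

  // Fills a list with two placeholder accumulators (illustrative values).
  def fill(list: JArrayList[String]): JArrayList[String] = {
    list.add("accumulator-0")
    list.add("accumulator-1")
    list
  }
}
```

Either list ends up with the same two elements; the difference is limited to how much memory is reserved up front.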
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973414#comment-15973414 ]

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111994938

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
     // create accumulators
-    var i = 0
-    while (i < aggregates.length) {
-      accumulators(i) = aggregates(i).createAccumulator()
-      i += 1
-    }
+    accumulators = function.createAccumulators()
     val iterator = records.iterator()
     while (iterator.hasNext) {
       val record = iterator.next()
--- End diff --

We can make `record` a `var` and move its definition outside of the loop. Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.

> codeGen DataSet Goupingwindow Aggregates
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
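The restructuring suggested in the comment above can be sketched as follows. This is not the Flink code: `Rec` is a hypothetical stand-in for `Row`, and a plain sum replaces the generated aggregation calls. It only shows that hoisting `record` out of the loop lets the output be set once, after the loop, instead of testing `hasNext` on every iteration.

```scala
object LastRecordAfterLoop {
  // Hypothetical stand-in for a Row: a grouping key plus a value to aggregate.
  final case class Rec(key: String, value: Int)

  // Before: the last record is detected inside the loop on every iteration.
  def checkInsideLoop(records: Iterator[Rec]): Option[(String, Int)] = {
    var sum = 0
    var output: Option[(String, Int)] = None
    while (records.hasNext) {
      val record = records.next()
      sum += record.value
      if (!records.hasNext) {
        output = Some((record.key, sum))
      }
    }
    output
  }

  // After: `record` is a var declared outside the loop, so the last record
  // is still in scope when the loop ends and the output is set exactly once.
  def setAfterLoop(records: Iterator[Rec]): Option[(String, Int)] = {
    var sum = 0
    var record: Rec = null
    while (records.hasNext) {
      record = records.next()
      sum += record.value
    }
    // An empty group leaves `record` null, so guard before reading it.
    if (record == null) None else Some((record.key, sum))
  }
}
```

Both variants return the same result for the same input; the second one simply drops the per-record branch.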
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973427#comment-15973427 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112003763 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala --- @@ -25,58 +25,56 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row -import org.apache.flink.util.Preconditions - +import org.slf4j.LoggerFactory /** * This map function only works for windows on batch tables. * It appends an (aligned) rowtime field to the end of the output row. 
+ * + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param timeFieldPos Time field position in input row + * @param tumbleTimeWindowSize The size of tumble time window */ class DataSetWindowAggMapFunction( -private val aggregates: Array[AggregateFunction[_]], -private val aggFields: Array[Array[Int]], -private val groupingKeys: Array[Int], -private val timeFieldPos: Int, // time field position in input row +private val genAggregations: GeneratedAggregationsFunction, +private val timeFieldPos: Int, private val tumbleTimeWindowSize: Option[Long], @transient private val returnType: TypeInformation[Row]) - extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] { - - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(aggFields) - Preconditions.checkArgument(aggregates.length == aggFields.length) + extends RichMapFunction[Row, Row] +with ResultTypeQueryable[Row] +with Compiler[GeneratedAggregations] { private var output: Row = _ - // add one more arity to store rowtime - private val partialRowLength = groupingKeys.length + aggregates.length + 1 - // rowtime index in the buffer output row - private val rowtimeIndex: Int = partialRowLength - 1 + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ override def open(config: Configuration) { -output = new Row(partialRowLength) +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +output = function.createOutputRow() } override def map(input: Row): Row = { -var i = 0 -while (i < aggregates.length) { - val agg = aggregates(i) - val fieldValue = input.getField(aggFields(i)(0)) - val accumulator = agg.createAccumulator() - agg.accumulate(accumulator, fieldValue) - 
output.setField(groupingKeys.length + i, accumulator) - i += 1 -} +function.createAccumulatorsAndSetToOutput(output) --- End diff -- create an accumulator with `function.createAccumulator()` once in `open()`, reset it here, and copy it to `output` with `function.setAggregationResults()`? > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
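The pattern proposed in the comment above (allocate in `open()`, reset per record) can be sketched like this. `SumAcc` and `Mapper` are hypothetical stand-ins, not the generated `GeneratedAggregations` API, and the sketch assumes the caller has consumed the previous result before the next `map` call.

```scala
object ReuseAccumulator {
  // Hypothetical mutable accumulator; real accumulators come from generated code.
  final class SumAcc { var sum: Long = 0L }

  final class Mapper {
    private var acc: SumAcc = null
    // Counts allocations, for illustration only.
    var allocations: Int = 0

    // Called once before any record is processed.
    def open(): Unit = {
      acc = new SumAcc
      allocations += 1
    }

    // Called per record: reset the shared accumulator instead of allocating a new one.
    def map(value: Long): Long = {
      acc.sum = 0L
      acc.sum += value
      acc.sum
    }
  }
}
```

With this shape, processing any number of records costs a single accumulator allocation.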
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973428#comment-15973428 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala --- @@ -18,111 +18,77 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import java.util.{ArrayList => JArrayList} import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row -import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory /** * It wraps the aggregate logic inside of * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. 
- * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceGroupFunction( -aggregates: Array[AggregateFunction[_ <: Any]], -groupKeysMapping: Array[(Int, Int)], -aggregateMapping: Array[(Int, Int)], -finalRowArity: Int, +genAggregations: GeneratedAggregationsFunction, +keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) - extends RichGroupReduceFunction[Row, Row] { - - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(groupKeysMapping) + extends RichGroupReduceFunction[Row, Row] +with Compiler[GeneratedAggregations] { private var collector: TimeWindowPropertyCollector = _ + protected val windowStartPos: Int = keysAndAggregatesArity + private var output: Row = _ - private val accumulatorStartPos: Int = groupKeysMapping.length - protected val windowStartPos: Int = accumulatorStartPos + aggregates.length + protected var accumulators: Row = _ - val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { -new JArrayList[Accumulator](2) - } + val LOG = LoggerFactory.getLogger(this.getClass) + protected var function: GeneratedAggregations = _ override def open(config: Configuration) { -output = new Row(finalRowArity) +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +output = function.createOutputRow() +accumulators 
= function.createAccumulators() collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) - -// init lists with two empty accumulators -var i = 0 -while (i < aggregates.length) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).add(accumulator) - accumulatorList(i).add(accumulator) - i += 1 -} } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { -// reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 -} +// reset accumulator +function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) {
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973417#comment-15973417 ]

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111993502

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
     // create accumulators
-    var i = 0
-    while (i < aggregates.length) {
-      accumulators(i) = aggregates(i).createAccumulator()
-      i += 1
-    }
+    accumulators = function.createAccumulators()
--- End diff --

Create accumulators once and use `function.resetAccumulators()`?

> codeGen DataSet Goupingwindow Aggregates
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973422#comment-15973422 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111992686 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala --- @@ -21,44 +21,48 @@ import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} +import org.slf4j.LoggerFactory /** * [[RichGroupReduceFunction]] to compute aggregates that do not support pre-aggregation for batch * (DataSet) queries. * - * @param aggregates The aggregate functions. - * @param aggInFields The positions of the aggregation input fields. + * @param genAggregations Code-generated [[GeneratedAggregations]] * @param gkeyOutMapping The mapping of group keys between input and output positions. - * @param aggOutMapping The mapping of aggregates to output positions. * @param groupingSetsMapping The mapping of grouping set keys between input and output positions. - * @param finalRowArity The arity of the final resulting row. 
*/ class DataSetAggFunction( -private val aggregates: Array[AggregateFunction[_ <: Any]], -private val aggInFields: Array[Array[Int]], -private val aggOutMapping: Array[(Int, Int)], +private val genAggregations: GeneratedAggregationsFunction, private val gkeyOutMapping: Array[(Int, Int)], --- End diff -- It would be good if we could parameterize the method that generates the code such that we can do the grouping keys and grouping set copies with `GeneratedAggregations.setForwardFields()`. This should be possible as it is actually just setting constant boolean flags at certain positions in the output Row. > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973420#comment-15973420 ] ASF GitHub Bot commented on FLINK-6242: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111995303 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala --- @@ -19,88 +19,71 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import java.util.{ArrayList => JArrayList} import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} +import org.slf4j.LoggerFactory /** * [[RichGroupReduceFunction]] to compute the final result of a pre-aggregated aggregation * for batch (DataSet) queries. * - * @param aggregates The aggregate functions. - * @param aggOutFields The positions of the aggregation results in the output + * @param genAggregations Code-generated [[GeneratedAggregations]] * @param gkeyOutFields The positions of the grouping keys in the output * @param groupingSetsMapping The mapping of grouping set keys between input and output positions. 
- * @param finalRowArity The arity of the final resulting row */ class DataSetFinalAggFunction( -private val aggregates: Array[AggregateFunction[_ <: Any]], -private val aggOutFields: Array[Int], +private val genAggregations: GeneratedAggregationsFunction, private val gkeyOutFields: Array[Int], -private val groupingSetsMapping: Array[(Int, Int)], -private val finalRowArity: Int) - extends RichGroupReduceFunction[Row, Row] { +private val groupingSetsMapping: Array[(Int, Int)]) + extends RichGroupReduceFunction[Row, Row] +with Compiler[GeneratedAggregations] { - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(aggOutFields) Preconditions.checkNotNull(gkeyOutFields) Preconditions.checkNotNull(groupingSetsMapping) private var output: Row = _ + private var accumulators: Row = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ private val intermediateGKeys: Option[Array[Int]] = if (!groupingSetsMapping.isEmpty) { Some(gkeyOutFields) } else { None } - private val numAggs = aggregates.length - private val numGKeys = gkeyOutFields.length - - private val accumulators: Array[JArrayList[Accumulator]] = -Array.fill(numAggs)(new JArrayList[Accumulator](2)) - override def open(config: Configuration) { -output = new Row(finalRowArity) - -// init lists with two empty accumulators -for (i <- aggregates.indices) { - val accumulator = aggregates(i).createAccumulator() - accumulators(i).add(accumulator) - accumulators(i).add(accumulator) -} +LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + +s"Code:\n$genAggregations.code") +val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) +LOG.debug("Instantiating AggregateHelper.") +function = clazz.newInstance() + +output = function.createOutputRow() +accumulators = function.createAccumulators() } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { val iterator = records.iterator() // 
reset first accumulator -var i = 0 -while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulators(i).get(0)) - i += 1 -} +function.resetAccumulator(accumulators) +var i = 0 while (iterator.hasNext) { val record = iterator.next() --- End diff -- we can make `record` a `var` and move its definition outside of the loop. Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated. > codeGen DataSet Goupingwindow Aggregates > > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >
[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates
[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973413#comment-15973413 ]

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111990633

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -263,33 +263,56 @@ class CodeGenerator(
     aggFields: Array[Array[Int]],
     aggMapping: Array[Int],
     fwdMapping: Array[(Int, Int)],
-    outputArity: Int)
+    outputArity: Int,
+    groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
     def genSetAggregationResults(
       accTypes: Array[String],
       aggs: Array[String],
       aggMapping: Array[Int]): String = {
-      val sig: String =
+      val sigHelper: String =
         j"""
-           | public void setAggregationResults(
-           |org.apache.flink.types.Row accs,
-           |org.apache.flink.types.Row output)""".stripMargin
+           | private final void setAggregationResultsHelper(
+           |org.apache.flink.types.Row accs,
+           |org.apache.flink.types.Row output,
+           |java.lang.Integer offset)""".stripMargin
-      val setAggs: String = {
+      val setAggsHelper: String = {
         for (i <- aggs.indices) yield
           j"""
              |org.apache.flink.table.functions.AggregateFunction baseClass$i =
              | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
              |
              |output.setField(
-             | ${aggMapping(i)},
+             | ${aggMapping(i)} + offset,
--- End diff --

`${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the constant `offset` to the mapping before generating the code.

> codeGen DataSet Goupingwindow Aggregates
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
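The difference between the two spellings in the comment above is where the addition happens. A minimal sketch, using Scala's standard `s` interpolator in place of the `j` interpolator from the diff, with made-up values for `aggMapping` and `offset`, and assuming `offset` is already known when the code is generated:

```scala
object OffsetInterpolation {
  // Illustrative values only; not taken from the pull request.
  val aggMapping: Array[Int] = Array(3, 4)
  val offset: Int = 2

  // Emits the addition as text: the generated code adds at runtime.
  def runtimeAdd(i: Int): String =
    s"output.setField(${aggMapping(i)} + offset, value);"

  // Adds while generating: the generated code contains a constant index.
  def generationTimeAdd(i: Int): String =
    s"output.setField(${aggMapping(i) + offset}, value);"
}
```

For index 0, the first form yields `output.setField(3 + offset, value);` and the second yields `output.setField(5, value);`, so the generated method no longer needs an `offset` parameter for this line.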