[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200523824 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Statistic +import org.apache.calcite.schema.impl.AbstractTable + +class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]], --- End diff -- Huge +1. My understanding is this will be the overall class to hold a table source, sink or both. `TableSourceSinkTable` seems redundant. ---
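The following minimal Java sketch illustrates a single class that holds a table source, a sink, or both. `SourceSinkHolder` and its methods are hypothetical names used only for this example. The class under review is the Scala `TableSourceSinkTable`, which wraps an `Option[TableSourceTable[T1]]` and presumably a matching sink option.

```java
import java.util.Optional;

/** Hypothetical holder for a table that may act as a source, a sink, or both. */
final class SourceSinkHolder<S, K> {
    private final Optional<S> source;
    private final Optional<K> sink;

    SourceSinkHolder(Optional<S> source, Optional<K> sink) {
        // A table that is neither a source nor a sink is meaningless, so reject it early.
        if (source.isEmpty() && sink.isEmpty()) {
            throw new IllegalArgumentException("A table needs a source, a sink, or both.");
        }
        this.source = source;
        this.sink = sink;
    }

    boolean isSource() { return source.isPresent(); }

    boolean isSink() { return sink.isPresent(); }

    Optional<S> source() { return source; }

    Optional<K> sink() { return sink; }
}
```

With one holder class, a separate `TableSourceTable`/`TableSinkTable` split is no longer required. The planner queries `isSource()`/`isSink()` when it needs to know how a table can be used.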
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200522491 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -160,10 +173,34 @@ abstract class BatchTableEnvironment( throw new TableException("Same number of field names and types required.") } -tableSink match { +val configuredSink = tableSink.configure(fieldNames, fieldTypes) +registerTableSinkInternal(name, configuredSink) + } + + def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = { --- End diff -- Could probably move this to the base class `TableEnvironment`? ---
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200524149 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala --- @@ -16,57 +16,57 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util.{ServiceConfigurationError, ServiceLoader} -import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException} -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION -import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION -import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION -import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION -import org.apache.flink.table.descriptors._ +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator._ +import org.apache.flink.table.descriptors.MetadataValidator._ +import org.apache.flink.table.descriptors.StatisticsValidator._ +import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.Logging -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** - * Service provider interface for finding suitable table source factories for the given properties. + * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]] + * and [[org.apache.flink.table.sinks.TableSink]]. 
*/ -object TableSourceFactoryService extends Logging { +class TableConnectorFactoryService[T] extends Logging { --- End diff -- Also just `TableFactoryService`? ---
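The removed Scaladoc describes how discovery works. Factories are found through Java's SPI, each one is matched against a set of normalized properties, and the service fails when no factory or more than one factory matches. Below is a minimal Java sketch of that matching step. It is generic in the produced type `T`, in line with the suggested name `TableFactoryService`. The `ConnectorFactory` interface, `requiredContext`, and the exception messages are hypothetical. The real service also loads its candidates through `java.util.ServiceLoader`, and this sketch omits that step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical factory: declares the context properties it requires and builds a T. */
interface ConnectorFactory<T> {
    Map<String, String> requiredContext();

    T create(Map<String, String> properties);
}

final class FactoryServiceSketch {
    /** Creates a T from the single factory whose required context matches the properties. */
    static <T> T find(List<ConnectorFactory<T>> candidates, Map<String, String> properties) {
        List<ConnectorFactory<T>> matches = new ArrayList<>();
        for (ConnectorFactory<T> factory : candidates) {
            // A factory matches if every required key is present with the same value.
            boolean matchesAll = factory.requiredContext().entrySet().stream()
                .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
            if (matchesAll) {
                matches.add(factory);
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalStateException("No matching factory for " + properties);
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Ambiguous factories for " + properties);
        }
        return matches.get(0).create(properties);
    }

    private FactoryServiceSketch() {}
}
```

Because neither the matching logic nor the error handling depends on `T`, a single generic service can serve both sources and sinks.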
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200521017 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -56,23 +58,44 @@ public Environment() { return tables; } + private static TableDescriptor create(String name, Map config) { + if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { + throw new SqlClientException("The 'type' attribute of a table is missing."); + } + final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE()); + if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { + return new Source(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) { + return new Sink(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) { + return new SourceSink(name, ConfigUtil.normalizeYaml(config)); + } + return null; + } + public void setTables(List> tables) { this.tables = new HashMap<>(tables.size()); tables.forEach(config -> { - if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { - throw new SqlClientException("The 'type' attribute of a table is missing."); + if (!config.containsKey(NAME)) { + throw new SqlClientException("The 'name' attribute of a table is missing."); } - if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { - config.remove(TableDescriptorValidator.TABLE_TYPE()); - final Source s = Source.create(config); - if (this.tables.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); - } - this.tables.put(s.getName(), s); - } else { + final Object name = config.get(NAME); + if (name == null || !(name instanceof String) || ((String) name).length() <= 0) { + throw new SqlClientException("Invalid table name '" 
+ name + "'."); + } + final String tableName = (String) name; + final Map properties = new HashMap<>(config); + properties.remove(NAME); + + TableDescriptor tableDescriptor = create(tableName, properties); + if (null == tableDescriptor) { throw new SqlClientException( - "Invalid table 'type' attribute value, only 'source' is supported"); + "Invalid table 'type' attribute value, only 'source' or 'sink' is supported"); --- End diff -- Missing `both`? ---
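The reviewer notes that the error message leaves out the third accepted type. To keep the accepted values and the message in sync, the message can be built from the same enum that drives the dispatch. The Java sketch below does this. `TableKind` is a hypothetical name. The strings `source`, `sink`, and `both` are assumptions that stand in for the `TableDescriptorValidator.TABLE_TYPE_VALUE_*` constants, whose actual values the diff does not show.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical table kinds; the string values are assumed, not taken from Flink. */
enum TableKind {
    SOURCE("source"),
    SINK("sink"),
    SOURCE_SINK("both");

    final String value;

    TableKind(String value) {
        this.value = value;
    }

    static TableKind fromValue(String v) {
        for (TableKind kind : values()) {
            if (kind.value.equals(v)) {
                return kind;
            }
        }
        // The message is derived from the enum itself, so adding a kind updates it automatically.
        String supported = Arrays.stream(values())
            .map(k -> "'" + k.value + "'")
            .collect(Collectors.joining(", "));
        throw new IllegalArgumentException(
            "Invalid table 'type' attribute value '" + v + "', supported values are: " + supported);
    }
}
```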
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200524249 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * Common class for all descriptors describing a table sink. + */ +abstract class TableSinkDescriptor extends TableDescriptor { + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { --- End diff -- +1 Should be able to unify ---
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200524110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala --- @@ -16,57 +16,57 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util.{ServiceConfigurationError, ServiceLoader} -import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException} -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION -import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION -import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION -import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION -import org.apache.flink.table.descriptors._ +import org.apache.flink.table.api._ +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator._ +import org.apache.flink.table.descriptors.MetadataValidator._ +import org.apache.flink.table.descriptors.StatisticsValidator._ +import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.Logging -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** - * Service provider interface for finding suitable table source factories for the given properties. + * Unified interface to create TableConnectors, e.g. [[org.apache.flink.table.sources.TableSource]] + * and [[org.apache.flink.table.sinks.TableSink]]. 
*/ -object TableSourceFactoryService extends Logging { +class TableConnectorFactoryService[T] extends Logging { --- End diff -- +1 ---
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200523261 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala --- @@ -16,21 +16,18 @@ * limitations under the License. */ -package org.apache.flink.table.sources +package org.apache.flink.table.connector import java.util -/** - * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider - * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that - * describe the desired table source. The factory allows for matching to the given set of - * properties and creating a configured [[TableSource]] accordingly. - * - * Classes that implement this interface need to be added to the - * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in - * the current classpath to be found. - */ -trait TableSourceFactory[T] { +trait TableConnectorFactory[T] { --- End diff -- +1 I think the most baffling point I have read until this point was the `Table*Connector*Factory` part :-) ---
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r200521222 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -56,23 +58,44 @@ public Environment() { return tables; } + private static TableDescriptor create(String name, Map config) { + if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { + throw new SqlClientException("The 'type' attribute of a table is missing."); + } + final String tableType = (String) config.get(TableDescriptorValidator.TABLE_TYPE()); + if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { + return new Source(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) { + return new Sink(name, ConfigUtil.normalizeYaml(config)); + } else if (tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) { + return new SourceSink(name, ConfigUtil.normalizeYaml(config)); + } + return null; + } + public void setTables(List> tables) { this.tables = new HashMap<>(tables.size()); tables.forEach(config -> { - if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) { - throw new SqlClientException("The 'type' attribute of a table is missing."); + if (!config.containsKey(NAME)) { + throw new SqlClientException("The 'name' attribute of a table is missing."); } - if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) { - config.remove(TableDescriptorValidator.TABLE_TYPE()); - final Source s = Source.create(config); - if (this.tables.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); - } - this.tables.put(s.getName(), s); - } else { + final Object name = config.get(NAME); + if (name == null || !(name instanceof String) || ((String) name).length() <= 0) { + throw new SqlClientException("Invalid table name '" 
+ name + "'."); + } + final String tableName = (String) name; + final Map properties = new HashMap<>(config); + properties.remove(NAME); + + TableDescriptor tableDescriptor = create(tableName, properties); + if (null == tableDescriptor) { throw new SqlClientException( - "Invalid table 'type' attribute value, only 'source' is supported"); + "Invalid table 'type' attribute value, only 'source' or 'sink' is supported"); + } + if (this.tables.containsKey(tableName)) { + throw new SqlClientException("Duplicate table name '" + tableName + "'."); --- End diff -- If only `"source"` and `"sink"` are allowed, should we allow the same name with different types, e.g. let `{"name": "t1", "type": "source"}` and `{"name": "t1", "type": "sink"}` co-exist? This actually follows up on the previous comment. I think we just need one of the two approaches; either should work. ---
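The two options under discussion differ only in the registry key. The following is a minimal Java sketch. `TableRegistrySketch` and its `allowSameNameDifferentType` flag are hypothetical. With the flag off, names must be unique across all tables, which is the PR's current behavior. With the flag on, a source and a sink may share a name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry illustrating the two duplicate-name policies. */
final class TableRegistrySketch {
    private final boolean allowSameNameDifferentType;
    // Key is "name" (strict policy) or "name/type" (relaxed policy).
    private final Map<String, String> tables = new HashMap<>();

    TableRegistrySketch(boolean allowSameNameDifferentType) {
        this.allowSameNameDifferentType = allowSameNameDifferentType;
    }

    void register(String name, String type) {
        String key = allowSameNameDifferentType ? name + "/" + type : name;
        if (tables.containsKey(key)) {
            throw new IllegalArgumentException("Duplicate table name '" + name + "'.");
        }
        tables.put(key, type);
    }

    int size() {
        return tables.size();
    }
}
```

The relaxed policy also requires name lookups to know whether a source or a sink is wanted. This is one argument for keeping a single, type-agnostic name space.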
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199352237 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- +1 for this approach that directly specifies the interval literals. Regarding QUARTER: it seems to be a very old implementation, and we should probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it consistent with all other time unit extractions. What do you guys think? I just tried it out by modifying the `Extract` method, and it seems to work perfectly. ---
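For reference, the quarter of a date is `(month - 1) / 3 + 1`, using integer division. The Java sketch below uses `java.time` rather than Flink's `Extract` expression. It shows the values a consistent `extract(TimeIntervalUnit.QUARTER)` should produce, such as quarter 1 for the `1970-01-01` date mentioned above. `QuarterSketch` is a hypothetical name.

```java
import java.time.LocalDate;
import java.time.temporal.IsoFields;

/** Hypothetical helper showing the expected quarter values. */
final class QuarterSketch {
    /** Quarter computed arithmetically from the month (1..12 maps to 1..4). */
    static int quarterOf(LocalDate date) {
        return (date.getMonthValue() - 1) / 3 + 1;
    }

    /** The same value through the java.time field, as a cross-check. */
    static int quarterViaField(LocalDate date) {
        return date.get(IsoFields.QUARTER_OF_YEAR);
    }

    private QuarterSketch() {}
}
```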
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197622239 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,42 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + override private[flink] def children = unit :: count :: timestamp :: Nil + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { +var timeUnit : Option[TimeUnit] = None +if (unit.isInstanceOf[Literal]) { +var unitValue= unit.asInstanceOf[Literal].value.toString() +val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK", +"SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND") +if (sqlTsiArray.contains(unitValue)) { + unitValue = unitValue.split("_").last +} +timeUnit = Some(TimeUnit.valueOf(unitValue)) +} + +relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, +relBuilder.call(SqlStdOperatorTable.MULTIPLY, + relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier, --- End diff -- No, what I mean is something like: ``` count match { case literal: Literal => // ... generate a new IntervalSqlType based on unit and count case _ => // ... use your current approach with SqlStdOperatorTable.MULTIPLY } ``` You can move the logic that validates `unit` as a `Literal` into the `validateInput` function, as @hequn8128 mentioned. ---
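The suggestion can be sketched in plain Java. When `count` is a literal, the interval is computed up front as count × unit. Only a non-literal `count` needs a runtime multiplication. The small expression classes (`Expr`, `Lit`, `FieldRef`, `IntervalMillis`, `Multiply`) are hypothetical stand-ins for Flink's `Expression` and Calcite's `RexNode`. To keep the multiplier expressible in milliseconds, the sketch covers only day-time units.

```java
/** Hypothetical model of folding a literal count into a constant interval. */
final class TimestampAddFolding {
    interface Expr {}

    static final class Lit implements Expr {
        final long value;
        Lit(long value) { this.value = value; }
    }

    static final class FieldRef implements Expr {
        final String name;
        FieldRef(String name) { this.name = name; }
    }

    /** Constant interval in milliseconds. */
    static final class IntervalMillis implements Expr {
        final long millis;
        IntervalMillis(long millis) { this.millis = millis; }
    }

    static final class Multiply implements Expr {
        final Expr left;
        final Expr right;
        Multiply(Expr left, Expr right) { this.left = left; this.right = right; }
    }

    /** Builds the interval operand of DATETIME_PLUS for a day-time unit given in milliseconds. */
    static Expr intervalOperand(long unitMillis, Expr count) {
        if (count instanceof Lit) {
            // Literal count: fold count * unit into one interval constant; no MULTIPLY needed.
            return new IntervalMillis(unitMillis * ((Lit) count).value);
        }
        // Non-literal count: keep the runtime multiplication.
        return new Multiply(new IntervalMillis(unitMillis), count);
    }

    private TimestampAddFolding() {}
}
```

For example, `intervalOperand(604_800_000L, new Lit(2))` (two weeks) folds to a single `IntervalMillis` of 1,209,600,000 ms.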
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197613995 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,42 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + override private[flink] def children = unit :: count :: timestamp :: Nil + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { +var timeUnit : Option[TimeUnit] = None +if (unit.isInstanceOf[Literal]) { +var unitValue= unit.asInstanceOf[Literal].value.toString() +val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK", --- End diff -- I think defining these in a static private field would be more appropriate. ---
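Moving the alias list out of `toRexNode` means it is built once instead of on every call. Below is a minimal Java sketch. `TimeUnitAliases` and `normalize` are hypothetical names, and the alias strings are copied from the diff. In the Scala code under review, the natural equivalent would presumably be a `private val` in the `TimestampAdd` companion object.

```java
import java.util.Set;

/** Hypothetical holder for the SQL_TSI_* unit aliases. */
final class TimeUnitAliases {
    // Built once per class load, instead of on every toRexNode call.
    private static final Set<String> SQL_TSI_UNITS = Set.of(
        "SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK",
        "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND");

    private static final String PREFIX = "SQL_TSI_";

    /** Maps e.g. "SQL_TSI_WEEK" to "WEEK"; any other value is returned unchanged. */
    static String normalize(String unit) {
        return SQL_TSI_UNITS.contains(unit) ? unit.substring(PREFIX.length()) : unit;
    }

    private TimeUnitAliases() {}
}
```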
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197613915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,42 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + override private[flink] def children = unit :: count :: timestamp :: Nil + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { +var timeUnit : Option[TimeUnit] = None +if (unit.isInstanceOf[Literal]) { --- End diff -- I think it will be much cleaner if we use pattern matching here, something like: ``` unit match { case literal: Literal => //... case _ => //... } ``` That also reminds me that we should probably override the `validateInput` function as well. ---
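If `validateInput` is overridden, an invalid unit is reported during validation. Otherwise the failure surfaces later, when `timeUnit.get` is called on `None`. The Java sketch below models that check. `UnitValidation` and `validateUnit` are hypothetical and are not Flink's validation result types. As a simplification, a literal unit is represented as a `String`, and any other object stands for a non-literal expression. The accepted units come from the Scaladoc in this PR.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical validation of the TimestampAdd unit argument. */
final class UnitValidation {
    private static final Set<String> UNITS = Set.of(
        "SECOND", "MINUTE", "HOUR", "DAY", "WEEK", "MONTH", "QUARTER", "YEAR");

    /**
     * Returns an error message, or Optional.empty() if the unit is valid.
     * A String models a literal unit; any other object models a non-literal expression.
     */
    static Optional<String> validateUnit(Object unit) {
        if (!(unit instanceof String)) {
            return Optional.of("TimestampAdd requires the unit to be a literal, got: " + unit);
        }
        String value = (String) unit;
        if (!UNITS.contains(value)) {
            return Optional.of("Unsupported TimestampAdd unit: " + value);
        }
        return Optional.empty();
    }

    private UnitValidation() {}
}
```

Once validation runs first, `toRexNode` can assume a valid literal unit and drop the `Option` bookkeeping.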
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197613157 --- Diff: docs/dev/table/tableApi.md --- @@ -2804,6 +2804,17 @@ dateFormat(TIMESTAMP, STRING) + + +{% highlight java %} +timestampAdd(unit, interval, timestamp) +{% endhighlight %} + + +Adds a (signed) integer interval to a timestamp. The unit for the interval is given by the unit argument, which should be one of the following values: "SECOND", "MINUTE", "HOUR", "DAY", "WEEK", "MONTH", "QUARTER", or "YEAR". E.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 2003-01-09. --- End diff -- Let's make it consistent with `sql.md`; I believe the argument values there are not wrapped in `"`. ---
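The documented example, where `timestampAdd(WEEK, 1, '2003-01-02'.toDate)` yields `2003-01-09`, can be checked against `java.time`. The Java sketch below reproduces only the documented semantics. It does not show how Flink evaluates the expression; Flink translates it to Calcite's `DATETIME_PLUS`. `TimestampAddSemantics` is a hypothetical name. Treating one `QUARTER` as 3 months is an assumption consistent with SQL `TIMESTAMPADD`.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/** Hypothetical reference for the documented timestampAdd semantics. */
final class TimestampAddSemantics {
    /** Adds a signed integer count of the given unit to a timestamp. */
    static LocalDateTime timestampAdd(String unit, long count, LocalDateTime ts) {
        switch (unit) {
            case "SECOND":  return ts.plus(count, ChronoUnit.SECONDS);
            case "MINUTE":  return ts.plus(count, ChronoUnit.MINUTES);
            case "HOUR":    return ts.plus(count, ChronoUnit.HOURS);
            case "DAY":     return ts.plus(count, ChronoUnit.DAYS);
            case "WEEK":    return ts.plus(count, ChronoUnit.WEEKS);
            case "MONTH":   return ts.plus(count, ChronoUnit.MONTHS);
            case "QUARTER": return ts.plus(3 * count, ChronoUnit.MONTHS); // one quarter = 3 months
            case "YEAR":    return ts.plus(count, ChronoUnit.YEARS);
            default: throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    private TimestampAddSemantics() {}
}
```

For example, `timestampAdd("WEEK", 1, LocalDateTime.of(2003, 1, 2, 0, 0))` returns `2003-01-09T00:00`, which matches the documented example. Month-based units clamp to the last valid day of the month, so adding one quarter to January 31 gives April 30.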
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197614566 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,42 @@ case class TemporalOverlaps( } } + /** +* Standard conversion of the TIMESTAMPADD operator. +* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] +*/ +case class TimestampAdd( +unit: Expression, +count: Expression, +timestamp: Expression) + extends Expression { + + override private[flink] def children = unit :: count :: timestamp :: Nil + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { +var timeUnit : Option[TimeUnit] = None +if (unit.isInstanceOf[Literal]) { +var unitValue= unit.asInstanceOf[Literal].value.toString() +val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK", +"SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND") +if (sqlTsiArray.contains(unitValue)) { + unitValue = unitValue.split("_").last +} +timeUnit = Some(TimeUnit.valueOf(unitValue)) +} + +relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, +relBuilder.call(SqlStdOperatorTable.MULTIPLY, + relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier, --- End diff -- I think this part can probably be optimized further if `count` is also a `Literal`. In that case the MULTIPLY operator is not necessary, and it can be converted directly to an `IntervalSqlType`. What do you think? ---
[GitHub] flink issue #6049: [FLINK-9398][Client] Fix CLI list running job returns all...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/6049 Thanks for the comment @zentol. I thought about it, and I think we should group listings of jobs with the same status together; there are also some other non-terminal states. Regarding the user-facing choice between `-t` (terminated) and `-a` (all): I lean towards using `-a`, but internally we name them "terminated". Later on we can easily add support for `finished`, `cancelled`, etc. if necessary. What do you think? ---
[GitHub] flink pull request #6049: [FLINK-9398][Client] Fix CLI list running job retu...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/6049#discussion_r192778560 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception { System.out.println("--"); } } + if (showRemaining) { + if (remainingJobs.size() != 0) { + remainingJobs.sort(startTimeComparator); --- End diff -- Thanks for the reply @zentol. I'm not sure I fully understand this comment. Are you suggesting that we should always group jobs with the same status together when printing, for all 3 cases? ---
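One way to read the suggestion is to always group jobs by status and sort each group by start time, whichever listing flags are set. The following minimal Java sketch follows that reading. `JobListingSketch`, `JobRow`, and the status strings are hypothetical and are not the types used in `CliFrontend`. Here the groups are ordered alphabetically by status through a `TreeMap`. A real CLI would probably use a fixed, user-friendly status order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical grouping of job listings by status. */
final class JobListingSketch {
    static final class JobRow {
        final String id;
        final String status;
        final long startTime;

        JobRow(String id, String status, long startTime) {
            this.id = id;
            this.status = status;
            this.startTime = startTime;
        }
    }

    /** Groups jobs by status (alphabetical status order) and sorts each group by start time. */
    static Map<String, List<JobRow>> groupByStatus(List<JobRow> jobs) {
        Map<String, List<JobRow>> grouped = new TreeMap<>();
        for (JobRow job : jobs) {
            grouped.computeIfAbsent(job.status, s -> new ArrayList<>()).add(job);
        }
        for (List<JobRow> group : grouped.values()) {
            group.sort(Comparator.comparingLong(j -> j.startTime));
        }
        return grouped;
    }

    private JobListingSketch() {}
}
```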
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r190788353 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,196 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\"" +JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\"" +JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :" + +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function extract_valid_pact_from_job_info_return() { +PACT_MATCH=0 +if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]]; +then +PACT_MATCH=$PACT_MATCH +else +PACT_MATCH=-1 +fi +if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]]; +then +PACT_MATCH=$PACT_MATCH +else +PACT_MATCH=-1 +fi +echo ${PACT_MATCH} +} + +function extract_valid_job_list_by_type_from_job_list_return() { +JOB_LIST_MATCH=0 +JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3" +if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]]; +then +JOB_LIST_MATCH=$JOB_LIST_MATCH +else +JOB_LIST_MATCH=-1 +fi +echo ${JOB_LIST_MATCH} +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all --- End diff -- Checking the cleanup code, it seems it doesn't explicitly stop all TMs (similar to `start-cluster.sh`, where I need to explicitly call `taskmanager start`). Actually, I should only remove the `stop-cluster`. ---
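The helper functions in this script use bash regex matching (`[[ $1 =~ ... ]]` with `BASH_REMATCH`) to extract the job ID and savepoint path from CLI output. The Java sketch below performs the same extraction so the patterns can be checked in isolation. `CliOutputParsing` is a hypothetical name, and the sample output lines in the test are made up. The script's character class `[0-9,a-f]` also matches a comma. The sketch uses `[0-9a-f]+`, which is presumably what was intended.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical Java counterpart of the script's regex extractors. */
final class CliOutputParsing {
    // Hex job ID following "JobID "; unlike [0-9,a-f], this class excludes commas.
    private static final Pattern JOB_ID = Pattern.compile("JobID ([0-9a-f]+)");
    // Path between "Savepoint stored in " and the final period (greedy, like the bash version).
    private static final Pattern SAVEPOINT = Pattern.compile("Savepoint stored in (.*)\\.");

    static Optional<String> jobId(String output) {
        Matcher m = JOB_ID.matcher(output);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    static Optional<String> savepointPath(String output) {
        Matcher m = SAVEPOINT.matcher(output);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    private CliOutputParsing() {}
}
```

On a failed match, `Optional.empty()` takes the place of the empty string that the bash helpers echo.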
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r190433925 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- Good point. I will add the check. ---
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190124815 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.setop + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class StreamIntersectCoProcessFunction( + resultType: TypeInformation[Row], + queryConfig: StreamQueryConfig, + all: Boolean) + extends CoProcessFunction[CRow, CRow, CRow] + with Logging { + + validateEqualsHashCode("intersect", resultType) + + // state to hold left stream element + private var leftState: ValueState[JTuple2[Int, Long]] = _ + // state to hold right stream element + private var rightState: ValueState[JTuple2[Int, Long]] = _ + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + private var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + private var rightTimer: ValueState[Long] = _ + + private var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug("Instantiating StreamIntersectCoProcessFunction.") +// initialize left and right state, the first element of tuple2 indicates how many rows of 
+// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]]( + "left", tupleTypeInfo) +val rightStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]]( + "right", tupleTypeInfo) +leftState = getRuntimeContext.getState(leftStateDescriptor) +rightState = getRuntimeContext.getState(rightStateDescriptor) + +// initialize timer state +val valueStateDescriptor1 = new ValueStateDescriptor[Long]("leftTimer", classOf[Long]) +leftTimer = getRuntimeContext.getState(valueStateDescriptor1) +val valueStateDescriptor2 = new ValueStateDescriptor[Long]("rightTimer", classOf[Long]) +rightTimer = getRuntimeContext.getState(valueStateDescriptor2) + +cRowWrapper = new CRowWrappingMultiOutputCollector() +//we emit one record per process at most +cRowWrapper.setTimes(1) + } + + override def processElement1( +value: CRow, +ctx: CoProcessFunction[CRow, CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + +processElement(value, ctx, out, leftState, leftTimer, rightState) + } + + override def processElement2( +value: CRow, +ctx: CoProcessFunction[CRow, CRow, CRow]#Context, +out: Collector[CRow]): Uni
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190124755 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.setop + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class StreamIntersectCoProcessFunction( + resultType: TypeInformation[Row], + queryConfig: StreamQueryConfig, + all: Boolean) + extends CoProcessFunction[CRow, CRow, CRow] + with Logging { + + validateEqualsHashCode("intersect", resultType) + + // state to hold left stream element --- End diff -- The description is misleading: if I understand correctly, you are not actually holding the "row" of the stream element. ---
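To make the point concrete, here is a hypothetical, Flink-free model of what the keyed state seems to hold according to the `open()` comment in the diff: the row acts as the state key, and the value is only a `(count, expireTime)` pair, like `JTuple2[Int, Long]`. The class and method names are made up for illustration, the input is assumed insert-only, and retractions and timers are left out. The emission rules (min-count for `INTERSECT ALL`, emit-once for `INTERSECT`) are standard set semantics, not necessarily the PR's exact logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-key intersect state; not the PR's code.
// The row is the state key; the stored value is only (count, expireTime).
// Insert-only input: retractions and cleanup timers are not modelled.
final class IntersectStateModel {

    /** Value kept per row: how often the row has been seen and when it expires. */
    static final class CountAndExpiry {
        int count;
        long expireTime;
    }

    private final boolean all;
    private final Map<String, CountAndExpiry> leftState = new HashMap<>();
    private final Map<String, CountAndExpiry> rightState = new HashMap<>();

    IntersectStateModel(boolean all) {
        this.all = all;
    }

    /** Returns true if this left arrival should emit one result row. */
    boolean processLeft(String row, long expireTime) {
        return process(row, expireTime, leftState, rightState);
    }

    /** Returns true if this right arrival should emit one result row. */
    boolean processRight(String row, long expireTime) {
        return process(row, expireTime, rightState, leftState);
    }

    private boolean process(String row, long expireTime,
                            Map<String, CountAndExpiry> own,
                            Map<String, CountAndExpiry> other) {
        CountAndExpiry mine = own.computeIfAbsent(row, k -> new CountAndExpiry());
        mine.count++;
        // Only the expiry timestamp is stored; an actual operator might register a cleanup timer here.
        mine.expireTime = expireTime;
        CountAndExpiry theirs = other.get(row);
        int otherCount = (theirs == null) ? 0 : theirs.count;
        if (all) {
            // INTERSECT ALL: result multiplicity is min(left, right), so an arrival adds
            // one result row only while its side's count does not exceed the other side's.
            return mine.count <= otherCount;
        }
        // INTERSECT (distinct): emit exactly once, when the row first exists on both sides.
        return mine.count == 1 && otherCount > 0;
    }
}
```

Nothing in this model stores the row's payload beyond the key, which is why a comment such as "state to hold the count and expiration time per row" may describe it more accurately.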
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190120842 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.setop + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class StreamIntersectCoProcessFunction( --- End diff -- Missing JavaDoc ---
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190119234 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamIntersect.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector +import org.apache.flink.table.runtime.setop.StreamIntersectCoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConverters._ + +class DataStreamIntersect( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType, + all: Boolean) + extends BiRel(cluster, traitSet, leftNode, rightNode) +with DataStreamRel { + + private lazy val intersectType = if (all) { +"All" + } else { +"" + } + + override def needsUpdatesAsRetraction: Boolean = true + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamIntersect( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + getRowType, + all +) + } + + override def toString: String = { +s"Intersect$intersectType(intersect$intersectType: ($intersectSelectionToString))" --- End diff -- `s"Intersect$intersectType(intersect: ($intersectSelectionToString))"`: I don't think you need to include the type twice. ---
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190123516 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.setop + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class StreamIntersectCoProcessFunction( + resultType: TypeInformation[Row], + queryConfig: StreamQueryConfig, + all: Boolean) + extends CoProcessFunction[CRow, CRow, CRow] + with Logging { + + validateEqualsHashCode("intersect", resultType) + + // state to hold left stream element + private var leftState: ValueState[JTuple2[Int, Long]] = _ + // state to hold right stream element + private var rightState: ValueState[JTuple2[Int, Long]] = _ + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + private var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + private var rightTimer: ValueState[Long] = _ + + private var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug("Instantiating StreamIntersectCoProcessFunction.") +// initialize left and right state, the first element of tuple2 indicates how many rows of 
+// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]]( + "left", tupleTypeInfo) +val rightStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]]( + "right", tupleTypeInfo) +leftState = getRuntimeContext.getState(leftStateDescriptor) +rightState = getRuntimeContext.getState(rightStateDescriptor) + +// initialize timer state +val valueStateDescriptor1 = new ValueStateDescriptor[Long]("leftTimer", classOf[Long]) +leftTimer = getRuntimeContext.getState(valueStateDescriptor1) +val valueStateDescriptor2 = new ValueStateDescriptor[Long]("rightTimer", classOf[Long]) +rightTimer = getRuntimeContext.getState(valueStateDescriptor2) + +cRowWrapper = new CRowWrappingMultiOutputCollector() +//we emit one record per process at most +cRowWrapper.setTimes(1) + } + + override def processElement1( +value: CRow, +ctx: CoProcessFunction[CRow, CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + +processElement(value, ctx, out, leftState, leftTimer, rightState) + } + + override def processElement2( +value: CRow, +ctx: CoProcessFunction[CRow, CRow, CRow]#Context, +out: Collector[CRow]): Uni
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190122671 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.setop + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class StreamIntersectCoProcessFunction( + resultType: TypeInformation[Row], + queryConfig: StreamQueryConfig, + all: Boolean) + extends CoProcessFunction[CRow, CRow, CRow] --- End diff -- I guess I am confused here: there's `CoGroupedStreams` with a customized `CoGroupFunction`, which is already supported in the DataStream API. It seems that if we operate on a windowed stream, we can apply `intersect` as a `CoGroupFunction`. Is this function solely targeting the non-windowed intersect case? If so, can we rename the function? (This also adds to my point: please add JavaDoc.) ---
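As a rough illustration of the windowed alternative, the sketch below shows the per-key, per-window logic that an intersect `CoGroupFunction` might run, written without a Flink dependency so it stays self-contained. It mirrors the shape of `coGroup(first, second, out)`, with `java.util.function.Consumer` standing in for Flink's `Collector`. The class name is hypothetical, and it assumes the co-group key is the whole row, so every element inside one group is equal.

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical, Flink-free sketch of the body of a windowed intersect CoGroupFunction.
// Assumes the grouping key is the whole row, so all elements in one group are equal.
// Consumer stands in for org.apache.flink.util.Collector.
final class WindowedIntersectSketch {

    static <T> void coGroup(Iterable<T> first, Iterable<T> second, boolean all, Consumer<T> out) {
        Iterator<T> left = first.iterator();
        Iterator<T> right = second.iterator();
        if (!all) {
            // INTERSECT: one result row if the row occurs on both sides of the window.
            if (left.hasNext() && right.hasNext()) {
                out.accept(left.next());
            }
            return;
        }
        // INTERSECT ALL: min(|first|, |second|) copies, consuming one element per side per copy.
        while (left.hasNext() && right.hasNext()) {
            out.accept(left.next());
            right.next();
        }
    }
}
```

Because a window hands over both complete groups at once, no per-row counters or cleanup timers are needed. That difference is one reason the state-based `CoProcessFunction` looks aimed at the non-windowed case.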
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190119548 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamIntersect.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector +import org.apache.flink.table.runtime.setop.StreamIntersectCoProcessFunction +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConverters._ + +class DataStreamIntersect( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType, + all: Boolean) + extends BiRel(cluster, traitSet, leftNode, rightNode) +with DataStreamRel { + + private lazy val intersectType = if (all) { +"All" --- End diff -- `" All"` might be better formatting, since you only attach this to the `explainTerms` and `toString` methods. ---
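Taken together with the earlier suggestion to mention the type only once, the leading space yields readable plan strings. The Java sketch below is hypothetical and only shows the resulting text. `describe` and its `selection` argument stand in for the Scala `toString` and `intersectSelectionToString`.

```java
// Hypothetical sketch of the two formatting suggestions (not the PR's code):
// the type marker carries its own leading space, and it appears only once.
final class IntersectExplainSketch {

    static String intersectType(boolean all) {
        return all ? " All" : "";
    }

    // `selection` stands in for intersectSelectionToString.
    static String describe(boolean all, String selection) {
        return "Intersect" + intersectType(all) + "(intersect: (" + selection + "))";
    }
}
```

With `all = true` this gives `Intersect All(intersect: (name, cnt))`. Without it, the result is `Intersect(intersect: (name, cnt))`, with no stray double space in either case.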
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5849 Thanks @tzulitai for the suggestion. I will close this and continue with the new PR in #6054. ---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test ...
Github user walterddr closed the pull request at: https://github.com/apache/flink/pull/5849 ---
[GitHub] flink pull request #6054: [FLINK-8986][e2e-test] Flink end to end test REST ...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/6054 [FLINK-8986][e2e-test] Flink end to end test REST API ## What is the purpose of the change Adding an end-to-end test for the REST APIs of the FLIP-6 generated endpoints. *This is a follow-up PR based on #5849; the changes and improvements are based on @zentol's comments and suggestions.* *This PR depends on one commit in #5863 (ea12737).* ## Brief change log - Adding a `flink-rest-api-test` module and `test_rest_api.sh` for end-to-end testing - Adding a payload YAML file for POST/PATCH REST API endpoints that require a payload. - Adding a validation sequence before testing the REST API modules, in order to extract all path / query parameters for the REST API endpoints. ## Verifying this change This test runs successfully against all REST APIs in `DispatcherRestEndpoint`. This test will automatically pick up new endpoints for testing as long as support is added to `DispatcherRestEndpoint`. If a new endpoint requires additional path / payload / query parameters, the test is skipped for that endpoint instead of failing. ## Does this pull request potentially affect one of the following parts: No ## Documentation No documentation for this test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-8986-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6054.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6054 commit ea127374af896d9dc82c1a9911f65b2360705397 Author: Rong Rong <rongr@...> Date: 2018-04-17T20:45:23Z initial commit to support CLI test, excluding YARN test commit 119895ffdfb783985892fafdde5fc4e0646a260a Author: Rong Rong <walter_ddr@...> Date: 2018-04-25T18:51:12Z adding in rest api tests, it went through 34 and skipped 6 which requires more data. 
Following completion of this test should add payload to requestBodies.yaml for testing commit 7dc064a23854afe95737afb8602bc25a2f3aca3d Author: Rong Rong <rongr@...> Date: 2018-04-27T00:27:50Z adding in post test with payload commit 977abed8b63ab91e7b8a51df62df28d1ea0ec284 Author: Rong Rong <walter_ddr@...> Date: 2018-04-27T05:05:11Z adding in all testing request payloads and all 43 tests are passing ---
[GitHub] flink issue #6049: [FLINK-9398][Client] Fix CLI list running job returns all...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/6049 @zentol yeah, I totally agree with the naming issue and have just updated the PR with clearer `ListOptions` definitions. Please take another look. Thanks ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5863#discussion_r189673647 --- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh --- @@ -0,0 +1,155 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +start_cluster +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start +$FLINK_DIR/bin/taskmanager.sh start + +# Test for CLI commands. +# verify only the return code the content correctness of the API results. +PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar +JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)" +SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\." 
+ +EXIT_CODE=0 + +function extract_job_id_from_job_submission_return() { +if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]]; +then +JOB_ID="${BASH_REMATCH[1]}"; +else +JOB_ID="" +fi +echo "$JOB_ID" +} + +function extract_savepoint_path_from_savepoint_return() { +if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]]; +then +SAVEPOINT_PATH="${BASH_REMATCH[1]}"; +else +SAVEPOINT_PATH="" +fi +echo "$SAVEPOINT_PATH" +} + +function cleanup_cli_test() { + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + cleanup +} + +printf "\n==\n" +printf "Test default job launch with non-detach mode\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar" +EXIT_CODE=$? +fi + +printf "\n==\n" +printf "Test run with complex parameter set\n" +printf "==\n" +if [ $EXIT_CODE == 0 ]; then +eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \ --- End diff -- Changed to use `-p 1`. I think the requirement that "parallelism should be taken by the CLI command" is verified by the exit code of the `run` execution. Is that correct? ---
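For readers less familiar with `[[ =~ ]]` and `BASH_REMATCH`, the Java port below shows the same extraction idea: match the CLI output against a pattern, return the first capture group, or return an empty string when nothing matches. It is an illustrative port, not part of the PR. The class name and the sample CLI output in the test are hypothetical. One deliberate difference: the job ID class here is `[0-9a-f]+`, because the script's `[0-9,a-f]` would also accept commas. Since `find()` is unanchored, the leading `.*` from the bash patterns is unnecessary, and this version returns the first match.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical Java port of the two bash extractor functions in test_cli_api.sh.
final class CliOutputParsing {

    // Hex digits only; the script's [0-9,a-f] would also accept commas.
    private static final Pattern JOB_ID = Pattern.compile("JobID ([0-9a-f]+)");
    // Greedy within one line, so the capture ends right before the line's last '.'.
    private static final Pattern SAVEPOINT_PATH = Pattern.compile("Savepoint stored in (.*)\\.");

    static String extractJobId(String cliOutput) {
        return firstGroupOrEmpty(JOB_ID, cliOutput);
    }

    static String extractSavepointPath(String cliOutput) {
        return firstGroupOrEmpty(SAVEPOINT_PATH, cliOutput);
    }

    // Mirrors the bash functions: first capture group on a match, "" otherwise.
    private static String firstGroupOrEmpty(Pattern pattern, String text) {
        Matcher matcher = pattern.matcher(text);
        return matcher.find() ? matcher.group(1) : "";
    }
}
```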
[GitHub] flink pull request #6049: [FLINK-9398][Client] Fix CLI list running job retu...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/6049 [FLINK-9398][Client] Fix CLI list running job returns all except scheduled jobs ## What is the purpose of the change This PR fixes the CLI command `bin/flink list -r`, which returns all jobs except scheduled ones. ## Brief change log Change the behavior of `bin/flink list` as follows: - Add an `-a` option to list all jobs, including `CREATED`, `RUNNING` and `RESTARTING`. - Fix the `-r` option to list only `RUNNING` and `RESTARTING` jobs. - Internally map `-a` to the `other` job type, so that it won't affect the behavior of the explicit listing options when other options are added to the `list` command. ## Verifying this change This change added tests and can be verified as follows: - Added a unit test to verify the `-a` option. - Added a `ListOptions` unit-test suite, which did not exist before. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
documented in `cli.md` You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-9398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6049.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6049 commit d742f3ec5ffd4b411162db234e108a496a4f5aaa Author: Rong Rong <walter_ddr@...> Date: 2018-05-20T14:29:56Z fix CLI list options showing all jobs instead of just running / restarting jobs commit e6d827700427a3f31a699ff02cb5b9906715c629 Author: Rong Rong <walter_ddr@...> Date: 2018-05-20T23:45:18Z adding in documentation for list all jobs option ---
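The intended flag semantics from the change log can be summarized as a small mapping from option to job states. The sketch below is hypothetical: the enum is a local stand-in rather than Flink's `JobStatus`, and `FINISHED` only represents the remaining terminal ("other") states. It covers only the two flags the PR describes.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of the flag-to-state mapping described in this PR.
// JobState is a local stand-in, not Flink's JobStatus; FINISHED represents
// the remaining ("other") states.
final class ListFlagsSketch {

    enum JobState { CREATED, RUNNING, RESTARTING, FINISHED }

    static Set<JobState> statesFor(String flag) {
        switch (flag) {
            case "-r":
                // Fixed behavior: running jobs only, restarting ones included.
                return EnumSet.of(JobState.RUNNING, JobState.RESTARTING);
            case "-a":
                // New option: every job, scheduled (CREATED) ones included.
                return EnumSet.allOf(JobState.class);
            default:
                throw new IllegalArgumentException("flag not modelled in this sketch: " + flag);
        }
    }
}
```

Keeping `-a` as its own set, instead of a union of the other flags, matches the change log's point that mapping `-a` to `other` leaves the explicit options unaffected.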
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5849 Hi @tzulitai. I've actually created a new version of the test based on @zentol's comment on this PR: https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test. However, it depends on https://github.com/apache/flink/pull/5863, as I reused the periodic streaming job for testing. Is it possible to create a PR on top of another currently pending PR? ---
[GitHub] flink issue #5863: [FLINK-8985][e2etest] initial support for End-to-end CLI ...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5863 Thanks @tzulitai for the review. I will update ASAP. I am not 100% sure whether I should verify the CLI return values, but I will definitely add them. ---
[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW, EPOCH, DECADE ...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/6007 I wasn't sure whether this is the full PR or just a partial implementation, but I don't see `EPOCH` or `DECADE` in either the main or the test code. ---
[GitHub] flink pull request #5985: [FLINK-8135][docs] Add description to MessageParam...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5985#discussion_r187459062 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java --- @@ -38,4 +38,10 @@ public String convertValueToString(Boolean value) { public Boolean convertStringToValue(String value) { return Boolean.valueOf(value); } + + @Override + public String getDescription() { + return "Boolean value. If true then serialized user task accumulators will be included into a response. " + + "False by default."; --- End diff -- "False by default." is confusing to me. Does it mean that the default value of the parameter is `false`, or that whether the task accumulators are included depends on some "default" configuration of the task accumulators? ---
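One detail that could help pin down the wording: the conversion shown in the diff is plain `Boolean.valueOf(String)`. This JDK method returns `true` only for the string `"true"`, ignoring case. Any other text, including `null`, becomes `false`. The snippet below restates that conversion under a hypothetical class name. It does not show what happens when the query parameter is absent, because that depends on handler code that is not in this diff.

```java
// Same conversion as convertStringToValue in the diff above (class name is hypothetical).
// Boolean.valueOf(String) yields true only for "true" (ignoring case);
// any other text, and null, yields false.
final class IncludeSerializedValueParsing {

    static Boolean convertStringToValue(String value) {
        return Boolean.valueOf(value);
    }
}
```

So a value such as `yes` or `1` is silently read as `false`. A description like "`true` to include serialized accumulators; any other value is treated as `false`" may be less ambiguous.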
[GitHub] flink pull request #5985: [FLINK-8135][docs] Add description to MessageParam...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5985#discussion_r187460147 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java --- @@ -44,4 +44,9 @@ protected String convertToString(final Integer value) { return value.toString(); } + @Override + public String getDescription() { + return "The index of a subtask. Integer value starts from 0, should be positive."; --- End diff -- `must be non-negative`, if "0" is valid ---
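Subtask indices start at 0, so "non-negative" is the precise bound and "positive" would wrongly exclude the first subtask. The parser below is a hypothetical illustration of that bound and is not the actual `SubtaskIndexPathParameter` conversion code, which this diff does not show.

```java
// Hypothetical parser illustrating the "must be non-negative" wording:
// 0 is the first subtask index, so only negative values are rejected.
final class SubtaskIndexParsing {

    static int parseSubtaskIndex(String value) {
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for non-numbers.
        int index = Integer.parseInt(value);
        if (index < 0) {
            throw new IllegalArgumentException("subtask index must be non-negative, was " + index);
        }
        return index;
    }
}
```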
[GitHub] flink pull request #5985: [FLINK-8135][docs] Add description to MessageParam...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5985#discussion_r187458764 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SavepointPathQueryParameter.java --- @@ -28,4 +28,9 @@ public SavepointPathQueryParameter() { super(KEY, MessageParameterRequisiteness.OPTIONAL); } + + @Override + public String getDescription() { + return "Defines a path to savepoint to restore from."; --- End diff -- " path to savepoint" -> "path of the savepoint" ? ---
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5940 LOL. I think I found a way: 1. Rebase #3764 onto the current master; 2. Rebase this branch onto the rebased #3764; 3. Make changes on top :-) ---
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5940 Thanks @suez1224 @fhueske for the comments. I will change them accordingly. Yes, I copied a lot of test cases from @haohui's PR for my own testing. I can definitely put it on top, given that the runtime support is already merged in #. Procedure-wise question: should I rebase his commit, add my change on top, and then attach it to this PR? I am not sure if there's a clever way to both preserve the discussion in this thread and rebase on top of his change. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184861354 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,539 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). 
If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with which +
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184861107 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,539 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). 
If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name --- End diff -- Seems like `execution.result-mode` can only be set through the environment config file. Executing the `SET` command in the CLI doesn't take effect for me. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184860963 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,539 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). 
If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. --- End diff -- Can we replace "following sections" with the actual link? The docs might be updated later, and the "following" statement might no longer hold. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184860924 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). 
If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. --- End diff -- Also, `environment-files` is technically four sections away, not the next one :-P ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184860868 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). 
If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + --- End diff -- Correct me if I'm wrong, but it seems like @twalthr is referring to the embedded mode of the SQL Client. It might be useful to add another section, "SQL Client Mode" -> "Embedded" / "Gateway", preceding this one, and to link the "Gateway" mode to the "Limitations & Future" section. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184860810 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,539 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: --- End diff -- `see the [Cluster & Deployment](...) section.` ---
[GitHub] flink pull request #5940: [FLINK-8690][table]Support DistinctAgg on DataStre...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5940

[FLINK-8690][table]Support DistinctAgg on DataStream

## What is the purpose of the change

* Allow `FlinkLogicalAggregate` to support distinct aggregations on DataStream, while DataSet keeps decomposing distinct aggregates into a GROUP BY followed by normal aggregates.

## Brief change log

- Moved `AggregateExpandDistinctAggregatesRule.JOIN` to `DATASET_NORM_RULES`.
- Enabled `DataStreamGroupWindowAggregate` to support distinct aggregates, while keeping them unsupported for `[DataStream/DataSet]GroupAggregate`.
- Fixed a typo in codegen for distinct aggregates on merge.
- Fixed a possible codegen test error for `UNION ALL`.

## Verifying this change

- Unit tests are added for `DistinctAggregateTest`.
- Added an ITCase for distinct group window aggregates.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (codegen)
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? Not yet. Should we put it in the `Aggregate` section or the `Group Window` section? Input is highly appreciated. Also, distinct OVER aggregation was fixed in FLINK-8689 but is not documented.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-8690 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5940.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5940 commit c517821d13341ae10b5d47acdbd0cc7d5bbe38b7 Author: Rong Rong <rongr@...> Date: 2018-04-28T15:59:12Z moving AggregateExpandDistinctAggregatesRule.JOIN to DATASET_NORM_RULES, and enabled distinct aggregate support for window aggregate over datastream ---
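The PR keeps the DataSet rewrite (`AggregateExpandDistinctAggregatesRule.JOIN` in `DATASET_NORM_RULES`) that turns a distinct aggregate into a GROUP BY followed by a normal aggregate. As a rough illustration of why that rewrite is valid, here is a plain Scala collections sketch (not Flink's planner rule; the `InRow` case class and the sample data are hypothetical) showing that `COUNT(DISTINCT v) ... GROUP BY k` gives the same result as first grouping on `(k, v)` and then applying a plain `COUNT` per `k`:

```scala
// Hypothetical input row: grouping key `k` and aggregated value `v`.
final case class InRow(k: String, v: Int)

object DistinctDecomposition {
  // Direct form: SELECT k, COUNT(DISTINCT v) FROM t GROUP BY k
  def countDistinctDirect(rows: Seq[InRow]): Map[String, Int] =
    rows.groupBy(_.k).map { case (k, group) => k -> group.map(_.v).distinct.size }

  // Decomposed form:
  //   SELECT k, COUNT(v) FROM (SELECT k, v FROM t GROUP BY k, v) GROUP BY k
  def countDistinctDecomposed(rows: Seq[InRow]): Map[String, Int] = {
    // Inner GROUP BY k, v: keep one row per distinct (k, v) pair.
    val deduplicated: Seq[(String, Int)] = rows.map(r => (r.k, r.v)).distinct
    // Outer GROUP BY k: a plain, non-distinct COUNT.
    deduplicated.groupBy(_._1).map { case (k, pairs) => k -> pairs.size }
  }
}
```

For DataStream group windows, the PR instead evaluates DISTINCT directly inside `DataStreamGroupWindowAggregate` rather than applying this rewrite.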
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184441869 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLon override def equals(that: Any): Boolean = that match { case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && -this.mapView == that.mapView +this.distinctValueMap == that.distinctValueMap case _ => false } def add(element: E): Boolean = { -if (element != null) { - val currentVal = mapView.get(element) - if (currentVal != null) { -mapView.put(element, currentVal + 1L) -false - } else { -mapView.put(element, 1L) -true - } -} else { +val wrappedElement = Row.of(element) --- End diff -- Thanks @fhueske for the insight. Yeah, I thought about that before the last commit but didn't go through with it, since we still need to construct a single-element row before passing it to the distinct accumulator. But you are right, it will make future optimization easier. ---
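The change wraps each input value with `Row.of(element)` before it reaches the distinct filter. A minimal sketch of that idea follows, assuming a reference-counted map keyed on a row-like wrapper. Here a plain `Seq[Any]` stands in for Flink's `Row`, and `DistinctFilter` is a hypothetical name, not the PR's `DistinctAccumulator`. Wrapping turns a `null` value into an ordinary key and would let a multi-argument distinct reuse the same map later:

```scala
import scala.collection.mutable

// Hypothetical, simplified distinct filter. Keys are row-like wrappers
// (Seq[Any], standing in for Flink's Row) so that a null value, or several
// arguments, form a single non-null key.
final class DistinctFilter {
  private val counts = mutable.HashMap.empty[Seq[Any], Long]

  /** Returns true on the first occurrence of the key, i.e. when the wrapped
    * aggregate should accumulate the value. */
  def add(args: Any*): Boolean = {
    val key: Seq[Any] = args.toList
    val current = counts.getOrElse(key, 0L)
    counts.update(key, current + 1L)
    current == 0L
  }

  /** Returns true when the last occurrence is removed, i.e. when the wrapped
    * aggregate should retract the value. */
  def remove(args: Any*): Boolean = {
    val key: Seq[Any] = args.toList
    counts.get(key) match {
      case Some(1L) => counts.remove(key); true
      case Some(n)  => counts.update(key, n - 1L); false
      case None     => false
    }
  }
}
```

Keeping a count per key, instead of a set, is what allows `remove` to report only the retraction of the last copy.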
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184429742 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L,
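The bounded `ROWS BETWEEN 3 PRECEDING AND CURRENT ROW` tests exercise retraction. When a row leaves the frame, its value must be removed from the distinct filter, and it only disappears from `SUM(DISTINCT e)` once its last copy in the frame is gone. A minimal sketch of that bookkeeping over a single partition follows (plain Scala, hypothetical helper `slidingDistinct`). The input `1, 3, 3, 2, 2` is hypothetical, picked so that the output reproduces the SUM/MIN columns of the `a = 5` rows in the expected list of `testProcTimeDistinctBoundedPartitionedRowsOver`; it is not read from `StreamTestData`.

```scala
import scala.collection.mutable

object SlidingDistinct {
  /** For each row, returns (SUM(DISTINCT v), MIN(DISTINCT v)) over the frame
    * ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW of one partition. */
  def slidingDistinct(values: Seq[Int], preceding: Int): Seq[(Int, Int)] = {
    val counts = mutable.HashMap.empty[Int, Int] // distinct value -> copies in frame
    val frame  = mutable.Queue.empty[Int]        // rows currently in the frame
    var distinctSum = 0
    val out = Vector.newBuilder[(Int, Int)]
    values.foreach { v =>
      // Accumulate: only the first copy contributes to SUM(DISTINCT).
      val c = counts.getOrElse(v, 0)
      if (c == 0) distinctSum += v
      counts.update(v, c + 1)
      frame.enqueue(v)
      // Retract: drop the row that just fell out of the frame.
      if (frame.size > preceding + 1) {
        val old = frame.dequeue()
        val oc = counts(old)
        if (oc == 1) { counts.remove(old); distinctSum -= old }
        else counts.update(old, oc - 1)
      }
      // MIN(DISTINCT) over the values still present in the frame.
      out += ((distinctSum, counts.keys.min))
    }
    out.result()
  }
}
```

For example, `slidingDistinct(Seq(1, 3, 3, 2, 2), 3)` yields `(1,1), (4,1), (4,1), (6,1), (5,2)`. The last row shows the retraction: the `1` leaves the frame, so both the sum and the minimum change.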
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184428065 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L,
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184115873 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) +val mapViewTypeInfo = new MapViewTypeInfo( + physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO) --- End diff -- At the moment it disregards any null value, but as you mention, this is not correct. Will address. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184125030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Long => JLong} +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} + +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLong]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, JLong]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, JLong]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { --- End diff -- Great catch @fhueske. Sorry, I misunderstood what you meant previously regarding `null` values. This line was meant to disregard all `null` values, but as you mentioned, some aggregate functions might treat `null` values differently, e.g. `COUNT`. I added the handling and the tests as well. ---
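The point about `null` is that the distinct filter should only decide *whether* a value is new and leave it to the wrapped aggregate function to decide what a `null` means. A minimal sketch of that split follows, assuming two hypothetical wrapped aggregates, neither of which is a Flink class. `CountNonNull` skips `null` like SQL `COUNT(e)`, and `CountAll` counts it as a value:

```scala
import scala.collection.mutable

// Hypothetical wrapped-aggregate interface.
trait CountLike {
  def accumulate(value: Any): Unit
  def result: Long
}

// Skips null, like SQL COUNT(e).
final class CountNonNull extends CountLike {
  private var n = 0L
  def accumulate(value: Any): Unit = if (value != null) n += 1
  def result: Long = n
}

// Treats null as a value of its own.
final class CountAll extends CountLike {
  private var n = 0L
  def accumulate(value: Any): Unit = n += 1
  def result: Long = n
}

// Distinct wrapper: forwards only the first occurrence of each value,
// including null, and never drops null on its own.
final class DistinctWrapper(val agg: CountLike) {
  // Option(value) maps null to None, so null gets its own key.
  private val seen = mutable.HashSet.empty[Option[Any]]
  def accumulate(value: Any): Unit =
    if (seen.add(Option(value))) agg.accumulate(value)
}
```

With the input `1, null, 1, 2, null`, the wrapper forwards `1`, `null`, and `2` once each. `CountNonNull` then ends at 2 and `CountAll` at 3, so the null policy stays with the aggregate, not the filter.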
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184115947 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,36 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + +val distinctAggs: Array[Seq[DataViewSpec[_]]] = isDistinctAggs.zipWithIndex.map { + case (isDistinctAgg, idx) => if (isDistinctAgg) { +val fieldIndex: Int = aggFields(idx)(0) +val mapViewTypeInfo = new MapViewTypeInfo( + physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO) +Seq( + MapViewSpec( +"distinctAgg" + idx + "_field" + fieldIndex, --- End diff -- +1 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183880939 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- Yes. You are right. Sorry I missed that. Just updated :-) ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222089 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) + + distinctAggs(index) = Seq( --- End diff -- `mapViewTypeInfo` doesn't seem to be available in codegen. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} + |${accTypes(i)} acc$i = (${accTypes(i)}) distinctAcc$i.getRealAcc(); + |${genAccDataViewFieldSetter(s"acc$i", i)} + |output.setField( + | ${aggMapping(i)}, + | baseClass$i.getValue(acc$i));""".stripMargin +} else { --- End diff -- Yeah, that's true. It will also avoid a lot of potential typos. +1 ---
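One way to keep the distinct and non-distinct branches from drifting apart is to generate the shared part of the template once and vary only how the accumulator is loaded. This is a hypothetical sketch, not the PR's code: the `AccCodeGen` object and its methods are illustrative names, and a plain `s` interpolator stands in for the code generator's `j` interpolator.

```scala
// Hypothetical helper: generates the Java statements that load accumulator i
// into a local variable `acc$i`, unwrapping the DistinctAccumulator only for
// distinct aggregates, so the rest of the template is written once.
object AccCodeGen {
  val DistinctAccType = "org.apache.flink.table.functions.aggfunctions.DistinctAccumulator"

  def genAccRead(i: Int, accType: String, isDistinct: Boolean): String =
    if (isDistinct) {
      s"""$DistinctAccType distinctAcc$i = ($DistinctAccType) accs.getField($i);
         |$accType acc$i = ($accType) distinctAcc$i.getRealAcc();""".stripMargin
    } else {
      s"$accType acc$i = ($accType) accs.getField($i);"
    }

  // Shared template for the partial-result case: only genAccRead differs.
  def genPartialResult(i: Int, outputIndex: Int, accType: String, isDistinct: Boolean): String =
    s"""${genAccRead(i, accType, isDistinct)}
       |output.setField($outputIndex, acc$i);""".stripMargin
}
```

With this split, a typo in the `output.setField(...)` part can only be made in one place instead of once per branch.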
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222457 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala --- @@ -70,7 +70,12 @@ trait OverAggregate { val aggStrings = namedAggregates.map(_.getKey).map( a => s"${a.getAggregation}(${ -if (a.getArgList.size() > 0) { +val prefix = if (a.isDistinct) { --- End diff -- will address in FLINK-8690 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222451 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, Integer]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, Integer]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, Integer]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { + if (mapView.contains(element)) { +mapView.put(element, mapView.get(element) + 1) +false + } else { +mapView.put(element, 1) +true + } +} else { + false +} + } + + def add(element: E, count: Int): Boolean = { +if (element != null) { + if (mapView.contains(element)) { --- End diff -- +1 here as well, just in case I forgot ---
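The counted `add(element: E, count: Int)` overload in the diff is the kind of operation needed to fold one count map into another, for instance when two partial accumulators are merged. A minimal sketch, assuming a `mutable.HashMap` in place of `MapView`; `CountedDistinctFilter` and `merge` are hypothetical names, and this thread does not state that the PR uses the overload for merging.

```scala
import scala.collection.mutable

// Hypothetical sketch: a mutable.HashMap stands in for Flink's MapView.
class CountedDistinctFilter[E] {
  val counts: mutable.HashMap[E, Int] = mutable.HashMap.empty[E, Int]

  // Adds `count` occurrences; returns true if the element was absent before,
  // i.e. it is new to the real accumulator.
  def add(element: E, count: Int): Boolean = {
    val previous = counts.getOrElse(element, 0)
    counts.update(element, previous + count)
    previous == 0
  }

  def add(element: E): Boolean = add(element, 1)

  // Folds `other` into this filter and returns the elements that the real
  // accumulator has not seen yet.
  def merge(other: CountedDistinctFilter[E]): List[E] = {
    val newElements = mutable.ListBuffer.empty[E]
    for ((element, count) <- other.counts) {
      if (add(element, count)) newElements += element
    }
    newElements.toList
  }
}
```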
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222077 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- That's a good point. I will add a test for this use case. ---
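The diff restricts DISTINCT to a single argument field before it builds the map view for a call. That check can be sketched in isolation: for each aggregate call it yields the input field the distinct filter is keyed on, or `None` for non-distinct calls. `AggCallInfo` is a hypothetical stand-in for Calcite's `AggregateCall`, `IllegalArgumentException` replaces Flink's `TableException`, and the explicit empty-argument case is an addition of this sketch.

```scala
// Hypothetical stand-in for Calcite's AggregateCall: only the two properties
// the check needs.
final case class AggCallInfo(isDistinct: Boolean, argList: List[Int])

object DistinctArgs {
  // For each aggregate call, returns the input field index the distinct
  // filter must be keyed on, or None for non-distinct calls.
  def distinctFieldIndexes(calls: Seq[AggCallInfo]): Seq[Option[Int]] =
    calls.map { call =>
      if (!call.isDistinct) None
      else call.argList match {
        case List(field) => Some(field)
        case Nil =>
          throw new IllegalArgumentException("DISTINCT requires an argument field.")
        case _ =>
          throw new IllegalArgumentException(
            "Cannot apply distinct filter on multiple input argument fields at this moment!")
      }
    }
}
```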
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r18367 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- Hmm, I tried to generate the `MapViewSpec` in codegen, but the `AggregationCodeGenerator.generateAggregations` call signature seems to be missing the type info for the accumulator (`accTypes` only shows the accumulator type, not the `PojoField` info). I could certainly change it to the simpler `DistinctAccumulator[Object, Long]`, but I think there are pros and cons; also, making a `MapTypeInfo` for a plain `Object` seems odd to me. How about we leave it for a future optimization? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183598006 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { --- End diff -- This test case was added to cover all usages of the distinct accumulator in over aggregates. ---
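The expected rows can be cross-checked with a small model of the window semantics. This sketch assumes a single partition whose `e` values arrive in the order 1, 3, 3, 2, 2, which reproduces the `a = 5` rows of both expected lists; that input order is inferred from the expected output, not read from `StreamTestData`. `DistinctOverWindow` is a hypothetical name, and the unbounded case assumes one row per processing-time instant (no peer rows).

```scala
// Hypothetical model of one partition: each input row sees the set of
// distinct `e` values in its window.
object DistinctOverWindow {
  // SUM(DISTINCT e), MIN(DISTINCT e) over
  // ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW
  def boundedRows(values: Seq[Int], preceding: Int): Seq[(Int, Int)] =
    values.indices.map { i =>
      val window = values.slice(math.max(0, i - preceding), i + 1).distinct
      (window.sum, window.min)
    }

  // COUNT(e), SUM(DISTINCT e), MIN(DISTINCT e) over RANGE UNBOUNDED PRECEDING,
  // assuming every row has its own processing-time instant.
  def unbounded(values: Seq[Int]): Seq[(Int, Int, Int)] =
    values.indices.map { i =>
      val prefix = values.take(i + 1)
      val distinct = prefix.distinct
      (prefix.size, distinct.sum, distinct.min)
    }
}
```

Note that `COUNT(e)` in the unbounded query is not distinct, so it keeps growing (1 to 5) while `SUM(DISTINCT e)` stays at 6 once the value 2 repeats.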
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r183222573 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +51,96 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { --- End diff -- Both cases went through `AggregateExpandDistinctAggregatesRule` and thus produced plans that break the distinct aggregate into a `GROUP BY` over the distinct key, followed by an aggregate without distinct. In the 2nd case, `LogicalWindowAggregateRule` was not applied due to the `distinct` keyword. They should both be fixed in FLINK-8690. ---
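The expansion described above can be illustrated on a plain `GROUP BY`: deduplicating on (key, distinct argument) first and then aggregating without DISTINCT gives the same result as the distinct aggregate. A minimal sketch using Scala collections in place of relational operators; `ExpandDistinct` is a hypothetical name, and it models only the result equivalence, not the plan Calcite actually produces.

```scala
// Hypothetical illustration of the rewrite: rows are (a, e) pairs.
object ExpandDistinct {
  // SELECT a, SUM(DISTINCT e) FROM t GROUP BY a
  def direct(rows: Seq[(Int, Int)]): Map[Int, Int] =
    rows.groupBy(_._1).map { case (a, group) => a -> group.map(_._2).distinct.sum }

  // SELECT a, SUM(e) FROM (SELECT a, e FROM t GROUP BY a, e) GROUP BY a
  def expanded(rows: Seq[(Int, Int)]): Map[Int, Int] = {
    val deduplicated = rows.distinct // inner GROUP BY a, e
    deduplicated.groupBy(_._1).map { case (a, group) => a -> group.map(_._2).sum }
  }
}
```

For a regular group aggregate the two forms are interchangeable; for an over window the inner `GROUP BY` drops the per-row window semantics, which is why these plans do not match the intended over aggregation.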
[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ Thanks @fhueske, I had the exact same feeling. Just attaching the `MapState` to the back of the `Row` might work as a solution for now, but it will probably be nasty to maintain in the future. I am planning to create an optimization ticket once I have FLINK-8690 completed. What do you think? ---
[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ Hi @fhueske. Thanks for the review, all very good points. I will follow up with the next steps. Actually, @hequn8128 and I already had some discussions regarding the follow-up in FLINK-8690, and I created 2 different approaches. Please kindly take a look when you have time :-) Best, Rong ---
[GitHub] flink issue #5867: [FLINK-8686] [sql-client] Improve basic embedded SQL clie...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5867 Hi @twalthr, making the JVM heap size configurable is definitely a great benefit. Just to clarify, this only changes the client JVM heap size, correct? I am assuming this is mainly for handling large amounts of data when retrieving query results. I think adding it to `flink-conf.yaml` should be fine. There are already commented-out configurations for specific components like Kerberos, ZK, etc., which makes adding a sql-client-specific conf less intrusive. We can always break it out into a separate file later if sql-client needs extra configuration files in the future. What do you guys think? ---
[GitHub] flink issue #5865: [FLINK-9199][REST][hot-fix] handler and paramter typos
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5865 Thanks @zentol, sorry, I skipped checkstyle and tests to speed up the build. Just fixed. ---
[GitHub] flink pull request #5865: [FLINK-9199][REST][hot-fix] handler and paramter t...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5865 [FLINK-9199][REST][hot-fix] handler and paramter typos [hot-fix] fix message header / handler and parameter typos. You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-9199 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5865.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5865 commit 9a655045154ba123aad7d071951b1a252e7a5304 Author: Rong Rong <rongr@...> Date: 2018-04-18T06:44:02Z fix handler and paramter typos ---
[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5863 [FLINK-8985][e2etest] initial support for End-to-end CLI test, excluding YARN test ## What is the purpose of the change Adding end to end test for CLI APIs. ## Brief change log Added test_cli_api.sh script to test combinations of CLI commands listed in the doc section of Flink. Including: - Start up command sets (run) - Operational command sets (list/info/cancel) - Savepoint command sets (savepoint) ## Verifying this change This is a test ## Does this pull request potentially affect one of the following parts: No ## Documentation No You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-8985 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5863.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5863 commit 5f36ee5d4dcbb60a29a413fd29cfaaa69f7e8a47 Author: Rong Rong <rongr@...> Date: 2018-04-17T20:45:23Z initial commit to support CLI test, excluding YARN test ---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test] Flink end to end test REST API
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5849 At the moment it skips 8 of the 39 tests (PATCH & POST methods). There are also 2 test failures, which I think should be fixed in another JIRA. Namely, `/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/accumulators` returns a 500, and `/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/` returns a 500. ---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5849 [FLINK-8986][e2e-test] Flink end to end test REST API ## What is the purpose of the change Adding end to end test for REST APIs for FLIP-6 generated endpoints ## Brief change log - Added `flink-rest-api-test` module in `flink-end-to-end-test` with a periodic stream job and a test suite that runs the REST API tests. - Added test script to run the REST API test. ## Verifying this change N/A, this is a test ## Does this pull request potentially affect one of the following parts: No ## Documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-8986 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5849.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5849 commit 526e5ac1f9758451417972143d7117095b18f2ab Author: Rong Rong <walter_ddr@...> Date: 2018-04-15T15:05:08Z Flink end to end test REST API with generated endpoints only ---
[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5758#discussion_r179178156 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.plan.stats.TableStats + +import scala.collection.JavaConverters._ + +/** + * Common class for all descriptors describing table sources and sinks. + */ +abstract class TableDescriptor extends Descriptor { + + protected var connectorDescriptor: Option[ConnectorDescriptor] = None + protected var formatDescriptor: Option[FormatDescriptor] = None + protected var schemaDescriptor: Option[Schema] = None + protected var statisticsDescriptor: Option[Statistics] = None --- End diff -- I was wondering if a sink could have its own "schema configurations" for alignment with the output table schema? 
For example a CassandraTableSink / JDBCTableSink would definitely throw exceptions when trying to execute an insert with mismatched schemas. ---
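The concern can be made concrete with a positional schema comparison that runs before an insert is executed, so that a mismatch is reported up front rather than as a runtime failure inside a JDBC- or Cassandra-style sink. This is a hypothetical sketch, not an existing Flink API: `Field` and `SinkSchemaCheck` are illustrative names, and types are plain strings instead of Flink `TypeInformation`.

```scala
// Hypothetical field descriptor: name plus a type name.
final case class Field(name: String, dataType: String)

object SinkSchemaCheck {
  // Compares the query result schema with the schema declared for the sink,
  // position by position (as an INSERT maps columns), and returns one message
  // per mismatch. An empty result means the schemas are aligned.
  def mismatches(query: Seq[Field], sink: Seq[Field]): Seq[String] =
    if (query.size != sink.size) {
      Seq(s"Field count differs: query has ${query.size}, sink expects ${sink.size}.")
    } else {
      query.zip(sink).zipWithIndex.collect {
        case ((actual, expected), i) if actual.dataType != expected.dataType =>
          s"Field $i ('${actual.name}' -> '${expected.name}'): ${actual.dataType} is not ${expected.dataType}."
      }
    }
}
```

Only types are compared, because an insert maps columns by position; field names are used only in the error message.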
[GitHub] flink pull request #5797: [FLINK-9104][doc]Re-generate REST API documentatio...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5797#discussion_r178913403 --- Diff: flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java --- @@ -258,6 +265,37 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp return json; } + /** +* Create character escapes for HTML when generating JSON request/response string. +*/ + private static class HTMLCharacterEscapes extends CharacterEscapes { --- End diff -- Good point. I added comments to explain why this is necessary. ---
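The escapes are needed because the JSON request/response samples are embedded in generated HTML, where characters such as `<` and `&` would otherwise be parsed as markup. The PR does this through Jackson's `CharacterEscapes` during serialization; the sketch below is a dependency-free stand-in that applies the same idea to an already-serialized string, with a hypothetical `escapeHtml` function. Which characters the PR actually escapes is not shown here; this sketch handles only `<`, `>` and `&`.

```scala
object HtmlEscape {
  // Replaces the characters that HTML would interpret as markup with entity
  // references; everything else, including JSON quotes, is left untouched.
  def escapeHtml(json: String): String = {
    val sb = new StringBuilder(json.length)
    json.foreach {
      case '<' => sb.append("&lt;")
      case '>' => sb.append("&gt;")
      case '&' => sb.append("&amp;")
      case c   => sb.append(c)
    }
    sb.toString
  }
}
```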
[GitHub] flink pull request #5797: [Flink 9104][doc]Re-generate REST API documentatio...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5797 [Flink 9104][doc]Re-generate REST API documentation for FLIP-6 ## What is the purpose of the change Fix REST-API doc generator and regenerate rest_dispatcher.html ## Brief change log - Changes according to FLINK-8843 - Escape HTML characters ## Verifying this change N/A ## Does this pull request potentially affect one of the following parts: no ## Documentation docs updated You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-9104 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5797.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5797 commit a5ad88f7fa8200cea5084b6d630592bc473a9b6a Author: Rong Rong <walter_ddr@...> Date: 2018-03-31T19:29:59Z fix REST API doc generator bug and regenerate rest_dispatcher ---
[GitHub] flink issue #5796: [Flink 9104][doc]Re-generate REST API documentation for F...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5796 Wrong base ---
[GitHub] flink pull request #5796: [Flink 9104][doc]Re-generate REST API documentatio...
Github user walterddr closed the pull request at: https://github.com/apache/flink/pull/5796 ---
[GitHub] flink pull request #5796: [Flink 9104][doc]Re-generate REST API documentatio...
GitHub user walterddr opened a pull request: https://github.com/apache/flink/pull/5796 [Flink 9104][doc]Re-generate REST API documentation for FLIP-6 ## What is the purpose of the change Fix REST-API doc generator and regenerate rest_dispatcher.html ## Brief change log - Changes according to FLINK-8843 - Escape HTML characters ## Verifying this change N/A ## Does this pull request potentially affect one of the following parts: no ## Documentation docs updated You can merge this pull request into a Git repository by running: $ git pull https://github.com/walterddr/flink FLINK-9104 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5796.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5796 commit 8eb6a30798c09d171e3eb8019b53e677252bd5ba Author: Piotr Nowojski <piotr.nowojski@...> Date: 2018-02-23T10:28:20Z [FLINK-8694][runtime] Fix notifyDataAvailable race condition Before there was a race condition that might resulted in igonoring some notifyDataAvailable calls. This fixes the problem by moving buffersAvailable handling to Supartitions and adds stress test for flushAlways (without this fix this test is dead locking). (cherry picked from commit ebd39f3) commit 61a34a691e7d5233f18ac72a1ab8fb09b53c4753 Author: Piotr Nowojski <piotr.nowojski@...> Date: 2018-02-26T15:13:06Z [FLINK-8805][runtime] Optimize EvenSerializer.isEvent method For example, previously if the method was used to check for EndOfPartitionEvent and the Buffer contained huge custom event, the even had to be deserialized before performing the actual check. Now we are quickly entering the correct if/else branch and doing full costly deserialization only if we have to. Other calls to isEvent() then checking against EndOfPartitionEvent were not used. 
(cherry picked from commit 767027f) commit d5338c4154e5de029b3b30e3ef0a0732bf7f68e7 Author: Piotr Nowojski <piotr.nowojski@...> Date: 2018-02-27T09:39:00Z [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent Because of race condition between: 1. releasing inputChannelsWithData lock in this method and reaching this place 2. empty data notification that re-enqueues a channel we can end up with moreAvailable flag set to true, while we expect no more data. This commit detects such situation, makes a correct assertion and turn off moreAvailable flag. (cherry picked from commit b9b7416) commit 32384ed9b00cf0e1961d355dc4080f25a2156e58 Author: Zhijiang <wangzhijiang999@...> Date: 2018-02-22T14:41:38Z [FLINK-8747][bugfix] The tag of waiting for floating buffers in RemoteInputChannel should be updated properly (cherry picked from commit 6e9e0dd) commit f1453276095c55264f7b4097d16e2987a44b3f33 Author: Zhijiang <wangzhijiang999@...> Date: 2018-02-23T01:55:57Z [hotfix] Fix package private and comments (cherry picked from commit 6165b3d) commit 18ff2ce15bdb1e7bd246e438e47527a24559c86d Author: Nico Kruber <nico@...> Date: 2018-02-26T16:50:10Z [hotfix][network] minor improvements in UnionInputGate (cherry picked from commit 4203557) commit 9265666517830350a4a7037029e347f33df1bea2 Author: Nico Kruber <nico@...> Date: 2018-02-26T16:52:37Z [FLINK-8737][network] disallow creating a union of UnionInputGate instances Recently, the pollNextBufferOrEvent() was added but not implemented but this is used in getNextBufferOrEvent() and thus any UnionInputGate containing a UnionInputGate would have failed already. There should be no use case for wiring up inputs this way. Therefore, fail early when trying to construct this. 
(cherry picked from commit e8de538) commit 26c8f6c2a3ff75ffb954c816a57908318a2d8099 Author: Stephan Ewen <sewen@...> Date: 2018-02-28T11:15:30Z [hotfix] [tests] Fix SelfConnectionITCase The test previously did not fail on failed execution, and thus evaluated incomplete results from a failed execution with th expected results. This cleans up serialization warnings and uses lambdas where possible, to make the code more readable. commit f60e46dafa8950d5e40cd8a3286c172ecaea6b73 Author: gyao <gary@...> Date: 2018-02-28T12:04:19Z [hotfix] Add missing space to log message in ZooKeeperLeaderElectionService commit b7247929d0745b3b83306d0c93d97faf4ece4107 Author: gyao <gary@...> Date: 2018-02-28T12:06:00Z [hotfix][Javadoc] Fix typo in YARN Utils: teh -> the commit adb3750226971f7c67a0d3069103b56e4fee1c27 Author: gyao <gary@...> Date: 2018-02-28T12:07:04
[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5758#discussion_r178334028 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -29,38 +30,47 @@ /** * Environment configuration that represents the content of an environment file. Environment files - * define sources, execution, and deployment behavior. An environment might be defined by default or + * define tables, execution, and deployment behavior. An environment might be defined by default or * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command). * * In future versions, we might restrict the merging or enrichment of deployment properties to not * allow overwriting of a deployment by a session. */ public class Environment { - private Map<String, Source> sources; + private Map<String, TableDescriptor> tables; private Execution execution; private Deployment deployment; public Environment() { - this.sources = Collections.emptyMap(); + this.tables = Collections.emptyMap(); this.execution = new Execution(); this.deployment = new Deployment(); } - public Map<String, Source> getSources() { - return sources; + public Map<String, TableDescriptor> getTables() { + return tables; } - public void setSources(List<Map<String, Object>> sources) { - this.sources = new HashMap<>(sources.size()); - sources.forEach(config -> { - final Source s = Source.create(config); - if (this.sources.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); + public void setTables(List<Map<String, Object>> tables) { + this.tables = new HashMap<>(tables.size()); + tables.forEach(config -> { + if (!config.containsKey(TableDescriptor.TABLE_TYPE())) { + throw new SqlClientException("The 'type' attribute of a table is missing."); --- End diff -- Got it, so `both` should probably be added here once `sink` type is supported in FLINK-8866. 
Thanks for the clarification. ---
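The validation discussed here can be sketched as a lookup of the `type` key with a fixed set of allowed values. Only `source` exists in this PR (`TABLE_TYPE = "type"`, `TABLE_TYPE_SOURCE = "source"`); the `sink` and `both` values are assumptions about the FLINK-8866 follow-up, and `TableTypeCheck` with its methods is a hypothetical name rather than the SQL client's code.

```scala
// Hypothetical validation of the `type` attribute of a table entry.
// "sink" and "both" are assumed future values, not part of this PR.
object TableTypeCheck {
  val TableType = "type"
  val ValidTypes = Set("source", "sink", "both")

  def tableType(config: Map[String, Any]): Either[String, String] =
    config.get(TableType) match {
      case None => Left("The 'type' attribute of a table is missing.")
      case Some(t: String) if ValidTypes.contains(t) => Right(t)
      case Some(other) => Left(s"Invalid table type '$other'.")
    }

  // A table of type "both" is registered as a source and as a sink.
  def isSource(t: String): Boolean = t == "source" || t == "both"
  def isSink(t: String): Boolean = t == "sink" || t == "both"
}
```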
[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5758#discussion_r178334057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.plan.stats.TableStats + +import scala.collection.JavaConverters._ + +/** + * Common class for all descriptors describing table sources and sinks. + */ +abstract class TableDescriptor extends Descriptor { + + protected var connectorDescriptor: Option[ConnectorDescriptor] = None + protected var formatDescriptor: Option[FormatDescriptor] = None + protected var schemaDescriptor: Option[Schema] = None + protected var statisticsDescriptor: Option[Statistics] = None + protected var metaDescriptor: Option[Metadata] = None + + /** +* Internal method for properties conversion. 
+*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.foreach(_.addProperties(properties)) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) +metaDescriptor.foreach(_.addProperties(properties)) + } + + /** +* Reads table statistics from the descriptors properties. +*/ + protected def getTableStats: Option[TableStats] = { +val normalizedProps = new DescriptorProperties() +addProperties(normalizedProps) +val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT)) +rowCount match { + case Some(cnt) => +val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS) +Some(TableStats(cnt, columnStats.asJava)) + case None => +None +} + } +} + +object TableDescriptor { + /** +* Key for describing the type of this table, valid values are ('source'). +*/ + val TABLE_TYPE = "type" + + /** +* Valid TABLE_TYPE value. +*/ + val TABLE_TYPE_SOURCE = "source" --- End diff -- Makes sense. ---
[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5758#discussion_r177830071 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -102,10 +112,10 @@ public static Environment parse(String content) throws IOException { public static Environment merge(Environment env1, Environment env2) { final Environment mergedEnv = new Environment(); - // merge sources - final Map<String, Source> sources = new HashMap<>(env1.getSources()); - mergedEnv.getSources().putAll(env2.getSources()); - mergedEnv.sources = sources; + // merge tables + final Map<String, TableDescriptor> sources = new HashMap<>(env1.getTables()); --- End diff -- Nit: since this map now holds tables, rename the variable accordingly: `final Map<String, TableDescriptor> tables` ---
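A minimal sketch of the merge with the suggested variable name, assuming (as the `putAll` call in the diff suggests) that tables from the second environment override same-named tables from the first. `EnvMergeSketch` and `mergeTables` are hypothetical names, and the generic value type `V` stands in for `TableDescriptor`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; V stands in for TableDescriptor.
final class EnvMergeSketch {

    private EnvMergeSketch() {}

    // Copies env1's tables into a fresh map, then lets env2 override entries with the same name.
    static <V> Map<String, V> mergeTables(Map<String, V> env1Tables, Map<String, V> env2Tables) {
        final Map<String, V> tables = new HashMap<>(env1Tables);
        tables.putAll(env2Tables);
        return tables;
    }
}
```

Copying into a fresh `HashMap` leaves both input environments untouched, so the merged environment never shares mutable state with either one.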
[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5758#discussion_r177828794 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -29,38 +30,47 @@ /** * Environment configuration that represents the content of an environment file. Environment files - * define sources, execution, and deployment behavior. An environment might be defined by default or + * define tables, execution, and deployment behavior. An environment might be defined by default or * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command). * * In future versions, we might restrict the merging or enrichment of deployment properties to not * allow overwriting of a deployment by a session. */ public class Environment { - private Map<String, Source> sources; + private Map<String, TableDescriptor> tables; private Execution execution; private Deployment deployment; public Environment() { - this.sources = Collections.emptyMap(); + this.tables = Collections.emptyMap(); this.execution = new Execution(); this.deployment = new Deployment(); } - public Map<String, Source> getSources() { - return sources; + public Map<String, TableDescriptor> getTables() { + return tables; } - public void setSources(List<Map<String, Object>> sources) { - this.sources = new HashMap<>(sources.size()); - sources.forEach(config -> { - final Source s = Source.create(config); - if (this.sources.containsKey(s.getName())) { - throw new SqlClientException("Duplicate source name '" + s + "'."); + public void setTables(List<Map<String, Object>> tables) { + this.tables = new HashMap<>(tables.size()); + tables.forEach(config -> { + if (!config.containsKey(TableDescriptor.TABLE_TYPE())) { + throw new SqlClientException("The 'type' attribute of a table is missing."); --- End diff -- Just curious: is there any chance we need a table to be both a `source` and a `sink` type (e.g., similar to queryable state, if a user wants to explicitly maintain a table for that purpose)? ---
[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5758#discussion_r177836030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.plan.stats.TableStats + +import scala.collection.JavaConverters._ + +/** + * Common class for all descriptors describing table sources and sinks. + */ +abstract class TableDescriptor extends Descriptor { + + protected var connectorDescriptor: Option[ConnectorDescriptor] = None + protected var formatDescriptor: Option[FormatDescriptor] = None + protected var schemaDescriptor: Option[Schema] = None + protected var statisticsDescriptor: Option[Statistics] = None + protected var metaDescriptor: Option[Metadata] = None + + /** +* Internal method for properties conversion. 
+*/ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { +connectorDescriptor.foreach(_.addProperties(properties)) +formatDescriptor.foreach(_.addProperties(properties)) +schemaDescriptor.foreach(_.addProperties(properties)) +metaDescriptor.foreach(_.addProperties(properties)) + } + + /** +* Reads table statistics from the descriptors properties. +*/ + protected def getTableStats: Option[TableStats] = { +val normalizedProps = new DescriptorProperties() +addProperties(normalizedProps) +val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT)) +rowCount match { + case Some(cnt) => +val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS) +Some(TableStats(cnt, columnStats.asJava)) + case None => +None +} + } +} + +object TableDescriptor { + /** +* Key for describing the type of this table, valid values are ('source'). +*/ + val TABLE_TYPE = "type" + + /** +* Valid TABLE_TYPE value. +*/ + val TABLE_TYPE_SOURCE = "source" --- End diff -- Maybe we can use either an enum or a class hierarchy to represent the "valid" types, such as `TABLE_TYPE.SOURCE_TYPE`? Does this seem like a good idea? ---
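To illustrate the enum idea, here is a minimal Java sketch. `TableType`, `fromValue`, and the `BOTH` constant are hypothetical. `BOTH` only illustrates the question raised in this review about a table acting as both source and sink, and the missing-value message is borrowed from the `Environment` diff:

```java
import java.util.Locale;

// Hypothetical enum for the valid values of the table 'type' attribute.
// BOTH is an assumption: a table usable as source and as sink.
enum TableType {
    SOURCE("source"),
    SINK("sink"),
    BOTH("both");

    private final String value;

    TableType(String value) {
        this.value = value;
    }

    String value() {
        return value;
    }

    boolean isSource() {
        return this == SOURCE || this == BOTH;
    }

    boolean isSink() {
        return this == SINK || this == BOTH;
    }

    // Parses a property value and rejects anything outside the enumerated set.
    static TableType fromValue(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("The 'type' attribute of a table is missing.");
        }
        String normalized = raw.trim().toLowerCase(Locale.ROOT);
        for (TableType t : values()) {
            if (t.value.equals(normalized)) {
                return t;
            }
        }
        throw new IllegalArgumentException("Invalid table type '" + raw + "'.");
    }
}
```

Compared with loose string constants, the enum makes the set of valid values explicit and gives validation a single place to live.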
[GitHub] flink issue #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMachineEx...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5750 Based on the umbrella task FLINK-8970, it seems this e2e test should be attached to FLINK-8973 instead? ---
[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5638 Thanks for the explanation @buptljy. Yeah, if that's the case, the only way to validate it is via an ITCase, which might be overkill in this situation though. ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175825522 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + // we support AVG + case SqlKind.AVG => true + // but none of the other AVG agg functions + case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false + case _ => true +} + +!agg.containsDistinctCall() && supported --- End diff -- Yes. This was a bit confusing to me; we should clean it up when adding DISTINCT support. Thanks for the update, @fhueske. ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175317673 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( +private val namePrefix: String, +private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] +with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + +"tables as the table would grow infinitely") +} + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, 
keySelectorType)) + .process(processFunction) + } + + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { +new QueryableTableSink(this.namePrefix, this.queryConfig) + } +} + +class RowKeySelector( + private val keyIndices: Array[Int], + @transient private val returnType: TypeInformation[Row]) + extends KeySelector[JTuple2[JBool, Row], Row] +with ResultTypeQueryable[Row] { + + override def getKey(value: JTuple2[JBool, Row]): Row = { +val keys = keyIndices + +val srcRow = value.f1 + +val destRow = new Row(keys.length) +var i = 0 +while (i < keys.length) { + destRow.setField(i, srcRow.getField(keys(i))) + i += 1 +} + +destRow + } + + override def getProducedType: TypeInformation[Row] = returnType +} + +class QueryableStateProcessFunction( + private val namePrefix: String, + private val queryConfig: StreamQueryConfig, + private val keyNames: Array[String], + private val fieldNames: Array[String], + private val fieldTypes: Array[TypeInformation[_]]) + extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], Void](queryConfig) { + + @transient private var states = Array[ValueState[AnyRef]]() + @transient private var nonKeyIndices = Array[Int]() + + override def open(paramete
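The key extraction in `RowKeySelector.getKey` is a projection of the input row onto the key positions computed by `keys.map(getFieldNames.indexOf(_))`. A simplified sketch, with `Object[]` standing in for `Row` and `KeyProjection` as a hypothetical name. Unlike the diff, it rejects unknown key names instead of silently producing index `-1`:

```java
import java.util.List;

// Simplified model of RowKeySelector.getKey; Object[] stands in for Row.
final class KeyProjection {

    private KeyProjection() {}

    // Resolves key field names to positions, mirroring keys.map(getFieldNames.indexOf(_)).
    static int[] keyIndices(List<String> fieldNames, List<String> keyNames) {
        int[] indices = new int[keyNames.size()];
        for (int i = 0; i < keyNames.size(); i++) {
            int idx = fieldNames.indexOf(keyNames.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown key field: " + keyNames.get(i));
            }
            indices[i] = idx;
        }
        return indices;
    }

    // Copies the key fields of a row, in key order, into a new and smaller row.
    static Object[] getKey(Object[] row, int[] keyIndices) {
        Object[] key = new Object[keyIndices.length];
        for (int i = 0; i < keyIndices.length; i++) {
            key[i] = row[keyIndices[i]];
        }
        return key;
    }
}
```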
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175315979 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( --- End diff -- Besides @xccui's suggestion of adding formal docs, could you also add a Javadoc-style comment here explaining what this table sink does and how it is intended to be used? ---
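As input for such a comment, here is a simplified, hypothetical model of the behavior visible in the diff: the sink consumes an upsert stream of (flag, row) pairs keyed on the key fields and rejects append-only tables. Following the `UpsertStreamTableSink` contract, a `true` flag is treated as an upsert and `false` as a delete. `UpsertStateModel` is a hypothetical name, and an in-memory map stands in for Flink keyed state:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; the real sink keeps per-key values in Flink keyed state.
final class UpsertStateModel {

    private final Map<List<Object>, List<Object>> latestByKey = new HashMap<>();

    UpsertStateModel(boolean isAppendOnly) {
        // Mirrors setIsAppendOnly: with append-only input the state would only ever grow.
        if (isAppendOnly) {
            throw new IllegalArgumentException(
                "A QueryableTableSink can not be used with append-only tables");
        }
    }

    // flag == true: insert or overwrite the value for the key; flag == false: delete the key.
    void apply(boolean flag, List<Object> key, List<Object> value) {
        if (flag) {
            latestByKey.put(key, value);
        } else {
            latestByKey.remove(key);
        }
    }

    // What a queryable-state client would read for the key (null if absent).
    List<Object> query(List<Object> key) {
        return latestByKey.get(key);
    }
}
```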
[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5638#discussion_r175296949 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1130,4 +1130,13 @@ object concat_ws { } } +object log { + def apply(base: Expression, antilogarithm: Expression): Expression = { +Log(base, antilogarithm) + } + def apply(antilogarithm: Expression): Expression = { +new Log(antilogarithm) --- End diff -- Yup, I thought so too. Thanks for exploring all the possibilities. ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175296744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + // we support AVG + case SqlKind.AVG => true + // but none of the other AVG agg functions + case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false + case _ => true +} + +!agg.containsDistinctCall() && supported --- End diff -- Shouldn't the logical rule support distinct calls here? It seems that previously the error was thrown in `DataSetWindowAggregateRule` and `DataStreamWindowAggregateRule`, respectively. Any chance we can add a unit test to further clarify this change? ---
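A plain-Java restatement of the `matches` logic under review may help pin down the cases such a unit test should cover. `AggKind` is a hypothetical stand-in for Calcite's `SqlKind`. The members of `AVG_AGG_FUNCTIONS` below are an assumption modeled on the AVG, VAR and STDDEV functions named in the PR title:

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Calcite's SqlKind, limited to a few aggregate kinds.
enum AggKind { AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP, SUM, COUNT }

final class WindowAggSupport {

    private WindowAggSupport() {}

    // Assumed members of the AVG family (modeled after SqlKind.AVG_AGG_FUNCTIONS).
    static final Set<AggKind> AVG_AGG_FUNCTIONS = EnumSet.of(
        AggKind.AVG, AggKind.STDDEV_POP, AggKind.STDDEV_SAMP, AggKind.VAR_POP, AggKind.VAR_SAMP);

    // Plain AVG is supported natively; the rest of the AVG family must first be rewritten
    // by a reduce-functions rule, and any DISTINCT call makes the rule not match.
    static boolean matches(List<AggKind> kinds, boolean containsDistinctCall) {
        boolean supported = kinds.stream()
            .allMatch(k -> k == AggKind.AVG || !AVG_AGG_FUNCTIONS.contains(k));
        return !containsDistinctCall && supported;
    }
}
```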
[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ Thanks @hequn8128 for the prompt review. Are you suggesting we create the mapView in parallel with the accumulator? The reason I kept `DistinctAccumulator` is that it acts as a delegate enclosing the actual accumulator, so that it can be passed around in the `accumulatorState` field without extending the arity. If we separate the mapView from the accumulator, I can create another field in the `accumulatorState` `Row` to store the `mapView`(s). That might make it easier to handle the "reuse the same mapView for multiple distinct agg functions" case we discussed in the doc. Another point: I was trying to reuse as much of the dataview codegen utilities as possible, since most of them are tightly coupled with the accumulators. I can refactor further (I have already done quite a bit). Please let me know if that's what you had in mind, @hequn8128. -- Rong ---
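The delegate idea can be sketched as follows: the wrapper holds the real accumulator plus a count per seen value, and only the first occurrence of a value is forwarded to the real aggregate (and only the removal of its last copy is retracted). A `HashMap` stands in for the `MapView`, and `DistinctAccumulatorSketch` is a hypothetical name, not the class in the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: wraps the real accumulator and counts seen values so that
// only distinct values reach the wrapped aggregate function.
final class DistinctAccumulatorSketch<ACC> {

    final ACC realAcc;
    private final Map<Object, Long> distinctValues = new HashMap<>();

    DistinctAccumulatorSketch(ACC realAcc) {
        this.realAcc = realAcc;
    }

    // Returns true if the value was not seen before and should be accumulated.
    boolean add(Object value) {
        long count = distinctValues.merge(value, 1L, Long::sum);
        return count == 1L;
    }

    // Returns true if the last copy of the value was removed and should be retracted.
    boolean remove(Object value) {
        Long count = distinctValues.get(value);
        if (count == null) {
            return false;
        }
        if (count == 1L) {
            distinctValues.remove(value);
            return true;
        }
        distinctValues.put(value, count - 1);
        return false;
    }
}
```

Because the counts live next to the real accumulator inside one object, the pair fits in a single `accumulatorState` field, which is the arity argument made above.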
[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5638#discussion_r173607312 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1130,4 +1130,13 @@ object concat_ws { } } +object log { + def apply(base: Expression, antilogarithm: Expression): Expression = { +Log(base, antilogarithm) + } + def apply(antilogarithm: Expression): Expression = { +new Log(antilogarithm) --- End diff -- nvm, I think it might be too weird to write something like `base.log(antilogarithm)` in the Table API. ---
[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5638#discussion_r173606447 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1130,4 +1130,13 @@ object concat_ws { } } +object log { + def apply(base: Expression, antilogarithm: Expression): Expression = { +Log(base, antilogarithm) + } + def apply(antilogarithm: Expression): Expression = { +new Log(antilogarithm) --- End diff -- Isn't this just `new Ln(antilogarithm)`? If so, we might be able to restructure the two together. ---
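For reference, here is the relationship the question relies on, assuming the one-argument `LOG` is the natural logarithm: `LOG(x) = LN(x)` and, by the change-of-base formula, `LOG(b, x) = LN(x) / LN(b)`. A minimal sketch with hypothetical helper names:

```java
// Hypothetical helpers: one-argument LOG delegates to LN, and two-argument LOG
// follows from the change-of-base formula log_b(x) = ln(x) / ln(b).
final class LogSketch {

    private LogSketch() {}

    static double ln(double x) {
        return Math.log(x);
    }

    // Single-argument form: identical to ln(x).
    static double log(double x) {
        return ln(x);
    }

    // Two-argument form: logarithm of x to the given base.
    static double log(double base, double x) {
        return ln(x) / ln(base);
    }
}
```

If the one-argument case is exactly `LN`, the expression layer could map it to the existing `Ln` node and keep a dedicated node only for the two-argument form.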
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408177 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. 
Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState: MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) 
+LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new MapStateDescriptor[Row, JT
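The state layout described in the `open` comment (per row: how many times it is currently present, plus when it expires) can be modeled as below. This is a hypothetical in-memory sketch: a `HashMap` stands in for `MapState`, `long[] {count, expireTime}` stands in for `JTuple2[Int, Long]`, and computing the expiration as `now + minRetentionTime` is an assumption, since the timer logic is cut off in the excerpt:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one side's join state: row -> {count, expireTime}.
final class JoinSideState<R> {

    private final Map<R, long[]> state = new HashMap<>();
    private final long minRetentionTime;
    private final boolean stateCleaningEnabled;

    JoinSideState(long minRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1; // same condition as in the diff
    }

    // Accumulates (+1) or retracts (-1) a row; the row is dropped once its count reaches 0.
    void update(R row, boolean isAccumulate, long now) {
        long[] entry = state.get(row);
        long count = (entry == null ? 0 : entry[0]) + (isAccumulate ? 1 : -1);
        if (count <= 0) {
            state.remove(row);
            return;
        }
        long expireTime = stateCleaningEnabled ? now + minRetentionTime : 0; // 0 = never expires
        state.put(row, new long[] {count, expireTime});
    }

    // Drops rows whose retention time has passed (only when cleaning is enabled).
    void cleanUp(long now) {
        if (stateCleaningEnabled) {
            state.values().removeIf(e -> e[1] <= now);
        }
    }

    long count(R row) {
        long[] entry = state.get(row);
        return entry == null ? 0 : entry[0];
    }
}
```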
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408402 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. + * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- I think either is fine as long as they are consistent. ---
[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ @hequn8128 @fhueske Thanks for the feedback. I have updated the diff to use `DistinctAccumulator` directly for filtering and modified the `generateAggregation` API. Please take another look when you have time. I have also resolved the issue with multi-layer dataview codegen. As for reusing the same `DataView` for multiple distinct aggregations on the same field, I tried to incorporate it, but the code makes many assumptions about a one-to-one mapping between `AggregateFunction`s and `Accumulator`s that are hard to work around. I plan to continue improving on it in a separate JIRA. What do you think? ---
[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5174#discussion_r171346253 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcWithSplitCodeGenITCase.scala --- @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase --- End diff -- Yes, it does, but not in the flink-table module. ---
[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5174#discussion_r170804955 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala --- @@ -41,7 +41,7 @@ import org.apache.flink.table.codegen.Indenter.toISC class FunctionCodeGenerator( config: TableConfig, nullableInput: Boolean, -input1: TypeInformation[_ <: Any], +val input1: TypeInformation[_ <: Any], --- End diff -- Any specific reason for this change? typo? ---
[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5174#discussion_r171358583 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala --- @@ -46,10 +47,22 @@ trait CommonCalc { returnSchema.fieldNames, calcProjection) +val (defines, bodies, callings) = generateCalcSplitFunctions( + generator, + projection.codeBuffer, + config.getMaxGeneratedCodeLength) + +val split = config.getMaxGeneratedCodeLength < projection.code.length +val projectionCode = if (split && !projection.codeBuffer.isEmpty) { --- End diff -- We could probably move this into `generateCalcSplitFunctions` and have it always return the `samHeader` function (split or not), together with optional `defines` and `bodies`. That would encapsulate all of the splitting logic in one place. What do you think? ---
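A sketch of the suggested shape: one function owns the split decision and always returns the code for the generated method body, plus helper method definitions that are empty when no split is needed. `CodeSplitter`, `SplitResult`, and the `split_N` method names are hypothetical. The split condition mirrors `getMaxGeneratedCodeLength < code.length` plus the non-empty `codeBuffer` check from the diff, and emitting one method per statement mirrors the ITCase's intent that every expression lands in its own function call:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical result: code for the generated method body plus helper method
// definitions (empty when no split was necessary).
final class SplitResult {
    final String bodyCode;
    final List<String> methodDefinitions;

    SplitResult(String bodyCode, List<String> methodDefinitions) {
        this.bodyCode = bodyCode;
        this.methodDefinitions = methodDefinitions;
    }
}

final class CodeSplitter {

    private CodeSplitter() {}

    // Single place for the split decision: inline code if it fits, otherwise one private
    // method per statement and only the calls in the body. Assumes statements share
    // state through fields, not local variables.
    static SplitResult split(List<String> statements, int maxLength) {
        String inline = String.join("\n", statements);
        if (statements.isEmpty() || inline.length() <= maxLength) {
            return new SplitResult(inline, Collections.emptyList());
        }
        List<String> definitions = new ArrayList<>();
        StringBuilder calls = new StringBuilder();
        for (int i = 0; i < statements.size(); i++) {
            String name = "split_" + i;
            definitions.add("private void " + name + "() {\n" + statements.get(i) + "\n}");
            calls.append(name).append("();\n");
        }
        return new SplitResult(calls.toString(), definitions);
    }
}
```

Because a helper of this shape only manipulates strings, it does not depend on `CommonCalc` and could live in a shared utility package.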
[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5174#discussion_r171360773 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcWithSplitCodeGenITCase.scala --- @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableConfig, TableEnvironment} +import org.apache.flink.table.expressions.Literal +import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2} +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.mutable + +/** + * set TableConfig's MaxGeneratedCodeLength to 1 + * make sure every expression is in an independent function call + */ +class CalcWithSplitCodeGenITCase extends StreamingMultipleProgramsTestBase { --- End diff -- It looks like you copied all the tests from `CalcITCase` just to set the max generated code length to 1. Why not use something similar to `TableProgramsTestBase` and run the same tests under two different test configs? ---
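To illustrate running one set of tests under two configurations instead of duplicating the test class, here is a plain-Scala sketch. `CodeGenTestConfig` and `runWithEachConfig` are hypothetical names, and `Int.MaxValue` merely stands in for "effectively never split"; it is not Flink's default.

```scala
// Hypothetical stand-in for the TableConfig setting under test.
final case class CodeGenTestConfig(name: String, maxGeneratedCodeLength: Int)

object ConfigParameterizedTests {
  // The two configurations every test would run under.
  val configs: Seq[CodeGenTestConfig] = Seq(
    CodeGenTestConfig("default", Int.MaxValue), // hypothetical "never split" setting
    CodeGenTestConfig("splitAll", 1)            // every expression in its own method
  )

  /** Runs the same test body once per configuration, keyed by config name. */
  def runWithEachConfig[A](test: CodeGenTestConfig => A): Map[String, A] =
    configs.map(c => c.name -> test(c)).toMap
}
```

In a real ITCase the test body would apply the config to the `TableConfig` before building the program. JUnit 4's `Parameterized` runner is one way to get the same effect, with each config reported as a separate test run.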
[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5174#discussion_r171357748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala --- @@ -171,4 +186,47 @@ trait CommonCalc { rowCnt } } + + /** +* split origin generated code to split function calls, only used for calc. +* @param generator +* @param codeBuffer +* @param maxLength +* @return (method definitions, method bodies, method callings) of split function calls +*/ + private def generateCalcSplitFunctions( --- End diff -- It seems that to avoid hitting the compilation limit (the JVM's 64KB method size limit), you would need to put this in all the Common nodes. Any chance we can move it to `plan/util` and make it more generic? I imagine the same issue could occur in any plan node that needs to generate a row of outputs. ---
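A sketch of what a node-agnostic helper could look like, assuming a hypothetical `CodeSplitUtil` object (not existing Flink code). It greedily packs generated statements into chunks that stay within `maxLength`, turns each chunk into a method, and takes a name `prefix` so different plan nodes do not collide. Lengths are measured in source characters, which only approximates the real bytecode limit.

```scala
import scala.collection.mutable

object CodeSplitUtil {

  /** Greedily packs statements into chunks whose combined length stays within
    * `maxLength`. A single statement longer than `maxLength` gets its own chunk. */
  def chunk(statements: Seq[String], maxLength: Int): Seq[Seq[String]] = {
    val chunks = mutable.ArrayBuffer.empty[Seq[String]]
    var current = Vector.empty[String]
    var currentLength = 0
    for (stmt <- statements) {
      // Close the current chunk if adding this statement would exceed the limit.
      if (current.nonEmpty && currentLength + stmt.length > maxLength) {
        chunks += current
        current = Vector.empty
        currentLength = 0
      }
      current :+= stmt
      currentLength += stmt.length
    }
    if (current.nonEmpty) chunks += current
    chunks.toSeq
  }

  /** Turns each chunk into a private method; returns (method definitions, call statements). */
  def split(statements: Seq[String], maxLength: Int, prefix: String): (Seq[String], Seq[String]) = {
    val chunks = chunk(statements, maxLength)
    val names = chunks.indices.map(i => s"${prefix}_$i")
    val defs = chunks.zip(names).map { case (body, name) =>
      s"private void $name() {\n${body.mkString("\n")}\n}"
    }
    (defs, names.map(n => s"$n();"))
  }
}
```

Packing several statements per method, rather than one per method, keeps the number of generated methods down while still keeping each one under the limit.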
[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ I agree this design is much cleaner and easier to maintain. I was hesitant to change the function signature of `generateAggregations()`. I will try to introduce some common building blocks and move the implementation effort into codegen. I imagine a lot of ITCases will be needed :-) Thanks for the feedback, @hequn8128 @fhueske ---
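For context on what a distinct filter in front of an aggregate has to do, here is a generic counting sketch. `DistinctFilter` is a hypothetical name, and this is not necessarily the design adopted in this PR. Each value is forwarded to `accumulate` only on its first occurrence and to `retract` only when its last occurrence is retracted. In Flink this state would presumably live in a state-backed map view rather than an in-memory map.

```scala
import scala.collection.mutable

/** Hypothetical counting distinct filter placed in front of an aggregate function. */
final class DistinctFilter[T] {
  // Number of times each value is currently present in the input.
  private val counts = mutable.Map.empty[T, Long]

  /** Returns true if `value` is new and should be passed to accumulate. */
  def add(value: T): Boolean = {
    val c = counts.getOrElse(value, 0L)
    counts(value) = c + 1
    c == 0L
  }

  /** Returns true if the last copy of `value` was removed and it should be passed to retract. */
  def remove(value: T): Boolean = counts.get(value) match {
    case Some(1L) =>
      counts.remove(value)
      true
    case Some(c) =>
      counts(value) = c - 1
      false
    case None => false
  }
}
```

For example, with `SUM(DISTINCT x)` over inputs 5, 5, 7 the filter forwards only the first 5 and the 7, so the sum is 12.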
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170610689 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala --- @@ -201,18 +202,294 @@ class JoinITCase extends StreamingWithStateTestBase { // Proctime window output uncertain results, so assert has been ignored here. } + @Test + def testJoin(): Unit = { --- End diff -- Could the test name be more specific, e.g. indicate that it covers an inner equality join? ---
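To make "inner equality join" concrete for a non-windowed stream-stream join: both inputs are kept in state, and a row arriving on either side is emitted joined with every stored row from the other side that has an equal key. The sketch below is a simplified illustration under stated assumptions. `InnerEquiJoin` is a hypothetical class, in-memory maps stand in for Flink keyed state, and state cleanup and retractions are ignored.

```scala
import scala.collection.mutable

/** Hypothetical, simplified non-windowed inner equi-join of two streams. */
final class InnerEquiJoin[K, L, R] {
  // Every row seen so far on each side, grouped by join key (never cleaned up here).
  private val leftState = mutable.Map.empty[K, mutable.ArrayBuffer[L]]
  private val rightState = mutable.Map.empty[K, mutable.ArrayBuffer[R]]

  /** Stores a left row and joins it with all right rows seen so far for the same key. */
  def processLeft(key: K, row: L): Seq[(L, R)] = {
    leftState.getOrElseUpdate(key, mutable.ArrayBuffer.empty[L]) += row
    rightState.getOrElse(key, mutable.ArrayBuffer.empty[R]).toSeq.map(r => (row, r))
  }

  /** Stores a right row and joins it with all left rows seen so far for the same key. */
  def processRight(key: K, row: R): Seq[(L, R)] = {
    rightState.getOrElseUpdate(key, mutable.ArrayBuffer.empty[R]) += row
    leftState.getOrElse(key, mutable.ArrayBuffer.empty[L]).toSeq.map(l => (l, row))
  }
}
```

Rows whose key never appears on the other side produce no output, which is what distinguishes the inner join from the outer-join variants a more specific test name would rule out.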