flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule
Repository: flink Updated Branches: refs/heads/release-1.4 e7f7d0c93 -> 42e24413b [FLINK-7986] [table] Introduce FilterSetOpTransposeRule This closes #4956. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42e24413 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42e24413 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42e24413 Branch: refs/heads/release-1.4 Commit: 42e24413b5a47928e06f2a61086f7559370c65d8 Parents: e7f7d0c Author: Xpray Authored: Mon Nov 6 23:47:33 2017 +0800 Committer: twalthr Committed: Thu Nov 16 16:16:22 2017 +0100 -- .../flink/table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOperatorsTest.scala | 80 .../api/stream/table/SetOperatorsTest.scala | 68 + 3 files changed, 150 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index dcc735d..a20d14f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -52,6 +52,8 @@ object FlinkRuleSets { FilterJoinRule.JOIN, // push filter through an aggregation FilterAggregateTransposeRule.INSTANCE, +// push filter through set operation +FilterSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 2d4e205..35f4429 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase { util.verifyJavaTable(in, expected) } + + @Test + def testFilterUnionTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetUnion", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + term("union", "a", "b", "c") +), +term("groupBy", "b"), +term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") +) + +util.verifyTable(result, expected) + } + + @Test + def testFilterMinusTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.minusAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetMinus", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), 
+term("where", ">(a, 0)") + ), + term("minus", "a", "b", "c") +), +term("groupBy", "b"), +term("select", "b", "SUM(a) AS TMP_
flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule
Repository: flink Updated Branches: refs/heads/master cd1fbc078 -> 81dc260dc [FLINK-7986] [table] Introduce FilterSetOpTransposeRule This closes #4956. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d Branch: refs/heads/master Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e Parents: cd1fbc0 Author: Xpray Authored: Mon Nov 6 23:47:33 2017 +0800 Committer: twalthr Committed: Thu Nov 16 14:43:50 2017 +0100 -- .../flink/table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOperatorsTest.scala | 80 .../api/stream/table/SetOperatorsTest.scala | 68 + 3 files changed, 150 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index dcc735d..a20d14f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -52,6 +52,8 @@ object FlinkRuleSets { FilterJoinRule.JOIN, // push filter through an aggregation FilterAggregateTransposeRule.INSTANCE, +// push filter through set operation +FilterSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 2d4e205..35f4429 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase { util.verifyJavaTable(in, expected) } + + @Test + def testFilterUnionTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetUnion", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + term("union", "a", "b", "c") +), +term("groupBy", "b"), +term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") +) + +util.verifyTable(result, expected) + } + + @Test + def testFilterMinusTranspose(): Unit = { +val util = batchTestUtil() +val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) +val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + +val result = left.minusAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + +val expected = unaryNode( + "DataSetCalc", + unaryNode( +"DataSetAggregate", +binaryNode( + "DataSetMinus", + unaryNode( +"DataSetCalc", +batchTableNode(0), +term("select", "a", "b", "c"), +term("where", ">(a, 0)") + ), + unaryNode( +"DataSetCalc", +batchTableNode(1), +term("select", "a", "b", "c"), 
+term("where", ">(a, 0)") + ), + term("minus", "a", "b", "c") +), +term("groupBy", "b"), +term("select", "b", "SUM(a) AS TMP_0", "COUNT
[1/5] flink git commit: [FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.
Repository: flink Updated Branches: refs/heads/release-1.4 397f0d150 -> e7f7d0c93 [FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema. This closes #4989. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59fae3f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59fae3f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59fae3f Branch: refs/heads/release-1.4 Commit: b59fae3f51b22c299c3d4c51ff0da74fd245072b Parents: 397f0d1 Author: Shuyi Chen Authored: Thu Nov 9 00:05:20 2017 -0800 Committer: Fabian Hueske Committed: Thu Nov 16 11:45:38 2017 +0100 -- .../plan/schema/CompositeRelDataType.scala | 4 +++- .../table/runtime/batch/sql/CalcITCase.scala| 19 ++ .../table/runtime/batch/table/CalcITCase.scala | 10 +++--- .../table/runtime/stream/sql/SqlITCase.scala| 21 .../table/runtime/stream/table/CalcITCase.scala | 16 +++ .../table/runtime/utils/StreamTestData.scala| 9 + 6 files changed, 71 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b59fae3f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala index e0c6b6f..f8c61fb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala @@ -38,7 +38,9 @@ class CompositeRelDataType( val compositeType: CompositeType[_], val nullable: Boolean, typeFactory: FlinkTypeFactory) - extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) { + extends RelRecordType( +StructKind.PEEK_FIELDS_NO_EXPAND, +createFieldList(compositeType, 
typeFactory)) { override def toString = s"COMPOSITE($compositeType)" http://git-wip-us.apache.org/repos/asf/flink/blob/b59fae3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala index b891a7d..7ca3e9c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala @@ -67,6 +67,25 @@ class CalcITCase( } @Test + def testSelectStarFromNestedTable(): Unit = { + +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val sqlQuery = "SELECT * FROM MyTable" + +val ds = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b) +tEnv.registerTable("MyTable", ds) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n" + +val results = result.toDataSet[Row].collect() +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test def testSelectStarFromDataSet(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/b59fae3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala index e947c3f..22373d2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala @@ -118,14 +118,10 @@ class CalcITCase( val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) -val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*) +val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b).select('*) -val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\
[5/5] flink git commit: [FLINK-8069] [table] Add preserving WatermarkStrategy.
[FLINK-8069] [table] Add preserving WatermarkStrategy. This closes #5016. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7f7d0c9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7f7d0c9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7f7d0c9 Branch: refs/heads/release-1.4 Commit: e7f7d0c9333f3f9db488c5f968a1627401485067 Parents: f14fcef Author: Xingcan Cui Authored: Wed Nov 15 11:01:13 2017 +0800 Committer: Fabian Hueske Committed: Thu Nov 16 11:47:16 2017 +0100 -- .../datastream/StreamTableSourceScan.scala | 5 +- .../wmstrategies/watermarkStrategies.scala | 6 +++ .../stream/table/TableSourceITCase.scala| 51 .../flink/table/utils/testTableSources.scala| 28 ++- 4 files changed, 88 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 5d305b4..9179d4b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sources._ -import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner} +import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks} import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ @@ -134,6 +134,9 @@ class StreamTableSourceScan( case p: PunctuatedWatermarkAssigner => val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) +case _: PreserveWatermarks => + // The watermarks have already been provided by the underlying DataStream. + ingestedTable } } else { // No need to generate watermarks if no rowtime attribute is specified. http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala index 0dd82f1..4c7f4e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala @@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy { */ def getWatermark(row: Row, timestamp: Long): Watermark } + +/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/ +class PreserveWatermarks extends WatermarkStrategy +object PreserveWatermarks { + val INSTANCE: PreserveWatermarks = new PreserveWatermarks +} http://git-wip-us.apache.org/repos/asf/flink/blob/e7f7d0c9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index 77c1e08..c9ea30a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -27,6 +27,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.a
[3/5] flink git commit: [FLINK-8014] [table] Add Kafka010JsonTableSink.
[FLINK-8014] [table] Add Kafka010JsonTableSink. - Refactor KafkaTableSink tests. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fd53112 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fd53112 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fd53112 Branch: refs/heads/release-1.4 Commit: 2fd53112289ec9002b2c26920dd56ea77be73cd5 Parents: 8883fa2 Author: Fabian Hueske Authored: Tue Nov 7 17:59:43 2017 +0100 Committer: Fabian Hueske Committed: Thu Nov 16 11:45:54 2017 +0100 -- .../connectors/kafka/Kafka010JsonTableSink.java | 73 .../kafka/Kafka010JsonTableSinkTest.java| 53 ++ .../connectors/kafka/Kafka08JsonTableSink.java | 26 ++- .../kafka/Kafka08JsonTableSinkTest.java | 27 +++- .../connectors/kafka/Kafka09JsonTableSink.java | 26 ++- .../kafka/Kafka09JsonTableSinkTest.java | 27 +++- .../connectors/kafka/KafkaJsonTableSink.java| 5 +- .../connectors/kafka/KafkaTableSink.java| 10 ++- .../JsonRowSerializationSchema.java | 22 +- .../kafka/JsonRowSerializationSchemaTest.java | 46 .../kafka/KafkaTableSinkTestBase.java | 30 11 files changed, 269 insertions(+), 76 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2fd53112/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java -- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java new file mode 100644 index 000..431ace0 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka010JsonTableSink extends KafkaJsonTableSink { + + /** +* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 +* topic with fixed partition assignment. +* +* Each parallel TableSink instance will write its rows to a single Kafka partition. +* +* If the number of Kafka partitions is less than the number of sink instances, different +* sink instances will write to the same partition. +* If the number of Kafka partitions is higher than the number of sink instance, some +* Kafka partitions won't receive data. 
+* +* +* @param topic topic in Kafka to which table is written +* @param properties properties to connect to Kafka +*/ + public Kafka010JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** +* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 +* topic with custom partition assignment. +* +* @param topic topic in Kafka to which table is written +* @param properties properties to connect to Kafka +* @param partitioner Kafka partitioner +*/ + public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner partitioner) { + super(topic, properties, partitioner);
[4/5] flink git commit: [FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.
[FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks. This closes #4990. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f14fcefb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f14fcefb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f14fcefb Branch: refs/heads/release-1.4 Commit: f14fcefbe1b30eb8178a5caa053c84e505c288b2 Parents: 2fd5311 Author: Fabian Hueske Authored: Thu Nov 9 15:07:17 2017 +0100 Committer: Fabian Hueske Committed: Thu Nov 16 11:46:04 2017 +0100 -- docs/dev/table/sourceSinks.md | 45 ++ 1 file changed, 45 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f14fcefb/docs/dev/table/sourceSinks.md -- diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index dfa7954..0b4bdbe 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -496,6 +496,7 @@ The following table lists the `TableSink`s which are provided with Flink. | `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding. | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding. +| `Kafka010JsonTableSink` | `flink-connector-kafka-0.10` | N | Append | A Kafka 0.10 sink with JSON encoding. All sinks that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sinks, you have to add the respective dependency in addition to the `flink-table` dependency. @@ -503,6 +504,50 @@ A custom `TableSink` can be defined by implementing the `BatchTableSink`, `Appen {% top %} +### KafkaJsonTableSink + +A `KafkaJsonTableSink` emits a [streaming append `Table`](./streaming.html#table-to-stream-conversion) to an Apache Kafka topic. The rows of the table are encoded as JSON records. 
Currently, only tables with flat schema, i.e., non-nested fields, are supported. + +A `KafkaJsonTableSink` produces with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing). + +By default, a `KafkaJsonTableSink` writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom `FlinkKafkaPartitioner` can be provided. + +The following example shows how to create a `KafkaJsonTableSink` for Kafka 0.10. Sinks for Kafka 0.8 and 0.9 are instantiated analogously. + + + +{% highlight java %} + +Table table = ... + +Properties props = new Properties(); +props.setProperty("bootstrap.servers", "localhost:9092"); + +table.writeToSink( + new Kafka010JsonTableSink( +"myTopic",// Kafka topic to write to +props)); // Properties to configure the producer + +{% endhighlight %} + + + +{% highlight scala %} + +val table: Table = ??? + +val props = new Properties() +props.setProperty("bootstrap.servers", "localhost:9092") + +table.writeToSink( + new Kafka010JsonTableSink( +"myTopic",// Kafka topic to write to +props)) // Properties to configure the producer + +{% endhighlight %} + + + ### CsvTableSink The `CsvTableSink` emits a `Table` to one or more CSV files.
[2/5] flink git commit: [FLINK-7389] [table] Remove Calcite PushProjector
[FLINK-7389] [table] Remove Calcite PushProjector This closes #5022. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8883fa2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8883fa2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8883fa2d Branch: refs/heads/release-1.4 Commit: 8883fa2dd910909d4aaf5c57cf4b15a41c8cf7e1 Parents: b59fae3 Author: twalthr Authored: Thu Nov 16 10:20:16 2017 +0100 Committer: Fabian Hueske Committed: Thu Nov 16 11:45:46 2017 +0100 -- .../apache/calcite/rel/rules/PushProjector.java | 868 --- .../table/runtime/batch/sql/CalcITCase.scala| 2 +- .../table/runtime/batch/sql/JoinITCase.scala| 4 +- 3 files changed, 4 insertions(+), 870 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8883fa2d/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java -- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java deleted file mode 100644 index 0955aeb..000 --- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java +++ /dev/null @@ -1,868 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.rel.rules; - -import org.apache.calcite.linq4j.Ord; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.Strong; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.SemiJoin; -import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.runtime.PredicateImpl; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.util.BitSets; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Pair; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; - -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; -import java.util.Set; - -// This class is copied from Apache Calcite except that it does not -// automatically name the field using the name of the operators -// as the Table API rejects special characters like '-' in the field names. - -/** - * PushProjector is a utility class used to perform operations used in push - * projection rules. - * - * Pushing is particularly interesting in the case of join, because there - * are multiple inputs. Generally an expression can be pushed down to a - * particular input if it depends upon no other inputs. 
If it can be pushed - * down to both sides, it is pushed down to the left. - * - * Sometimes an expression needs to be split before it can be pushed down. - * To flag that an expression cannot be split, specify a rule that it must be - * preserved. Such an expression will be pushed down intact to one - * of the inputs, or not pushed down at all. - */ -public class PushProjector { - //~ Instance fields - - private final Project origProj; - private final RexNode origFilter; - private final RelNode childRel; - private final ExprCondition preserveExprCondition; - private final RelBuilder relBuilder; - - /** - * Original projection expressions - */ - final List origProjExprs; - - /** - * Fields from the RelNode that the projection is being pushed past -
[4/5] flink git commit: [FLINK-8069] [table] Add preserving WatermarkStrategy.
[FLINK-8069] [table] Add preserving WatermarkStrategy. This closes #5016. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd1fbc07 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd1fbc07 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd1fbc07 Branch: refs/heads/master Commit: cd1fbc0789d039e38c10d0a978137156aacbf186 Parents: c697bc1 Author: Xingcan Cui Authored: Wed Nov 15 11:01:13 2017 +0800 Committer: Fabian Hueske Committed: Thu Nov 16 11:32:12 2017 +0100 -- .../datastream/StreamTableSourceScan.scala | 5 +- .../wmstrategies/watermarkStrategies.scala | 6 +++ .../stream/table/TableSourceITCase.scala| 51 .../flink/table/utils/testTableSources.scala| 28 ++- 4 files changed, 88 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 5d305b4..9179d4b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sources._ -import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner} +import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks} import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ @@ -134,6 +134,9 @@ class StreamTableSourceScan( case p: PunctuatedWatermarkAssigner => val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) +case _: PreserveWatermarks => + // The watermarks have already been provided by the underlying DataStream. + ingestedTable } } else { // No need to generate watermarks if no rowtime attribute is specified. http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala index 0dd82f1..4c7f4e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala @@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy { */ def getWatermark(row: Row, timestamp: Long): Watermark } + +/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/ +class PreserveWatermarks extends WatermarkStrategy +object PreserveWatermarks { + val INSTANCE: PreserveWatermarks = new PreserveWatermarks +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd1fbc07/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index 77c1e08..c9ea30a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -27,6 +27,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.da
[3/5] flink git commit: [FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks.
[FLINK-8016] [docs] Add documentation for KafkaJsonTableSinks. This closes #4990. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c697bc14 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c697bc14 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c697bc14 Branch: refs/heads/master Commit: c697bc1499de5db324ff64af0341ebfccf6a6125 Parents: 50fba9a Author: Fabian Hueske Authored: Thu Nov 9 15:07:17 2017 +0100 Committer: Fabian Hueske Committed: Thu Nov 16 11:32:12 2017 +0100 -- docs/dev/table/sourceSinks.md | 45 ++ 1 file changed, 45 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c697bc14/docs/dev/table/sourceSinks.md -- diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index dfa7954..0b4bdbe 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -496,6 +496,7 @@ The following table lists the `TableSink`s which are provided with Flink. | `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table. | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding. | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding. +| `Kafka010JsonTableSink` | `flink-connector-kafka-0.10` | N | Append | A Kafka 0.10 sink with JSON encoding. All sinks that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sinks, you have to add the respective dependency in addition to the `flink-table` dependency. @@ -503,6 +504,50 @@ A custom `TableSink` can be defined by implementing the `BatchTableSink`, `Appen {% top %} +### KafkaJsonTableSink + +A `KafkaJsonTableSink` emits a [streaming append `Table`](./streaming.html#table-to-stream-conversion) to an Apache Kafka topic. The rows of the table are encoded as JSON records. 
Currently, only tables with a flat schema, i.e., non-nested fields, are supported. + +A `KafkaJsonTableSink` writes to a Kafka topic with at-least-once guarantees if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing). + +By default, a `KafkaJsonTableSink` writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition). In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom `FlinkKafkaPartitioner` can be provided. + +The following example shows how to create a `KafkaJsonTableSink` for Kafka 0.10. Sinks for Kafka 0.8 and 0.9 are instantiated analogously. + + + +{% highlight java %} + +Table table = ... + +Properties props = new Properties(); +props.setProperty("bootstrap.servers", "localhost:9092"); + +table.writeToSink( + new Kafka010JsonTableSink( +"myTopic",// Kafka topic to write to +props)); // Properties to configure the producer + +{% endhighlight %} + + + +{% highlight scala %} + +val table: Table = ??? + +val props = new Properties() +props.setProperty("bootstrap.servers", "localhost:9092") + +table.writeToSink( + new Kafka010JsonTableSink( +"myTopic",// Kafka topic to write to +props)) // Properties to configure the producer + +{% endhighlight %} + + + ### CsvTableSink The `CsvTableSink` emits a `Table` to one or more CSV files.
[1/5] flink git commit: [FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema.
Repository: flink Updated Branches: refs/heads/master 101fef739 -> cd1fbc078 [FLINK-7003] [table] Fix 'SELECT *' for tables with nested schema. This closes #4989. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a63d2be5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a63d2be5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a63d2be5 Branch: refs/heads/master Commit: a63d2be5091d7d2f77161ec17e685ae546f2b0ec Parents: 101fef7 Author: Shuyi Chen Authored: Thu Nov 9 00:05:20 2017 -0800 Committer: Fabian Hueske Committed: Thu Nov 16 11:32:11 2017 +0100 -- .../plan/schema/CompositeRelDataType.scala | 4 +++- .../table/runtime/batch/sql/CalcITCase.scala| 19 ++ .../table/runtime/batch/table/CalcITCase.scala | 10 +++--- .../table/runtime/stream/sql/SqlITCase.scala| 21 .../table/runtime/stream/table/CalcITCase.scala | 16 +++ .../table/runtime/utils/StreamTestData.scala| 9 + 6 files changed, 71 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a63d2be5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala index e0c6b6f..f8c61fb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala @@ -38,7 +38,9 @@ class CompositeRelDataType( val compositeType: CompositeType[_], val nullable: Boolean, typeFactory: FlinkTypeFactory) - extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) { + extends RelRecordType( +StructKind.PEEK_FIELDS_NO_EXPAND, +createFieldList(compositeType, typeFactory)) { 
override def toString = s"COMPOSITE($compositeType)" http://git-wip-us.apache.org/repos/asf/flink/blob/a63d2be5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala index b891a7d..7ca3e9c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala @@ -67,6 +67,25 @@ class CalcITCase( } @Test + def testSelectStarFromNestedTable(): Unit = { + +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val sqlQuery = "SELECT * FROM MyTable" + +val ds = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv).as('a, 'b) +tEnv.registerTable("MyTable", ds) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = "(1,1),one\n" + "(2,2),two\n" + "(3,3),three\n" + +val results = result.toDataSet[Row].collect() +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test def testSelectStarFromDataSet(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/a63d2be5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala index e947c3f..22373d2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala @@ -118,14 +118,10 @@ class CalcITCase( val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) -val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*) +val t = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b).select('*) -val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + -
[5/5] flink git commit: [FLINK-7389] [table] Remove Calcite PushProjector
[FLINK-7389] [table] Remove Calcite PushProjector This closes #5022. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc3eebd1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc3eebd1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc3eebd1 Branch: refs/heads/master Commit: fc3eebd1e46c680a99ac22f20adda40841713294 Parents: a63d2be Author: twalthr Authored: Thu Nov 16 10:20:16 2017 +0100 Committer: Fabian Hueske Committed: Thu Nov 16 11:32:12 2017 +0100 -- .../apache/calcite/rel/rules/PushProjector.java | 868 --- .../table/runtime/batch/sql/CalcITCase.scala| 2 +- .../table/runtime/batch/sql/JoinITCase.scala| 4 +- 3 files changed, 4 insertions(+), 870 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fc3eebd1/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java -- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java deleted file mode 100644 index 0955aeb..000 --- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java +++ /dev/null @@ -1,868 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.rel.rules; - -import org.apache.calcite.linq4j.Ord; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.Strong; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.SemiJoin; -import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.runtime.PredicateImpl; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.tools.RelBuilder; -import org.apache.calcite.util.BitSets; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Pair; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; - -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; -import java.util.Set; - -// This class is copied from Apache Calcite except that it does not -// automatically name the field using the name of the operators -// as the Table API rejects special characters like '-' in the field names. - -/** - * PushProjector is a utility class used to perform operations used in push - * projection rules. - * - * Pushing is particularly interesting in the case of join, because there - * are multiple inputs. Generally an expression can be pushed down to a - * particular input if it depends upon no other inputs. 
If it can be pushed - * down to both sides, it is pushed down to the left. - * - * Sometimes an expression needs to be split before it can be pushed down. - * To flag that an expression cannot be split, specify a rule that it must be - * preserved. Such an expression will be pushed down intact to one - * of the inputs, or not pushed down at all. - */ -public class PushProjector { - //~ Instance fields - - private final Project origProj; - private final RexNode origFilter; - private final RelNode childRel; - private final ExprCondition preserveExprCondition; - private final RelBuilder relBuilder; - - /** - * Original projection expressions - */ - final List origProjExprs; - - /** - * Fields from the RelNode that the projection is being pushed past - */ -
[2/5] flink git commit: [FLINK-8014] [table] Add Kafka010JsonTableSink.
[FLINK-8014] [table] Add Kafka010JsonTableSink. - Refactor KafkaTableSink tests. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50fba9aa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50fba9aa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50fba9aa Branch: refs/heads/master Commit: 50fba9aa4e96632f7b32cf98d704683364196cbd Parents: fc3eebd Author: Fabian Hueske Authored: Tue Nov 7 17:59:43 2017 +0100 Committer: Fabian Hueske Committed: Thu Nov 16 11:32:12 2017 +0100 -- .../connectors/kafka/Kafka010JsonTableSink.java | 73 .../kafka/Kafka010JsonTableSinkTest.java| 53 ++ .../connectors/kafka/Kafka08JsonTableSink.java | 26 ++- .../kafka/Kafka08JsonTableSinkTest.java | 27 +++- .../connectors/kafka/Kafka09JsonTableSink.java | 26 ++- .../kafka/Kafka09JsonTableSinkTest.java | 27 +++- .../connectors/kafka/KafkaJsonTableSink.java| 5 +- .../connectors/kafka/KafkaTableSink.java| 10 ++- .../JsonRowSerializationSchema.java | 22 +- .../kafka/JsonRowSerializationSchemaTest.java | 46 .../kafka/KafkaTableSinkTestBase.java | 30 11 files changed, 269 insertions(+), 76 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java -- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java new file mode 100644 index 000..431ace0 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka010JsonTableSink extends KafkaJsonTableSink { + + /** +* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 +* topic with fixed partition assignment. +* +* Each parallel TableSink instance will write its rows to a single Kafka partition. +* +* If the number of Kafka partitions is less than the number of sink instances, different +* sink instances will write to the same partition. +* If the number of Kafka partitions is higher than the number of sink instances, some +* Kafka partitions won't receive data.
+* +* +* @param topic topic in Kafka to which table is written +* @param properties properties to connect to Kafka +*/ + public Kafka010JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** +* Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 +* topic with custom partition assignment. +* +* @param topic topic in Kafka to which table is written +* @param properties properties to connect to Kafka +* @param partitioner Kafka partitioner +*/ + public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner partitioner) { + super(topic, properties, partitioner); +
flink git commit: [FLINK-7698] [table] Tests joins with null literals
Repository: flink Updated Branches: refs/heads/release-1.4 13962e1ff -> 397f0d150 [FLINK-7698] [table] Tests joins with null literals Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/397f0d15 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/397f0d15 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/397f0d15 Branch: refs/heads/release-1.4 Commit: 397f0d150255ea24fd5fd4cc38848441bb0fff58 Parents: 13962e1 Author: twalthr Authored: Thu Nov 16 11:00:30 2017 +0100 Committer: twalthr Committed: Thu Nov 16 11:06:14 2017 +0100 -- .../flink/table/api/stream/sql/JoinTest.scala | 45 1 file changed, 45 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/397f0d15/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 8c1865c..c14b698 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql import org.apache.calcite.rel.logical.LogicalJoin import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.Null import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.runtime.join.WindowJoinUtil import org.apache.flink.table.utils.TableTestUtil.{term, _} @@ -245,6 +247,49 @@ class JoinTest extends TableTestBase { } @Test + def testJoinWithNullLiteral(): Unit = { +val streamUtil: StreamTableTestUtil = streamTestUtil() + +val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 
'proctime.proctime) + .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField) + +val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime, 12L as 'nullField) + +streamUtil.tableEnv.registerTable("T1", t1) +streamUtil.tableEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 AS t1 +|JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND +| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND +| t2.proctime + INTERVAL '5' SECOND +|""".stripMargin + +val expected = + unaryNode("DataStreamCalc", +binaryNode("DataStreamWindowJoin", + unaryNode("DataStreamCalc", +streamTableNode(0), +term("select", "a", "c", "proctime", "null AS nullField") + ), + unaryNode("DataStreamCalc", +streamTableNode(1), +term("select", "a", "c", "proctime", "12 AS nullField") + ), + term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " + +"-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 5000)))"), + term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"), + term("joinType", "InnerJoin") +), +term("select", "a0 AS a", "c0 AS c", "c AS c0") + ) +streamUtil.verifySql(sqlQuery, expected) + } + + @Test def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = { val sqlQuery =
flink git commit: [FLINK-7698] [table] Tests joins with null literals
Repository: flink Updated Branches: refs/heads/master d0b2aa28d -> 101fef739 [FLINK-7698] [table] Tests joins with null literals Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/101fef73 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/101fef73 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/101fef73 Branch: refs/heads/master Commit: 101fef7397128b0aba23221481ab86f62b18118f Parents: d0b2aa2 Author: twalthr Authored: Thu Nov 16 11:00:30 2017 +0100 Committer: twalthr Committed: Thu Nov 16 11:00:30 2017 +0100 -- .../flink/table/api/stream/sql/JoinTest.scala | 45 1 file changed, 45 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/101fef73/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 8c1865c..c14b698 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql import org.apache.calcite.rel.logical.LogicalJoin import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.Null import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.runtime.join.WindowJoinUtil import org.apache.flink.table.utils.TableTestUtil.{term, _} @@ -245,6 +247,49 @@ class JoinTest extends TableTestBase { } @Test + def testJoinWithNullLiteral(): Unit = { +val streamUtil: StreamTableTestUtil = streamTestUtil() + +val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 
'proctime.proctime) + .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField) + +val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime, 12L as 'nullField) + +streamUtil.tableEnv.registerTable("T1", t1) +streamUtil.tableEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 AS t1 +|JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND +| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND +| t2.proctime + INTERVAL '5' SECOND +|""".stripMargin + +val expected = + unaryNode("DataStreamCalc", +binaryNode("DataStreamWindowJoin", + unaryNode("DataStreamCalc", +streamTableNode(0), +term("select", "a", "c", "proctime", "null AS nullField") + ), + unaryNode("DataStreamCalc", +streamTableNode(1), +term("select", "a", "c", "proctime", "12 AS nullField") + ), + term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " + +"-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 5000)))"), + term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"), + term("joinType", "InnerJoin") +), +term("select", "a0 AS a", "c0 AS c", "c AS c0") + ) +streamUtil.verifySql(sqlQuery, expected) + } + + @Test def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = { val sqlQuery =