This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2fc82ad5b92367c9f9a2aad5e6df3fd78108754e Author: godfreyhe <godfre...@163.com> AuthorDate: Fri Apr 24 20:45:12 2020 +0800 [FLINK-17267] [table-planner] legacy batch planner supports explain insert operation --- .../table/tests/test_table_environment_api.py | 5 +- .../table/api/internal/BatchTableEnvImpl.scala | 191 +++++++++++++++++---- .../flink/table/api/internal/TableEnvImpl.scala | 63 ++++++- .../table/plan/nodes/dataset/DataSetSink.scala | 57 ++++++ .../flink/table/plan/rules/FlinkRuleSets.scala | 3 +- .../table/plan/rules/dataSet/DataSetSinkRule.scala | 55 ++++++ .../apache/flink/table/api/batch/ExplainTest.scala | 71 ++++++-- .../sql/validation/InsertIntoValidationTest.scala | 8 + .../validation/InsertIntoValidationTest.scala | 4 + .../src/test/scala/resources/testInsert1.out | 27 +++ .../test/scala/resources/testMultipleInserts1.out | 51 ++++++ 11 files changed, 477 insertions(+), 58 deletions(-) diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index c1bf04a..87c8023 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -400,8 +400,9 @@ class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkBatchTableTestCase t_env.sql_update("insert into sink1 select * from %s where a > 100" % source) t_env.sql_update("insert into sink2 select * from %s where a < 100" % source) - with self.assertRaises(TableException): - t_env.explain(extended=True) + actual = t_env.explain(extended=True) + + assert isinstance(actual, str) def test_create_table_environment(self): table_config = TableConfig() diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala index 343fc23..e25b8e4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala @@ -38,8 +38,9 @@ import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression} import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES import org.apache.flink.table.module.ModuleManager -import org.apache.flink.table.operations.{DataSetQueryOperation, QueryOperation} +import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation} import org.apache.flink.table.plan.BatchOptimizer +import org.apache.flink.table.plan.nodes.LogicalSink import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.planner.Conversions import org.apache.flink.table.runtime.MapRunner @@ -74,7 +75,7 @@ abstract class BatchTableEnvImpl( moduleManager: ModuleManager) extends TableEnvImpl(config, catalogManager, moduleManager) { - private val bufferedSinks = new JArrayList[DataSink[_]] + private val bufferedModifyOperations = new JArrayList[ModifyOperation]() private[flink] val optimizer = new BatchOptimizer( () => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT), @@ -170,8 +171,8 @@ abstract class BatchTableEnvImpl( } } - override protected def addToBuffer(sink: DataSink[_]): Unit = { - bufferedSinks.add(sink) 
+ override protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit = { + bufferedModifyOperations.add(modifyOperation) } /** @@ -215,32 +216,69 @@ abstract class BatchTableEnvImpl( * @param extended Flag to include detailed optimizer estimates. */ private[flink] def explain(table: Table, extended: Boolean): String = { - val ast = getRelBuilder.tableOperation(table.getQueryOperation).build() - val optimizedPlan = optimizer.optimize(ast) - val dataSet = translate[Row]( - optimizedPlan, - getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))( - new GenericTypeInfo(classOf[Row])) - dataSet.output(new DiscardingOutputFormat[Row]) - val env = dataSet.getExecutionEnvironment + explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended) + } + + override def explain(table: Table): String = explain(table: Table, extended = false) + + override def explain(extended: Boolean): String = { + explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended) + } + + private def explain(operations: JList[Operation], extended: Boolean): String = { + require(operations.asScala.nonEmpty, "operations should not be empty") + val astList = operations.asScala.map { + case queryOperation: QueryOperation => + getRelBuilder.tableOperation(queryOperation).build() + case modifyOperation: ModifyOperation => + translateToRel(modifyOperation, addLogicalSink = true) + case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}") + } + + val optimizedNodes = astList.map(optimizer.optimize) + + val batchTableEnv = createDummyBatchTableEnv() + val dataSinks = optimizedNodes.zip(operations.asScala).map { + case (optimizedNode, operation) => + operation match { + case queryOperation: QueryOperation => + val dataSet = translate[Row]( + optimizedNode, + getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))( + new GenericTypeInfo(classOf[Row])) + dataSet.output(new DiscardingOutputFormat[Row]) + case modifyOperation: ModifyOperation => + val tableSink = getTableSink(modifyOperation) + translate( + batchTableEnv, + optimizedNode, + tableSink, + getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode)) + case o => + throw new TableException("Unsupported Operation: " + o.asSummaryString()) + } + } + + val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator) + val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator) + + val env = dataSinks.head.getDataSet.getExecutionEnvironment val jasonSqlPlan = env.getExecutionPlan val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) s"== Abstract Syntax Tree ==" + - System.lineSeparator + - s"${RelOptUtil.toString(ast)}" + - System.lineSeparator + - s"== Optimized Logical Plan ==" + - System.lineSeparator + - s"${RelOptUtil.toString(optimizedPlan)}" + - System.lineSeparator + - s"== Physical Execution Plan ==" + - System.lineSeparator + - s"$sqlPlan" + System.lineSeparator + + s"$astPlan" + + System.lineSeparator + + s"== Optimized Logical Plan ==" + + System.lineSeparator + + s"$optimizedPlan" + + System.lineSeparator + + s"== Physical Execution Plan ==" + + System.lineSeparator + + s"$sqlPlan" } - override def explain(table: Table): String = explain(table: Table, extended = false) - override def execute(jobName: String): JobExecutionResult = { val plan = createPipelineAndClearBuffer(jobName) @@ -292,10 +330,6 @@ abstract class BatchTableEnvImpl( 
createPipelineAndClearBuffer(jobName) } - override def explain(extended: Boolean): String = { - throw new TableException("This method is unsupported in old planner.") - } - /** * Translate the buffered sinks to Plan, and clear the buffer. * @@ -304,10 +338,11 @@ abstract class BatchTableEnvImpl( * If the buffer is not clear after failure, the following `translate` will also fail. */ private def createPipelineAndClearBuffer(jobName: String): Pipeline = { + val dataSinks = translate(bufferedModifyOperations) try { - createPipeline(bufferedSinks, jobName) + createPipeline(dataSinks, jobName) } finally { - bufferedSinks.clear() + bufferedModifyOperations.clear() } } @@ -357,6 +392,102 @@ abstract class BatchTableEnvImpl( } /** + * Translates a [[ModifyOperation]] into a [[RelNode]]. + * + * The transformation does not involve optimizing the relational expression tree. + * + * @param modifyOperation The root ModifyOperation of the relational expression tree. + * @param addLogicalSink Whether add [[LogicalSink]] as the root. + * Currently, LogicalSink only is only used for explaining. + * @return The [[RelNode]] that corresponds to the translated [[ModifyOperation]]. + */ + private def translateToRel(modifyOperation: ModifyOperation, addLogicalSink: Boolean): RelNode = { + val input = getRelBuilder.tableOperation(modifyOperation.getChild).build() + if (addLogicalSink) { + val tableSink = getTableSink(modifyOperation) + modifyOperation match { + case s: CatalogSinkModifyOperation => + LogicalSink.create(input, tableSink, s.getTableIdentifier.toString) + case o => + throw new TableException("Unsupported Operation: " + o.asSummaryString()) + } + } else { + input + } + } + + /** + * Translates a list of [[ModifyOperation]] into a list of [[DataSink]]. + * + * The transformation involves optimizing the relational expression tree as defined by + * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators. + * + * @param modifyOperations The root [[ModifyOperation]]s of the relational expression tree. + * @return The [[DataSink]] that corresponds to the translated [[ModifyOperation]]s. + */ + private def translate[T](modifyOperations: JList[ModifyOperation]): JList[DataSink[_]] = { + val relNodes = modifyOperations.asScala.map(o => translateToRel(o, addLogicalSink = false)) + val optimizedNodes = relNodes.map(optimizer.optimize) + + val batchTableEnv = createDummyBatchTableEnv() + modifyOperations.asScala.zip(optimizedNodes).map { + case (modifyOperation, optimizedNode) => + val tableSink = getTableSink(modifyOperation) + translate( + batchTableEnv, + optimizedNode, + tableSink, + getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode)) + }.asJava + } + + /** + * Translates an optimized [[RelNode]] into a [[DataSet]] + * and handed over to the [[TableSink]] to write it. + * + * @param optimizedNode The [[RelNode]] to translate. + * @param tableSink The [[TableSink]] to write the [[Table]] to. + * @return The [[DataSink]] that corresponds to the [[RelNode]] and the [[TableSink]]. + */ + private def translate[T]( + batchTableEnv: BatchTableEnvImpl, + optimizedNode: RelNode, + tableSink: TableSink[T], + tableSchema: TableSchema): DataSink[_] = { + tableSink match { + case batchSink: BatchTableSink[T] => + val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType) + .asInstanceOf[TypeInformation[T]] + // translate the Table into a DataSet and provide the type that the TableSink expects. 
+ val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType) + // create a dummy NoOpOperator, which holds dummy DummyExecutionEnvironment as context. + // NoOpOperator will be ignored in OperatorTranslation + // when translating DataSet to Operator, while its input can be translated normally. + val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType) + // Give the DataSet to the TableSink to emit it. + batchSink.consumeDataSet(dummyOp) + case boundedSink: OutputFormatTableSink[T] => + val outputType = fromDataTypeToLegacyInfo(tableSink.getConsumedDataType) + .asInstanceOf[TypeInformation[T]] + // translate the Table into a DataSet and provide the type that the TableSink expects. + val result: DataSet[T] = translate(optimizedNode, tableSchema)(outputType) + // create a dummy NoOpOperator, which holds DummyExecutionEnvironment as context. + // NoOpOperator will be ignored in OperatorTranslation + // when translating DataSet to Operator, while its input can be translated normally. + val dummyOp = new DummyNoOpOperator(batchTableEnv.execEnv, result, result.getType) + // use the OutputFormat to consume the DataSet. + val dataSink = dummyOp.output(boundedSink.getOutputFormat) + dataSink.name( + TableConnectorUtils.generateRuntimeName( + boundedSink.getClass, + boundedSink.getTableSchema.getFieldNames)) + case _ => + throw new TableException( + "BatchTableSink or OutputFormatTableSink required to emit batch Table.") + } + } + + /** * Translates a [[Table]] into a [[DataSet]]. * * The transformation involves optimizing the relational expression tree as defined by diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index f40b75d..3a97106 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -37,7 +37,7 @@ import org.apache.flink.table.operations.ddl._ import org.apache.flink.table.operations.utils.OperationTreeBuilder import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _} import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder} -import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils} +import org.apache.flink.table.sinks.{BatchTableSink, OutputFormatTableSink, OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils} import org.apache.flink.table.sources.TableSource import org.apache.flink.table.types.DataType import org.apache.flink.table.util.JavaScalaConversionUtil @@ -878,11 +878,11 @@ abstract class TableEnvImpl( tableSink: TableSink[T]): DataSink[_] /** - * Add the given [[DataSink]] into the buffer. + * Add the given [[ModifyOperation]] into the buffer. * - * @param dataSink The [[DataSink]] to add the buffer to. + * @param modifyOperation The [[ModifyOperation]] to add the buffer to. 
*/ - protected def addToBuffer(dataSink: DataSink[_]): Unit + protected def addToBuffer[T](modifyOperation: ModifyOperation): Unit override def insertInto(path: String, table: Table): Unit = { val parser = planningConfigurationBuilder.createCalciteParser() @@ -919,15 +919,64 @@ abstract class TableEnvImpl( table: Table, insertOptions: InsertOptions, sinkIdentifier: ObjectIdentifier): Unit = { - val dataSink = writeToSinkAndTranslate(table.getQueryOperation, insertOptions, sinkIdentifier) - addToBuffer(dataSink) + val operation = new CatalogSinkModifyOperation( + sinkIdentifier, + table.getQueryOperation, + insertOptions.staticPartitions, + insertOptions.overwrite, + new JHashMap[String, String]()) + addToBuffer(operation) } override def getParser: Parser = parser override def getCatalogManager: CatalogManager = catalogManager - private def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = { + protected def getTableSink(modifyOperation: ModifyOperation): TableSink[_] = { + modifyOperation match { + case s: CatalogSinkModifyOperation => + getTableSink(s.getTableIdentifier) match { + case None => + throw new TableException( + s"No table was registered under the name ${s.getTableIdentifier}.") + + case Some(tableSink) => + tableSink match { + case _: BatchTableSink[_] => // do nothing + case _: OutputFormatTableSink[_] => // do nothing + case _ => + throw new TableException( + "BatchTableSink or OutputFormatTableSink required to emit batch Table.") + } + // validate schema of source table and table sink + TableSinkUtils.validateSink( + s.getStaticPartitions, + s.getChild, + s.getTableIdentifier, + tableSink) + // set static partitions if it is a partitioned table sink + tableSink match { + case partitionableSink: PartitionableTableSink => + partitionableSink.setStaticPartition(s.getStaticPartitions) + case _ => + } + // set whether to overwrite if it's an OverwritableTableSink + tableSink match { + case overwritableTableSink: OverwritableTableSink => + overwritableTableSink.setOverwrite(s.isOverwrite) + case _ => + require(!s.isOverwrite, "INSERT OVERWRITE requires " + + s"${classOf[OverwritableTableSink].getSimpleName} but actually got " + + tableSink.getClass.getName) + } + tableSink + } + case o => + throw new TableException("Unsupported Operation: " + o.asSummaryString()) + } + } + + protected def getTableSink(objectIdentifier: ObjectIdentifier): Option[TableSink[_]] = { JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier)) .map(_.getTable) match { case Some(s) if s.isInstanceOf[ConnectorCatalogTable[_, _]] => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala new file mode 100644 index 0000000..f3fd194 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSink.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.dataset + +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.operators.DataSink +import org.apache.flink.table.api.internal.BatchTableEnvImpl +import org.apache.flink.table.plan.nodes.Sink +import org.apache.flink.table.sinks.{BatchTableSink, TableSink} +import org.apache.flink.types.Row + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode + +/** + * A special [[DataSetRel]] which make explain result more pretty. + * + * <p>NOTES: We can't move the [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] logic + * from [[BatchTableEnvImpl]] to this node, because the return types of + * [[DataSetRel#translateToPlan]] (which returns [[DataSet]]) and + * [[BatchTableSink#consumeDataSet]]/[[DataSet#output]] (which returns [[DataSink]]) are + * different. [[DataSetSink#translateToPlan]] just returns the input's translated result. + */ +class DataSetSink( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + sink: TableSink[_], + sinkName: String) + extends Sink(cluster, traitSet, inputRel, sink, sinkName) + with DataSetRel { + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetSink(cluster, traitSet, inputs.get(0), sink, sinkName) + } + + override def translateToPlan(tableEnv: BatchTableEnvImpl): DataSet[Row] = { + getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + } + +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index a872fb4..50a1bca 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -220,7 +220,8 @@ object FlinkRuleSets { DataSetValuesRule.INSTANCE, DataSetCorrelateRule.INSTANCE, DataSetPythonCorrelateRule.INSTANCE, - BatchTableSourceScanRule.INSTANCE + BatchTableSourceScanRule.INSTANCE, + DataSetSinkRule.INSTANCE ) /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala new file mode 100644 index 0000000..b7786b0 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSinkRule.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.dataset.DataSetSink +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule + +class DataSetSinkRule + extends ConverterRule( + classOf[FlinkLogicalSink], + FlinkConventions.LOGICAL, + FlinkConventions.DATASET, + "DataSetSinkRule") { + + def convert(rel: RelNode): RelNode = { + val sink: FlinkLogicalSink = rel.asInstanceOf[FlinkLogicalSink] + val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET) + val convInput: RelNode = RelOptRule.convert(sink.getInput(0), FlinkConventions.DATASET) + + new DataSetSink( + rel.getCluster, + traitSet, + convInput, + sink.sink, + sink.sinkName + ) + } +} + +object DataSetSinkRule { + val INSTANCE = new DataSetSinkRule +} + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala index a2df711..fc33f8d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala @@ -18,20 +18,22 @@ package org.apache.flink.table.api.batch +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Table import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl -import org.apache.flink.table.utils.TableTestUtil.batchTableNode +import org.apache.flink.table.api.{Table, Types} +import org.apache.flink.table.runtime.utils.CommonTestData +import org.apache.flink.table.utils.MemoryTableSourceSinkUtil +import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, readFromResource, replaceStageId} import org.apache.flink.test.util.MultipleProgramsTestBase + import org.junit.Assert.assertEquals import org.junit._ class ExplainTest extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) { - private val testFilePath = ExplainTest.this.getClass.getResource("/").getFile - @Test def testFilterWithoutExtended(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -41,8 +43,7 @@ class ExplainTest val table = scan.filter($"a" % 2 === 0) val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testFilter0.out").mkString + val source = readFromResource("testFilter0.out") val expected = replaceString(source, scan) assertEquals(expected, result) @@ -58,8 +59,7 @@ class ExplainTest val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl] .explain(table, extended = true).replaceAll("\\r\\n", "\n") - val source = scala.io.Source.fromFile(testFilePath + - 
"../../src/test/scala/resources/testFilter1.out").mkString + val source = readFromResource("testFilter1.out") val expected = replaceString(source, scan) assertEquals(expected, result) @@ -75,8 +75,7 @@ class ExplainTest val table = table1.join(table2).where($"b" === $"d").select($"a", $"c") val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin0.out").mkString + val source = readFromResource("testJoin0.out") val expected = replaceString(source, table1, table2) assertEquals(expected, result) @@ -93,8 +92,7 @@ class ExplainTest val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl] .explain(table, extended = true).replaceAll("\\r\\n", "\n") - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testJoin1.out").mkString + val source = readFromResource("testJoin1.out") val expected = replaceString(source, table1, table2) assertEquals(expected, result) @@ -110,8 +108,7 @@ class ExplainTest val table = table1.unionAll(table2) val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion0.out").mkString + val source = readFromResource("testUnion0.out") val expected = replaceString(source, table1, table2) assertEquals(expected, result) @@ -128,13 +125,51 @@ class ExplainTest val result = tEnv.asInstanceOf[BatchTableEnvironmentImpl] .explain(table, extended = true).replaceAll("\\r\\n", "\n") - val source = scala.io.Source.fromFile(testFilePath + - "../../src/test/scala/resources/testUnion1.out").mkString + val source = readFromResource("testUnion1.out") val expected = replaceString(source, table1, table2) assertEquals(expected, result) } + @Test + def testInsert(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = BatchTableEnvironment.create(env) + + tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource) + + val fieldNames = Array("d", "e") + val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT()) + val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink + tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes)) + + tEnv.sqlUpdate("insert into targetTable select first, id from sourceTable") + + val result = tEnv.explain(false) + val expected = readFromResource("testInsert1.out") + assertEquals(replaceStageId(expected), replaceStageId(result)) + } + + @Test + def testMultipleInserts(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = BatchTableEnvironment.create(env) + + tEnv.registerTableSource("sourceTable", CommonTestData.getCsvTableSource) + + val fieldNames = Array("d", "e") + val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT()) + val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink + tEnv.registerTableSink("targetTable1", sink.configure(fieldNames, fieldTypes)) + tEnv.registerTableSink("targetTable2", sink.configure(fieldNames, fieldTypes)) + + tEnv.sqlUpdate("insert into targetTable1 select first, id from sourceTable") + tEnv.sqlUpdate("insert into targetTable2 select last, id from sourceTable") + + val result = tEnv.explain(false) + val expected = readFromResource("testMultipleInserts1.out") + assertEquals(replaceStageId(expected), replaceStageId(result)) + } def replaceString(s: String, t1: Table, t2: Table): String = { replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), 
      t2, 1)
@@ -144,14 +179,14 @@ class ExplainTest
     replaceSourceNode(replaceString(s), t, 0)
   }
 
-  private def replaceSourceNode(s: String, t: Table, idx: Int) = {
+  private def replaceSourceNode(s: String, t: Table, idx: Int): String = {
     s.replace(
       s"%logicalSourceNode$idx%",
       batchTableNode(t)
         .replace("DataSetScan", "FlinkLogicalDataSetScan"))
       .replace(s"%sourceNode$idx%", batchTableNode(t))
   }
 
-  def replaceString(s: String) = {
+  def replaceString(s: String): String = {
     s.replaceAll("\\r\\n", "\n")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index d8915df..ab07039 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
 
     // must fail because table sink schema has too few fields
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,6 +59,8 @@
 
     // must fail because types of table sink do not match query result
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -73,6 +77,8 @@
 
     // must fail because partial insert is not supported yet.
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test
@@ -92,5 +98,7 @@
     val sql = "INSERT INTO targetTable SELECT a, b FROM sourceTable"
 
     util.tableEnv.sqlUpdate(sql)
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
index 4ae77f9..210a598 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -41,6 +41,8 @@ class InsertIntoValidationTest extends TableTestBase {
     util.tableEnv.scan("sourceTable")
       .select('a, 'b, 'c)
       .insertInto("targetTable")
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,5 +59,7 @@
     util.tableEnv.scan("sourceTable")
       .select('a, 'b, 'c)
       .insertInto("targetTable")
+    // trigger validation
+    util.tableEnv.execute("test")
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
new file mode 100644
index 0000000..9b78a10
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testInsert1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+    content : collect elements with CollectionInputFormat
+    Partitioning : RANDOM_PARTITIONED
+
+ : Map
+    content : to: Row
+    ship_strategy : Forward
+    exchange_mode : PIPELINED
+    driver_strategy : Map
+    Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+    content : UnsafeMemoryAppendTableSink(d, e)
+    ship_strategy : Forward
+    exchange_mode : PIPELINED
+    Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
new file mode 100644
index 0000000..c8979bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testMultipleInserts1.out
@@ -0,0 +1,51 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+  LogicalProject(last=[$3], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable1`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable2`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[last, id], source=[CsvTableSource(read fields: last, id)])
+
+== Physical Execution Plan ==
+ : Data Source
+    content : collect elements with CollectionInputFormat
+    Partitioning : RANDOM_PARTITIONED
+
+ : Map
+    content : to: Row
+    ship_strategy : Forward
+    exchange_mode : PIPELINED
+    driver_strategy : Map
+    Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+    content : UnsafeMemoryAppendTableSink(d, e)
+    ship_strategy : Forward
+    exchange_mode : PIPELINED
+    Partitioning : RANDOM_PARTITIONED
+
+ : Data Source
+    content : collect elements with CollectionInputFormat
+    Partitioning : RANDOM_PARTITIONED
+
+ : Map
+    content : to: Row
+    ship_strategy : Forward
+    exchange_mode : PIPELINED
+    driver_strategy : Map
+    Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+    content : UnsafeMemoryAppendTableSink(d, e)
+    ship_strategy : Forward
+    exchange_mode : PIPELINED
+    Partitioning : RANDOM_PARTITIONED
+