[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r254982253

## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
## @@ -44,12 +44,12 @@ object StreamTableExample {
     val orderA = env.fromCollection(Seq(

Review comment:
nit: the 2nd commit

> [FLINK-8577][table] Rename methods name to toTableFromAppendStream and toTableFromUpsertStream in DataStreamConversions

contains no code changes related to `toTableFromUpsertStream`. The commit should probably be renamed.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r254983164

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,102 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
 
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Defining key on append stream has not been supported yet, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
 
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
 
+  private def getTypeFromUpsertStream[T](dataStream: DataStream[T]): TypeInformation[T] = {
+    dataStream.getType match {
+      case c: CaseClassTypeInfo[_]
+        if (c.getTypeClass.equals(classOf[Tuple2[_, _]])) => c.getTypeAt(1)
+      case t: TupleTypeInfo[_]
+        if (t.getTypeClass.equals(classOf[JTuple2[_, _]])) => t.getTypeAt(1)
+      case _ =>
+        throw new TableException("You can only upsert from a datastream with type of Tuple2!")
+    }
+  }
+
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    val streamType: TypeInformation[T] = getTypeFromUpsertStream(dataStream)
+
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType)
+    val dataStreamTable = new UpsertStreamTable[T](
+      dataStream,
+      fieldIndexes,
+      fieldNames
+    )
+    registerTableInternal(name, dataStreamTable)
+  }
+
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name with field names as specified
+    * by field expressions in the [[TableEnvironment]]'s catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @param fields The field expressions to define the field names of the table.
+    * @tparam T The type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](
+      name: String,
+      dataStream: DataStream[T],
+      fields: Array[Expression])
+    : Unit = {
+
+    val streamType: TypeInformation[T] = getTypeFromUpsertStream(dataStream)
+
+    // get field names and types for all non-replaced fields
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
+
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)

Review comment:
I think rowtime columns shouldn't be allowed for upsert streams at the moment.
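A minimal, Flink-free sketch of the check suggested above, assuming the `(rowtime, proctime)` pair returned by `validateAndExtractTimeAttributes` can be modelled as `Option[(Int, String)]` (field index, field name). `validateUpsertTimeAttributes`, the `TimeAttribute` alias and the local `TableException` stand-in are hypothetical names for illustration, not existing Flink API.

```scala
object UpsertTimeAttributeCheck {

  // Hypothetical stand-in for org.apache.flink.table.api.TableException.
  final class TableException(msg: String) extends RuntimeException(msg)

  // Hypothetical model of one extracted time attribute: (field index, field name).
  type TimeAttribute = Option[(Int, String)]

  /** Rejects a rowtime attribute on an upsert stream; a proctime attribute passes through. */
  def validateUpsertTimeAttributes(
      rowtime: TimeAttribute,
      proctime: TimeAttribute): TimeAttribute = {
    rowtime.foreach { case (_, name) =>
      throw new TableException(
        s"Rowtime attribute '$name' is not supported on upsert streams yet.")
    }
    proctime
  }
}
```

With this shape, registering an upsert stream that declares a rowtime field fails early, which matches the proctime-only scope in the PR title, while a proctime field is forwarded unchanged.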
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253802557

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala
## @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Function used to convert upsert to retractions.
+ *
+ * @param rowTypeInfo the output row type info.
+ * @param queryConfig the configuration for the query.
+ */
+class UpsertToRetractionProcessFunction(

Review comment:
OK, but then we can provide a `LAST_NULLABLE_VALUE()` function and still reuse the aggregation operator code, even if we keep `LAST_NULLABLE_VALUE()` as an internal function that we do not expose anywhere. I would vote for exposing it (maybe in a follow-up PR). My main concern is, again, code duplication with the aggregation operator.

Now that I'm thinking about it, did we even need to implement our own UpsertToRetraction rules and RelNodes? Couldn't we reuse the `Aggregate` `RelNode` with `LAST_NULLABLE_VALUE()` aggregation functions?
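To make the comparison with a keyed aggregation concrete, here is a minimal plain-Scala sketch of the per-key state an upsert-to-retraction conversion needs: remember the last value per key, retract it when a newer upsert or a delete for that key arrives. `Upsert`, `Retraction` and `convert` are hypothetical names, and a `mutable.Map` stands in for keyed `ValueState`; this is an illustration of the semantics, not the PR's `UpsertToRetractionProcessFunction`.

```scala
import scala.collection.mutable

object UpsertToRetractionSketch {

  /** Hypothetical input record: `isUpsert = false` marks a delete for `key`. */
  final case class Upsert[K, V](isUpsert: Boolean, key: K, value: V)

  /** Mirrors CRow's `change` flag: true = accumulate (add), false = retract. */
  final case class Retraction[V](change: Boolean, value: V)

  /** Keeps the last value per key, like a keyed "last (nullable) value" aggregation would. */
  def convert[K, V](input: Seq[Upsert[K, V]]): Seq[Retraction[V]] = {
    val lastValue = mutable.Map.empty[K, V] // per-key state, analogous to keyed ValueState
    val out = mutable.ArrayBuffer.empty[Retraction[V]]
    input.foreach { msg =>
      // Any value previously emitted for this key must be retracted first.
      lastValue.get(msg.key).foreach(prev => out += Retraction(change = false, value = prev))
      if (msg.isUpsert) {
        lastValue.update(msg.key, msg.value)
        out += Retraction(change = true, value = msg.value)
      } else {
        lastValue.remove(msg.key) // delete: forget the key entirely
      }
    }
    out.toSeq
  }
}
```

A delete for a key that was never seen emits nothing, and a delete after an upsert emits only the retraction of the previous value; an aggregation-based rewrite would need to reproduce exactly this behaviour.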
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253795843

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
## @@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(

Review comment:
OK, I also talked a bit more with @twalthr, and now I understand why we need this `switch/case`.

Regarding the code duplication, I'm not sure, but maybe it could be deduplicated by modifying the existing `convertToInternalRow` and adding two more parameters:
1. `Row getRow(T inputRecord)`
2. `boolean getChange(T inputRecord)`

and using those two parameters:
```
new RichMapFunction[T, CRow] {
  ...
  override def map(inputRecord: T): CRow = {
    outCRow.row = getRow(inputRecord)
    outCRow.change = getChange(inputRecord)
    outCRow
  }
}
```
This modified `convertToInternalRow` method could then be called three times:
1. for the current use case of an append-only data stream, with `getRow(record) : record`, `getChange(record) : true`
2. for Scala upserts, with `getRow(record) : record._2`, `getChange(record) : record._1`
3. for Java upserts, with `getRow(record) : record.f1`, `getChange(record) : record.f0`

or something along those lines.
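The three call sites described above can be sketched without Flink as a single higher-order conversion. `Row`, `CRow` and `JTuple2` below are simplified, hypothetical stand-ins for `org.apache.flink.types.Row`, the runtime's `CRow` and Java's `Tuple2`, and a plain function replaces the `RichMapFunction`; the point is only that the per-case difference collapses into the `getRow`/`getChange` accessors.

```scala
object ConvertToInternalRowSketch {

  // Simplified stand-ins (assumptions) for org.apache.flink.types.Row and the runtime's CRow.
  final case class Row(fields: Vector[Any])
  final class CRow(var row: Row, var change: Boolean)

  // Minimal stand-in for org.apache.flink.api.java.tuple.Tuple2 (public fields f0 and f1).
  final class JTuple2[A, B](val f0: A, val f1: B)

  /** One conversion routine parameterised by the two accessors proposed in the review. */
  def convertToInternalRow[T](getRow: T => Row, getChange: T => Boolean): T => CRow = {
    // Reused output object, mirroring the @transient outCRow field in the PR.
    val outCRow = new CRow(null, true)
    (inputRecord: T) => {
      outCRow.row = getRow(inputRecord)
      outCRow.change = getChange(inputRecord)
      outCRow
    }
  }

  // 1. Append-only stream: the record is the row and every change is an insert.
  val appendOnly: Row => CRow = convertToInternalRow[Row](record => record, _ => true)

  // 2. Scala upsert stream of (Boolean, Row).
  val scalaUpsert: ((Boolean, Row)) => CRow =
    convertToInternalRow[(Boolean, Row)](record => record._2, record => record._1)

  // 3. Java upsert stream of Tuple2<Boolean, Row>.
  val javaUpsert: JTuple2[java.lang.Boolean, Row] => CRow =
    convertToInternalRow[JTuple2[java.lang.Boolean, Row]](
      record => record.f1, record => record.f0.booleanValue())
}
```

Because the output `CRow` is reused, as in the PR, callers must consume each result before converting the next record.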
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253546583

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
## @@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(

Review comment:
A couple of remarks regarding this method:
1. Why does it have this `switch/case` for Scala/Java tuples while `convertToInternalRow` doesn't?
2. I'm not sure, but it looks like it duplicates logic from `convertToInternalRow`. Maybe they should be deduplicated into one method, passing the `upsert/append` mode as a parameter for a couple of `if` statements?
3. Please split it into smaller methods; you could do the same for `convertToInternalRow`. More or less wherever there is a comment like `// Scala tuple` or `// input is already of correct type. Only need to wrap it as CRow`, extract the matching piece of code into a named method like `convertScalaTuple(...)` or `wrapAsCRow(...)`.
4. Are all of the branches covered by tests? For example, both branches of the `if` under the Java tuple case?
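Points 3 and 4 could be approached together: once the `if (inputType == internalType && !hasTimeIndicator)` decision is extracted into a named method, both outcomes can be unit-tested without building a stream. The sketch below is plain Scala; `TypeInfo`, `Conversion`, `chooseConversion` and the marker values are hypothetical placeholders for `TypeInformation` and the `TimeIndicatorTypeInfo` stream markers, not Flink API.

```scala
object ConversionBranchSketch {

  // Hypothetical stand-in for TypeInformation; equality decides whether conversion is needed.
  final case class TypeInfo(name: String)

  // Placeholder marker values (assumption) standing in for
  // TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER and PROCTIME_STREAM_MARKER.
  val RowtimeMarker: Int = -1
  val ProctimeMarker: Int = -2

  sealed trait Conversion
  case object WrapAsCRow extends Conversion         // input already has the internal type
  case object GenerateConversion extends Conversion // code-generated conversion is needed

  def hasTimeIndicator(fieldIdxs: Array[Int]): Boolean =
    fieldIdxs.exists(f => f == RowtimeMarker || f == ProctimeMarker)

  /** The `if` condition shared by the Scala- and Java-tuple branches, extracted and named. */
  def chooseConversion(
      inputType: TypeInfo,
      internalType: TypeInfo,
      fieldIdxs: Array[Int]): Conversion =
    if (inputType == internalType && !hasTimeIndicator(fieldIdxs)) WrapAsCRow
    else GenerateConversion
}
```

Tests for the real method would then need one case per outcome (matching type without time indicators, matching type with a proctime field, and a differing type) for each tuple flavour.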
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253778196

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
     converterFunction match {
       case Some(func) =>
-        new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+        new CRowToExternalTypeMapRunner[OUT](func.name, func.code, func.returnType)

Review comment:
You can keep this rename and class move as a separate `[hotfix]` commit, either at the beginning or at the end of this PR, or as a separate PR :) We do not have to drop those changes.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253776804

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
## @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(

Review comment:
They have responded to me, and basically there is no contract: as long as the tests pass, we are fine. I don't like it, but if you want, we can leave those methods unimplemented.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253463228

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
     converterFunction match {
       case Some(func) =>
-        new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+        new CRowToExternalTypeMapRunner[OUT](func.name, func.code, func.returnType)

Review comment:
As a general rule, please do not mix simple refactorings, like class renames and moving classes around, with functional changes. Combining the two in one commit makes sense only if the refactoring/rename is tightly coupled with the functional change, and here that doesn't seem to be the case.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253559564

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
## @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(

Review comment:
I'm asking on the Calcite dev mailing list, trying to understand those methods. Until we get a clear response from them, we can leave this as it is now. The `Aggregate` node (which is semantically almost identical to this one) also doesn't implement those methods.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253559564

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
## @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(

Review comment:
I'm syncing with the Calcite dev mailing list, trying to understand those methods. Until we get a clear response from them, let's leave this as it is now.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253554512

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
## @@ -19,18 +19,49 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMdUtil
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 trait CommonCalc {
 
+  /**
+    * Returns empty if output field is not forwarded from the input for the calc.
+    */
+  private[flink] def getInputFromOutputName(calc: Calc, outputFieldName: String): Option[String] = {

Review comment:
Could you add a simple unit test for this?
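A possible shape for such a test, sketched in plain Scala against a simplified, hypothetical model of a `Calc` (a list of input field names plus named projection expressions) rather than Calcite's real `Calc`/`RexProgram` classes. The function body below only illustrates the documented contract ("empty if the output field is not forwarded"); it is not the PR's implementation.

```scala
object InputFromOutputNameSketch {

  // Minimal, hypothetical model of a Calc projection: each output field is an expression
  // over the input fields; only a bare input reference counts as "forwarded".
  sealed trait Expr
  final case class InputRef(index: Int) extends Expr
  final case class Call(op: String, operands: Seq[Expr]) extends Expr

  final case class Calc(inputFieldNames: Seq[String], projects: Seq[(String, Expr)])

  /** Returns the input field an output field is forwarded from, or None if it is computed. */
  def getInputFromOutputName(calc: Calc, outputFieldName: String): Option[String] =
    calc.projects.collectFirst {
      case (`outputFieldName`, InputRef(index)) => calc.inputFieldNames(index)
    }
}
```

The interesting cases for the real test are a renamed forwarded field, a field forwarded under its own name, a computed field, and an unknown output name.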
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253551875

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala
## @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Function used to convert upsert to retractions.
+ *
+ * @param rowTypeInfo the output row type info.
+ * @param queryConfig the configuration for the query.
+ */
+class UpsertToRetractionProcessFunction(

Review comment:
Why not use `GroupAggProcessFunction` with a `LAST_VALUE()` aggregate function? It would allow us to share the logic, and adding `LAST_VALUE()` would be a valuable addition to Flink SQL on its own.
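For reference, the semantics of the two aggregate functions discussed in this thread can be sketched in plain Scala, loosely following the createAccumulator / accumulate / getValue shape of a Flink aggregate function. `LastValue`, `LastValueAcc` and the `ignoreNulls` flag are hypothetical; this is not Flink's `AggregateFunction` API, and it does not model retraction input.

```scala
object LastValueSketch {

  /** Accumulator for the sketch: the last accepted value, None until one is accepted. */
  final class LastValueAcc[T] { var last: Option[T] = None }

  /**
   * Hypothetical LAST_VALUE-style aggregate. With ignoreNulls = true it keeps the last
   * non-null value (LAST_VALUE); with ignoreNulls = false a null overwrites the previous
   * value, as the proposed LAST_NULLABLE_VALUE would.
   */
  final class LastValue[T >: Null](ignoreNulls: Boolean) {
    def createAccumulator(): LastValueAcc[T] = new LastValueAcc[T]

    def accumulate(acc: LastValueAcc[T], value: T): Unit =
      if (value != null || !ignoreNulls) acc.last = Some(value)

    // Returns null when nothing was accepted, or when the last accepted value was null.
    def getValue(acc: LastValueAcc[T]): T = acc.last.orNull
  }
}
```

Fed `"a"`, `"b"`, `null` in order, the `ignoreNulls = true` variant returns `"b"` while the nullable variant returns `null`, which is the distinction between `LAST_VALUE()` and `LAST_NULLABLE_VALUE()` raised in this review.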
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253465887

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala

```diff
@@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream

 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}

 trait StreamScan extends CommonScan[CRow] with DataStreamRel {

+  protected def convertUpsertToInternalRow(
+      schema: RowSchema,
+      input: DataStream[Any],
+      fieldIdxs: Array[Int],
+      config: TableConfig,
+      rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+    val internalType = schema.typeInfo
+    val cRowType = CRowTypeInfo(internalType)
+
+    val hasTimeIndicator = fieldIdxs.exists(f =>
+      f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+        f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
+
+    val dsType = input.getType
+
+    dsType match {
+      // Scala tuple
+      case t: CaseClassTypeInfo[_]
+        if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+        val inputType = t.getTypeAt[Any](1)
+        if (inputType == internalType && !hasTimeIndicator) {
+          // input is already of correct type. Only need to wrap it as CRow
+          input.asInstanceOf[DataStream[(Boolean, Row)]]
+            .map(new RichMapFunction[(Boolean, Row), CRow] {
+              @transient private var outCRow: CRow = _
+              override def open(parameters: Configuration): Unit = {
+                outCRow = new CRow(null, change = true)
+              }
+
+              override def map(v: (Boolean, Row)): CRow = {
+                outCRow.row = v._2
+                outCRow.change = v._1
+                outCRow
+              }
+            }).returns(cRowType)
+
+        } else {
+          // input needs to be converted and wrapped as CRow or time indicators need to be generated
+
+          val function = generateConversionProcessFunction(
+            config,
+            inputType.asInstanceOf[TypeInformation[Any]],
+            internalType,
+            "UpsertStreamSourceConversion",
+            schema.fieldNames,
+            fieldIdxs,
+            rowtimeExpression
+          )
+
+          val processFunc = new ScalaTupleToCRowProcessRunner(
+            function.name,
+            function.code,
+            cRowType)
+
+          val opName = s"from: (${schema.fieldNames.mkString(", ")})"
+
+          input
+            .asInstanceOf[DataStream[(Boolean, Any)]]
+            .process(processFunc).name(opName).returns(cRowType)
+        }
+
+      // Java tuple
+      case t: TupleTypeInfo[_]
+        if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+        val inputType = t.getTypeAt[Any](1)
+        if (inputType == internalType && !hasTimeIndicator) {
+          // input is already of correct type. Only need to wrap it as CRow
+          input.asInstanceOf[DataStream[JTuple2[JBool, Row]]]
+            .map(new RichMapFunction[JTuple2[JBool, Row], CRow] {
+              @transient private var outCRow: CRow = _
+              override def open(parameters: Configuration): Unit = {
+                outCRow = new CRow(null, change = true)
+              }
+
+              override def map(v: JTuple2[JBool, Row]): CRow = {
+                outCRow.row = v.f1
+                outCRow.change = v.f0
+                outCRow
+              }
+            }).returns(cRowType)
+
+        } else {
+          // input needs to be converted and wrapped as CRow or time indicators
```
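On the fast path in this hunk, when no conversion or time indicator is needed, each `(Boolean, Row)` message is only wrapped into a `CRow`: the Boolean is copied into `change` and the payload into `row`. A minimal plain-Scala model of that wrapping is sketched below; `ChangeRow` is a hypothetical stand-in for Flink's `CRow`, not the real class.

```scala
// Hypothetical stand-in for Flink's CRow: a payload plus a change flag
// (in this model, true = upsert, false = delete).
final case class ChangeRow[T](row: T, change: Boolean)

object UpsertWrapping {
  // Mirrors the Scala-tuple fast path: v._1 becomes `change`, v._2 becomes `row`.
  def fromScalaTuple[T](v: (Boolean, T)): ChangeRow[T] =
    ChangeRow(v._2, v._1)

  def main(args: Array[String]): Unit = {
    val messages = Seq((true, "a"), (false, "a"), (true, "b"))
    messages.map(m => fromScalaTuple(m)).foreach(println)
  }
}
```

The hunk itself allocates one `CRow` in `open()` and mutates it for every record to avoid per-record allocation; the immutable case class above gives up that optimisation in favour of a side-effect-free sketch.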
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253468143

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala

```diff
@@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream

 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}

 trait StreamScan extends CommonScan[CRow] with DataStreamRel {

+  protected def convertUpsertToInternalRow(
+      schema: RowSchema,
+      input: DataStream[Any],
+      fieldIdxs: Array[Int],
+      config: TableConfig,
+      rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+    val internalType = schema.typeInfo
+    val cRowType = CRowTypeInfo(internalType)
+
+    val hasTimeIndicator = fieldIdxs.exists(f =>
```

Review comment:
Rename `f` -> `fieldIndex` and `fieldIdxs` -> `fieldIndexes`.
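With the suggested names applied, the check could read as follows. This is a self-contained sketch: the two marker constants are placeholders assumed for illustration and are not taken from Flink's `TimeIndicatorTypeInfo`.

```scala
object TimeIndicatorCheck {
  // Placeholder values standing in for TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER
  // and TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER (assumed, not Flink's values).
  val RowtimeStreamMarker: Int = -1
  val ProctimeStreamMarker: Int = -2

  // Same logic as `hasTimeIndicator` in the hunk, using the names from the review.
  def hasTimeIndicator(fieldIndexes: Array[Int]): Boolean =
    fieldIndexes.exists(fieldIndex =>
      fieldIndex == RowtimeStreamMarker || fieldIndex == ProctimeStreamMarker)

  def main(args: Array[String]): Unit = {
    println(hasTimeIndicator(Array(0, 1, 2)))
    println(hasTimeIndicator(Array(0, ProctimeStreamMarker, 1)))
  }
}
```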
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253463228

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala

```diff
@@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
     converterFunction match {
       case Some(func) =>
-        new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+        new CRowToExternalTypeMapRunner[OUT](func.name, func.code, func.returnType)
```

Review comment:
Please, as a general rule, do not mix simple refactorings like class renames and moving classes around with functional changes. Embedding those two things in one commit makes sense only if the refactoring/rename is tightly coupled with the functional change, and here it doesn't seem to be.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253557485

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala

```diff
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+    fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+    call.transformTo(getNewUpsertToRetraction(calc, upsertToRetraction))
+  }
+
+  private def getNewUpsertToRetraction(
+      calc: FlinkLogicalCalc,
+      upsertToRetraction: FlinkLogicalUpsertToRetraction): FlinkLogicalUpsertToRetraction = {
+
+    val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram)
+
+    val oldKeyNames = upsertToRetraction.keyNames
+    val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+    new FlinkLogicalUpsertToRetraction(
+      upsertToRetraction.getCluster,
+      upsertToRetraction.getTraitSet,
+      newCalc,
+      newKeyNames)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = {
+    calc.getRowType.getFieldNames
+      .flatMap(calc.getInputFromOutputName(calc, _))
```

Review comment:
[newbie Scala question] How does this work? Is `flatMap` here unwrapping the `Option` returned from `getInputFromOutputName`? If the `Option` is empty, will the result list contain a null, will it be empty, or will it throw an exception? (The first two options are acceptable ;) ) [/newbie Scala question]
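The question can be answered with a small experiment: Scala treats an `Option` as a collection of zero or one elements, so `flatMap` with an `Option`-returning function unwraps every `Some` and silently skips every `None`. The result contains no nulls and nothing is thrown. The sketch below uses a hypothetical `inputFromOutputName` lookup in place of `getInputFromOutputName`.

```scala
object FlatMapOptionDemo {
  // Hypothetical lookup standing in for getInputFromOutputName:
  // maps an output field name to its input field name, if there is one.
  private val outputToInput = Map("a" -> "x", "c" -> "z")

  def inputFromOutputName(outputName: String): Option[String] =
    outputToInput.get(outputName)

  // Some(v) results are unwrapped to v; None results are dropped.
  def resolve(outputNames: Seq[String]): Seq[String] =
    outputNames.flatMap(name => inputFromOutputName(name))

  def main(args: Array[String]): Unit = {
    println(resolve(Seq("a", "b", "c"))) // List(x, z)
  }
}
```

Applied to `fieldsRemainAfterCalc`, output fields for which the lookup returns `None` simply do not appear in the flattened list.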
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r252326504

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala

```diff
@@ -0,0 +1,74 @@
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
```

Review comment:
`FlinkLogicalUpsertToRetraction` has a hidden `RexNode` reference in the form of `keyNames`. I think Calcite currently uses `accept()` or `getChildExps()` only for decorrelating the plan. So there is no problem at the moment, because `FlinkLogicalUpsertToRetraction.keyNames` should never have anything to do with correlated subqueries. But my point is that if someone used those methods for something else, like renaming variables or looking up which fields are referenced, it would break very silently and the failure would be very hard to debug. Look at all of the implementations of `org.apache.calcite.rel.RelNode#accept(org.apache.calcite.rex.RexShuttle)`: basically every node that has some input references (or other expressions) implements this method. Unfortunately, you might be right that throwing an exception will not work here :/
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r251924624

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala

```diff
@@ -0,0 +1,74 @@
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
```

Review comment:
I think this comment of mine might be partially invalid. The only optimisation rule that we support around this logical node is `CalcUpsertToRetractionTransposeRule`, which doesn't use this `accept`. But I guess for the sake of consistency it should be implemented one way or another, for example by simply doing `throw new UnsupportedOperationException()`. Otherwise this is a "silent" landmine waiting for someone to step on it. Or am I missing something?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r251922093

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala

```diff
@@ -0,0 +1,74 @@
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate
+    // UpsertToRetractionConverter after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)
```

Review comment:
As far as I can see, `FlinkLogicalNativeTableScan` does the same as what I proposed in the comment above, right? The only difference is `estimateRowSize(child.getRowType)` vs `estimateRowSize(getRowType)`, which are probably equivalent in this case.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249795484

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala

```diff
@@ -0,0 +1,74 @@
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate
+    // UpsertToRetractionConverter after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)
```

Review comment:
Shouldn't you use `RelMetadataQuery#getAverageRowSize()`, or at least duplicate the logic of `FlinkLogicalJoinBase#computeSelfCost`, instead of using `getFieldCount`? Also, isn't the IO cost missing? To be consistent with the join cost, this should return:
```
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(child.getRowType))
```
edit: probably a better idea is to use `estimateRowSize` and maybe, in the future, update it to use `RelMetadataQuery#getAverageRowSize()`.
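To make the difference between the two formulas concrete, the sketch below models both cost triples with plain Scala numbers. `Cost` is a simplified stand-in for Calcite's `RelOptCost`, and `estimatedRowSize` is a made-up byte count rather than the result of Flink's `estimateRowSize`.

```scala
object UpsertToRetractionCostSketch {
  // Simplified stand-in for Calcite's (rowCount, cpu, io) cost triple.
  final case class Cost(rows: Double, cpu: Double, io: Double)

  // Formula currently in the PR: CPU scales with the field count, no IO cost.
  def fieldCountCost(rowCnt: Double, fieldCount: Int): Cost =
    Cost(rowCnt, rowCnt * fieldCount, 0)

  // Formula suggested in the review: IO scales with the estimated row size in bytes.
  def rowSizeCost(rowCnt: Double, estimatedRowSize: Double): Cost =
    Cost(rowCnt, rowCnt, rowCnt * estimatedRowSize)

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: 1000 rows, 3 fields, roughly 28 bytes per row.
    println(fieldCountCost(1000, 3))
    println(rowSizeCost(1000, 28))
  }
}
```

Both formulas shrink when a `Calc` below the node prunes columns; the suggested one expresses that through the IO component, in line with the join cost.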
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249797875

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala

```diff
@@ -0,0 +1,74 @@
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
```

Review comment:
Isn't this class missing a proper implementation of the `AbstractRelNode#accept(org.apache.calcite.rex.RexShuttle)` method? Calcite uses it, for example, to gather the columns "used" by this node. If `FlinkLogicalUpsertToRetraction#accept` does not visit the primary key, I am afraid the primary key might be pruned from the input of `FlinkLogicalUpsertToRetraction`. It is also used for renaming fields etc., so after optimisation your `keyNames: Seq[String]` may no longer be valid. As far as I (vaguely) recall from implementing Temporal Table Joins, storing fields as `String` references was not very useful, and using `RexNode` was a better option. This probably needs more investigation and some additional unit test(s).
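The concern about `keyNames: Seq[String]` going stale can be illustrated with a toy model that does not depend on Calcite: a projection that renames the key field breaks a name-based lookup, while a lookup by input index (the idea behind `RexInputRef`) still finds the field. All names below are hypothetical.

```scala
object KeyReferenceSketch {
  // Hypothetical projection: each output field is (outputName, inputIndex).
  final case class Projection(fields: Seq[(String, Int)]) {
    def outputNames: Seq[String] = fields.map(_._1)
  }

  // Name-based key lookup: fails once the projection renames the key field.
  def keyByName(keyName: String, p: Projection): Option[Int] = {
    val idx = p.outputNames.indexOf(keyName)
    if (idx >= 0) Some(idx) else None
  }

  // Index-based key lookup: follows the input index through the projection.
  def keyByIndex(keyInputIndex: Int, p: Projection): Option[Int] = {
    val idx = p.fields.indexWhere(_._2 == keyInputIndex)
    if (idx >= 0) Some(idx) else None
  }

  def main(args: Array[String]): Unit = {
    // Input schema (id, v); the projection is SELECT id AS k, v.
    val renaming = Projection(Seq("k" -> 0, "v" -> 1))
    println(keyByName("id", renaming)) // None: the name reference is stale
    println(keyByIndex(0, renaming))   // Some(0): the key is still found
  }
}
```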
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249790833

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala

```diff
@@ -0,0 +1,140 @@
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+    streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+      "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        UpsertTableNode(0),
+        term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+          "CAST(rowtime) AS rowtime")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
```

Review comment:
Yes, that was my point. If you drop the first condition, it will also simplify this test.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249788943 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Also ditto :) I think here you shouldn't care whether the field is duplicated or not. So if you decide not to use the `mapInputToOutputName` method, implementing a `CommonCalc#getInputToOutputNamesMapping()` that returns a map or multimap is probably a better option than `Seq[String]`.
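A minimal sketch of the map/multimap idea, assuming a simplified model in which each Calc projection is reduced to the input field index it forwards (if any) plus its output name. `Projection` and this `getInputToOutputNamesMapping` signature are hypothetical illustrations, not an existing Flink or Calcite API; in the real rule the pairs would come from walking `calc.getProgram.getNamedProjects`. A multimap covers the duplicated-field case (`key AS key1, key AS key2`) without any special handling.

```scala
// Hypothetical sketch: Projection is a simplified stand-in for a Calc's named
// projects, not a Flink or Calcite class.
object InputToOutputMappingSketch {

  // forwardedInput is Some(index) when the output field forwards (or renames)
  // an input field, and None when the output field is computed.
  final case class Projection(forwardedInput: Option[Int], outputName: String)

  /** Multimap from each forwarded input field name to all of its output names. */
  def getInputToOutputNamesMapping(
      inputNames: IndexedSeq[String],
      projections: Seq[Projection]): Map[String, Seq[String]] =
    projections
      .collect { case Projection(Some(idx), out) => inputNames(idx) -> out }
      .groupBy(_._1)
      .map { case (in, pairs) => in -> pairs.map(_._2) }

  def main(args: Array[String]): Unit = {
    // SELECT key AS key1, key AS key2, v + 1 AS w FROM T
    val mapping = getInputToOutputNamesMapping(
      IndexedSeq("key", "v"),
      Seq(Projection(Some(0), "key1"), Projection(Some(0), "key2"), Projection(None, "w")))
    println(mapping) // Map(key -> List(key1, key2))
  }
}
```

Computed fields such as `w` simply never enter the map, so a key check reduces to `mapping.contains(keyName)`.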
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249790503 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { +calc.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case r: RexInputRef => (r.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => +a.getOperands.get(0) match { + case ref: RexInputRef => +(ref.getIndex, p.right) + case _ => +(-1, p.right) Review comment: What do you mean? To rephrase my comment: instead of this `match case` construct, I think a better idea might be to implement its logic as a `RexVisitorImpl`, probably as a private static nested class inside `FlinkLogicalCalc` or `CommonCalc`.
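The `RexVisitorImpl` suggestion can be illustrated with a self-contained toy. The expression classes and the `ExprVisitor` base below are hypothetical stand-ins for Calcite's `RexNode` hierarchy and `RexVisitorImpl`, used only to show the overall shape, not Calcite's actual signatures. Because the rename (`AS`) case recurses into its operand, the input-reference handling is written once rather than duplicated across two `match` branches.

```scala
// Hypothetical sketch: toy expression tree and visitor, not Calcite's classes.
object ForwardedInputVisitorSketch {

  sealed trait Expr { def accept[R](visitor: ExprVisitor[R]): R }

  final case class InputRef(index: Int) extends Expr {
    def accept[R](visitor: ExprVisitor[R]): R = visitor.visitInputRef(this)
  }
  final case class Call(op: String, operands: Seq[Expr]) extends Expr {
    def accept[R](visitor: ExprVisitor[R]): R = visitor.visitCall(this)
  }
  final case class Literal(value: Any) extends Expr {
    def accept[R](visitor: ExprVisitor[R]): R = visitor.visitLiteral(this)
  }

  // Base visitor returning a default for every node kind, loosely modelled
  // on the idea behind RexVisitorImpl.
  abstract class ExprVisitor[R](default: R) {
    def visitInputRef(ref: InputRef): R = default
    def visitCall(call: Call): R = default
    def visitLiteral(literal: Literal): R = default
  }

  // Returns Some(index) if the expression forwards or renames an input field.
  object ForwardedInput extends ExprVisitor[Option[Int]](None) {
    override def visitInputRef(ref: InputRef): Option[Int] = Some(ref.index)
    // AS only renames its first operand, so delegate to it and reuse
    // visitInputRef. Any other call computes a new value.
    override def visitCall(call: Call): Option[Int] =
      if (call.op == "AS") call.operands.head.accept(this) else None
  }

  def main(args: Array[String]): Unit = {
    println(InputRef(2).accept(ForwardedInput))                                // Some(2)
    println(Call("AS", Seq(InputRef(0), Literal("k"))).accept(ForwardedInput)) // Some(0)
    println(Call("+", Seq(InputRef(1), Literal(1))).accept(ForwardedInput))    // None
  }
}
```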
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249786791 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Good point about `CommonCalc`. It's probably a better place (btw, doesn't Calcite have some util method for that which works on `Calc`?). Regarding `getInputFromOutputName`: I don't mind which method you add; both `getInputFromOutputName` and `mapInputToOutputNames` would be valuable additions.
However, will `getInputFromOutputName` help you? Don't you need the mapping in the opposite direction? With `calc` following the `UpsertToRetraction` node, you want to convert the key names after transposing those two nodes. So you need to perform an operation like this:
```
val keyNameAfterTransposition = calc.mapInputToOutputName(upsertToRetraction.key)
```
or more precisely:
```
val newKeys = upsertToRetraction.keyNames.map(calc.mapInputToOutputName)
// plus code to checkState that every `Option` is present
```
Right? Secondly:
> A field may contain multi-output. For example, `select key as key1, key as key2 from T`

1. Does it matter? I guess one possibility would be to return the first encountered of those fields.
2. `Seq[String] mapInputToOutputNames(String input)` is also an option. It might be more general. You could even implement:
```
def mapInputToOutputName(input: String): Option[String] =
  mapInputToOutputNames(input).headOption
```
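The key remapping described above might look roughly like this, assuming the Calc's forwarding information is already available as a multimap from input name to output names. `CalcFieldMapping` and `keyNamesAfterTransposition` are hypothetical names used for illustration only. The single-name lookup is derived from the multi-name one via `headOption` (the first encountered output wins), and the remapping returns `None` when any key is not forwarded, in which case the rule should not fire.

```scala
// Hypothetical sketch: these names are illustrative, not an existing Flink API.
object KeyRemappingSketch {

  final class CalcFieldMapping(inputToOutputs: Map[String, Seq[String]]) {
    /** All output names under which `input` is forwarded (empty if dropped or computed). */
    def mapInputToOutputNames(input: String): Seq[String] =
      inputToOutputs.getOrElse(input, Seq.empty)

    /** First output name under which `input` is forwarded, if any. */
    def mapInputToOutputName(input: String): Option[String] =
      mapInputToOutputNames(input).headOption
  }

  /**
   * Key names of UpsertToRetraction after it is moved above the Calc,
   * or None if some key field is not forwarded by the Calc.
   */
  def keyNamesAfterTransposition(
      keyNames: Seq[String],
      calc: CalcFieldMapping): Option[Seq[String]] = {
    val mapped = keyNames.map(calc.mapInputToOutputName)
    if (mapped.forall(_.isDefined)) Some(mapped.flatten) else None
  }

  def main(args: Array[String]): Unit = {
    // SELECT key AS key1, key AS key2, v FROM T
    val calc = new CalcFieldMapping(Map("key" -> Seq("key1", "key2"), "v" -> Seq("v")))
    println(keyNamesAfterTransposition(Seq("key"), calc)) // Some(List(key1))
    println(keyNamesAfterTransposition(Seq("id"), calc))  // None
  }
}
```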
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249726595 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) Review comment: Either rename the method or drop the `// key fields should not be changed` comment. Usually something is wrong if you need to write a comment explaining a method name.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249729097 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields Review comment: ditto
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249729279 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Rename it to `getInputToOutputNamesMapping` and drop the comment? This method seems generic enough that it could be used in places other than this `CalcUpsertToRetractionTransposeRule` rule. What do you think about placing it inside `FlinkLogicalCalc`?
Maybe even refactor it into something like this:
```
class FlinkLogicalCalc:
  /**
   * Returns empty if the field is not forwarded.
   */
  Option[String] mapInputToOutputName(String input)
```
`Seq[Option[String]] mapInputToOutputNames(Seq[String] inputs)` would probably be more efficient, but maybe less readable?
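The efficiency trade-off can be made concrete with a sketch. It assumes projections are plain `(Option[Int], String)` pairs (the input index if the field is forwarded, plus the output name), and `CalcSketch` is a hypothetical class, not `FlinkLogicalCalc`. The per-field variant rescans the projection list for every key, roughly O(k·n) for k keys and n projections. The batched variant builds a lookup table once, roughly O(n + k).

```scala
// Hypothetical sketch: CalcSketch and its projection model are illustrative only.
object BatchedMappingSketch {

  final class CalcSketch(inputNames: IndexedSeq[String], projects: Seq[(Option[Int], String)]) {

    // Per-field variant: rescans the projections on every call.
    def mapInputToOutputName(input: String): Option[String] =
      projects.collectFirst { case (Some(idx), out) if inputNames(idx) == input => out }

    // Batched variant: builds the lookup table once, then answers all inputs.
    def mapInputToOutputNames(inputs: Seq[String]): Seq[Option[String]] = {
      // Reverse before toMap so that, for duplicated inputs, the first
      // projection in the original order wins, matching collectFirst above.
      val firstOutput: Map[String, String] =
        projects.reverse.collect { case (Some(idx), out) => inputNames(idx) -> out }.toMap
      inputs.map(firstOutput.get)
    }
  }

  def main(args: Array[String]): Unit = {
    // SELECT key AS key1, key AS key2, v + 1 AS w FROM T
    val calc = new CalcSketch(
      IndexedSeq("key", "v"),
      Seq(Some(0) -> "key1", Some(0) -> "key2", None -> "w"))
    println(calc.mapInputToOutputNames(Seq("key", "v"))) // List(Some(key1), None)
  }
}
```

For the handful of key fields typical in this rule the difference is probably negligible, so readability may reasonably win.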
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249735236 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Some kind of `map` usually expresses a mapping between two things better than a list of tuples does. Is there a reason I'm missing for choosing a list of tuples?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249728531 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc Review comment: drop this comment?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249735123 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { +calc.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case r: RexInputRef => (r.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => +a.getOperands.get(0) match { + case ref: RexInputRef => +(ref.getIndex, p.right) + case _ => +(-1, p.right) Review comment: Why do you create `(Int, String)` tuples and later filter them out based on the integer, instead of, for example, inserting only the matching entries into an array or a map? Another remark: I think implementing a `RexVisitorImpl` (like `InputRefVisitor`) might be a better, more reusable approach. For example, it would solve the problem of duplicating the code for `case r: RexInputRef`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
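Both suggestions in the comment above can be sketched without Calcite. The snippet below is a minimal, hypothetical model. `Expr`, `InputRef`, `As` and `Other` stand in for `RexInputRef`, an `AS` `RexCall` and any other expression. They are not Calcite classes. A single recursive resolver plays the role a `RexVisitorImpl` would. It unwraps `AS` and returns an `Option`, so there is no `-1` sentinel and the input-reference case is written only once.

```scala
// Hypothetical stand-ins for Calcite's RexNode hierarchy; these are not Calcite classes.
sealed trait Expr
final case class InputRef(index: Int) extends Expr
final case class As(operand: Expr, alias: String) extends Expr
final case class Other(description: String) extends Expr

object ForwardedFields {

  /** Returns the index of the input field that `expression` forwards unchanged, if any. */
  def forwardedInputIndex(expression: Expr): Option[Int] = expression match {
    case InputRef(index) => Some(index)
    // A rename forwards whatever its operand forwards, so the InputRef case is written once.
    case As(operand, _) => forwardedInputIndex(operand)
    case Other(_) => None
  }

  /**
   * Maps each forwarded input field name to its output field name.
   * Non-forwarding projections are simply skipped; no -1 sentinel is needed.
   * If one input field is projected twice, the later projection wins.
   */
  def inputToOutputNames(
      inputNames: IndexedSeq[String],
      namedProjects: Seq[(Expr, String)]): Map[String, String] =
    namedProjects.flatMap { case (projectedExpression, outputName) =>
      forwardedInputIndex(projectedExpression).map(inputIndex => inputNames(inputIndex) -> outputName)
    }.toMap
}
```

For example, with inputs `a, b, c` and projections `a`, `b AS key` and `c + 1 AS c1`, only `a -> a` and `b -> key` end up in the map.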
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249729068 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
*/ class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names Review comment: Drop this comment; it duplicates the method name.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249731317 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
*/ class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { Review comment: Please do not use single-letter or abbreviated variable names (`io`, `p`, `r`, `a`, `ref`); they make the code cryptic. Also, it would help to name what `p.left` and `p.right` mean by assigning those values to named local variables.
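The naming suggestion above can be applied to the key-renaming step (`getNamesAfterCalc` in the quoted rule). The sketch below shows one way that might look. `NamedProject` is a hypothetical stand-in for a Calcite `Pair` whose `left` is an already-resolved input index and whose `right` is the output field name. The sketch assumes non-forwarding projections were filtered out earlier. `KeyRenaming` and `keyNamesAfterCalc` are illustrative names, not code from the PR.

```scala
// Hypothetical stand-in for a Calcite Pair of (forwarded input index, output field name).
// Assumes non-forwarding projections were already filtered out.
final case class NamedProject(left: Int, right: String)

object KeyRenaming {

  /**
   * Translates key field names of the Calc input into the Calc's output names.
   * Returns None if some key field is not forwarded by the Calc.
   */
  def keyNamesAfterCalc(
      keyNames: Seq[String],
      inputFieldNames: IndexedSeq[String],
      namedProjects: Seq[NamedProject]): Option[Seq[String]] = {
    val outputNameByInputName: Map[String, String] =
      namedProjects.map { namedProject =>
        // Give `left` and `right` names that say what they mean.
        val forwardedInputIndex = namedProject.left
        val outputFieldName = namedProject.right
        inputFieldNames(forwardedInputIndex) -> outputFieldName
      }.toMap
    val renamedKeyNames = keyNames.flatMap(keyName => outputNameByInputName.get(keyName))
    if (renamedKeyNames.size == keyNames.size) Some(renamedKeyNames) else None
  }
}
```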
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249728818 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
*/ class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction Review comment: Extract to method?
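One way to act on this suggestion is to move the construction of the new nodes into a helper, so that `onMatch` only reads the two operands and calls `transformTo`. The sketch below models the nodes as hypothetical immutable case classes (`Scan`, `Calc` and `UpsertToRetraction` are not Flink's `FlinkLogical*` classes). It shows the transposition as a pure function.

```scala
// Hypothetical, simplified plan nodes; not Flink's FlinkLogical* classes.
sealed trait PlanNode
final case class Scan(table: String) extends PlanNode
// `renames` maps each kept input field name to its output name; unlisted fields are dropped.
final case class Calc(renames: Map[String, String], input: PlanNode) extends PlanNode
final case class UpsertToRetraction(keyNames: Seq[String], input: PlanNode) extends PlanNode

object CalcTranspose {

  /**
   * Builds the transposed plan: the Calc moves below UpsertToRetraction and the
   * key names are translated through the Calc's renames. Assumes the rule's
   * `matches` already verified that every key field is kept by the Calc.
   */
  def transposed(calc: Calc, upsertToRetraction: UpsertToRetraction): UpsertToRetraction = {
    val pushedDownCalc = calc.copy(input = upsertToRetraction.input)
    val renamedKeyNames = upsertToRetraction.keyNames.map(keyName => calc.renames(keyName))
    UpsertToRetraction(renamedKeyNames, pushedDownCalc)
  }
}
```

With a helper like this, the body of `onMatch` could shrink to roughly `call.transformTo(transposed(calc, upsertToRetraction))`, although the real rule would also have to pass the cluster and trait set.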
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249726595 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
*/ class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) Review comment: Either rename the method or drop the `// key fields should not be changed` comment. Something is wrong if you need a comment to explain a method name.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249727908 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
*/ class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && Review comment: However, I see one more issue here. Does this condition make sense? Shouldn't this be decided by the cost-based optimiser? Why do we need this heuristic? What if `calc` adds column(s) while also being a very selective filter? IMO this condition should be dropped completely, and only conditions that guard correctness should be checked here. Whether `calc` should or shouldn't be pushed down through `UpsertToRetraction` should be decided by the cost of the plan. The cost of `UpsertToRetraction` should reflect both the number of rows and the size of the rows. Is this happening?
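The concern can be made concrete with a toy cost model. The model assumes that the state kept by `UpsertToRetraction` is proportional to row count times row width. The `ToyCostModel` object, its methods and the numbers below are illustrative assumptions, not Flink's actual cost model. The sketch also only compares cost. Whether a given Calc may be pushed down at all is a separate correctness question.

```scala
object ToyCostModel {

  /** Hypothetical state cost of UpsertToRetraction: rows kept times bytes per row. */
  def stateCost(rowCount: Double, bytesPerRow: Double): Double = rowCount * bytesPerRow

  /** The field-count heuristic from `matches`: push down only if the Calc adds no columns. */
  def heuristicAllowsPushDown(calcFieldCount: Int, inputFieldCount: Int): Boolean =
    calcFieldCount <= inputFieldCount

  /** Cost-based alternative: push down whenever UpsertToRetraction's state gets smaller. */
  def pushDownIsCheaper(
      rowCount: Double,
      bytesPerRow: Double,
      calcSelectivity: Double,
      bytesPerRowAfterCalc: Double): Boolean =
    stateCost(rowCount * calcSelectivity, bytesPerRowAfterCalc) < stateCost(rowCount, bytesPerRow)
}
```

Take 1,000,000 rows of 40 bytes: without the push-down the state cost is 40,000,000. Now suppose a Calc keeps 10% of the rows and adds an 8-byte column, giving 5 fields instead of 4 and 48 bytes per row. Pushed down, the cost is 100,000 × 48 = 4,800,000, roughly eight times less. Yet `heuristicAllowsPushDown(5, 4)` rejects it.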
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249725951 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.flink.table.api.stream.sql import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala._ import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.apache.flink.types.Row import org.junit.Test import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import java.lang.{Boolean => JBool} class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { Review comment: Generally speaking, the naming of these tests is not perfect (I'm really struggling to understand what they are doing), but first I wanted to understand what you are testing for. `testCalcTransposeUpsertToRetraction` is a subset of `testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose`. They both test exactly the same code paths/branches, don't they? On the other hand, you are not testing the condition:
```
calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
```
The last test only checks:
```
// key fields should not be changed
fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
```
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249728417 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
*/ class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down Review comment: Same here. Instead of writing a comment that explains `calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount`, encapsulate this logic in a simple method whose name explains it. Ditto in other places: some comments do not explain anything, and others should be replaced by extracted method names.
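Following this suggestion, `matches` could read as a conjunction of self-describing predicates, which makes both inline comments redundant. The sketch uses a hypothetical `RowShape` (field names only) instead of Calcite row types. The predicate names are illustrative only.

```scala
// Hypothetical simplified view of a node's output row type (field names only).
final case class RowShape(fieldNames: Seq[String])

object TransposeConditions {

  def calcDoesNotAddFields(calcOutput: RowShape, upsertToRetractionOutput: RowShape): Boolean =
    calcOutput.fieldNames.size <= upsertToRetractionOutput.fieldNames.size

  def calcForwardsAllKeyFields(keyNames: Seq[String], forwardedInputNames: Set[String]): Boolean =
    keyNames.forall(keyName => forwardedInputNames.contains(keyName))

  // Reads like the intent, so neither of the original inline comments is needed.
  def matches(
      calcOutput: RowShape,
      upsertToRetractionOutput: RowShape,
      keyNames: Seq[String],
      forwardedInputNames: Set[String]): Boolean =
    calcDoesNotAddFields(calcOutput, upsertToRetractionOutput) &&
      calcForwardsAllKeyFields(keyNames, forwardedInputNames)
}
```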
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249056854 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { +val input = upsertToRetraction.getInput.accept(this) +val relTypes = input.getRowType.getFieldList.map(_.getType) +val timeIndicatorIndexes = relTypes.zipWithIndex + .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) + .map(_._2).toSet + +val rewrittenInput = input.copy(input.getTraitSet, input.getInputs) Review comment: `input` is already the rewritten input (it is the result of `accept(this)`). `val rewrittenInput = input.copy(input.getTraitSet, input.getInputs)` looks like a no-op to me: 1. rename `input` -> `rewrittenInput` 2. drop the copy?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249065571 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala ## @@ -183,6 +183,17 @@ object UpdatingPlanChecker { lJoinKeys.zip(rJoinKeys) ) } + +case l: DataStreamUpsertToRetraction => + val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex +.filter(e => l.keyIndexes.contains(e._2)) +.map(_._1) + Some(uniqueKeyNames.map(e => (e, e))) + +case scan: UpsertStreamScan => Review comment: Let's leave it as it is for now, but we could implement our own `DataStreamRelShuttle` (the equivalent of `RelShuttle`) that would work on all of our `DataStream*` nodes, by adding appropriate `visit(DataStreamRelShuttle)` methods to all of our `DataStream*` nodes. I already wanted to write something like this some time ago to solve another problem.
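Here is a minimal sketch of what such a `DataStreamRelShuttle` could look like, assuming a simplified node hierarchy. `DataStreamNode`, `UpsertScan`, `UpsertToRetractionNode`, `CalcNode` and `UniqueKeyExtractor` are hypothetical stand-ins, and the key logic only loosely mirrors the quoted `UpdatingPlanChecker` cases. Each node gets a dispatch method that calls the matching `visit` overload. It is named `accept` here, following Calcite's `RelNode.accept(RelShuttle)` convention. With this in place, a new analysis no longer needs a pattern match over every node type.

```scala
// Hypothetical, simplified DataStream node hierarchy; not Flink's DataStream* classes.
sealed trait DataStreamNode {
  def accept[R](shuttle: DataStreamRelShuttle[R]): R
}

final case class UpsertScan(fieldNames: Seq[String], keyIndexes: Set[Int]) extends DataStreamNode {
  def accept[R](shuttle: DataStreamRelShuttle[R]): R = shuttle.visit(this)
}

final case class UpsertToRetractionNode(
    input: DataStreamNode,
    fieldNames: Seq[String],
    keyIndexes: Set[Int]) extends DataStreamNode {
  def accept[R](shuttle: DataStreamRelShuttle[R]): R = shuttle.visit(this)
}

// A Calc that keeps only the listed field names (no renames, for brevity).
final case class CalcNode(input: DataStreamNode, keptFieldNames: Seq[String]) extends DataStreamNode {
  def accept[R](shuttle: DataStreamRelShuttle[R]): R = shuttle.visit(this)
}

// One visit overload per node type; `accept` picks the right one statically.
trait DataStreamRelShuttle[R] {
  def visit(scan: UpsertScan): R
  def visit(upsertToRetraction: UpsertToRetractionNode): R
  def visit(calc: CalcNode): R
}

// Derives unique key names, loosely mirroring the quoted UpdatingPlanChecker cases.
object UniqueKeyExtractor extends DataStreamRelShuttle[Option[Seq[String]]] {

  private def keyNames(fieldNames: Seq[String], keyIndexes: Set[Int]): Option[Seq[String]] =
    Some(fieldNames.zipWithIndex.collect {
      case (fieldName, index) if keyIndexes.contains(index) => fieldName
    })

  def visit(scan: UpsertScan): Option[Seq[String]] =
    keyNames(scan.fieldNames, scan.keyIndexes)

  def visit(upsertToRetraction: UpsertToRetractionNode): Option[Seq[String]] =
    keyNames(upsertToRetraction.fieldNames, upsertToRetraction.keyIndexes)

  // The keys survive a Calc only if the Calc keeps every key field.
  def visit(calc: CalcNode): Option[Seq[String]] =
    calc.input.accept(this).filter(keys => keys.forall(key => calc.keptFieldNames.contains(key)))
}
```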
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r24905 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/UpsertStreamScan.scala ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex.RexNode +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.expressions.Cast +import org.apache.flink.table.plan.schema.{RowSchema, UpsertStreamTable} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo + +/** + * Flink RelNode which matches along with DataStreamSource. Different from [[AppendStreamScan]], + * [[UpsertStreamScan]] is used to handle upsert streams from source. 
*/ class UpsertStreamScan( Review comment: Probably - as you prefer. (I think it looks good now)
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249072493 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.flink.table.api.stream.sql import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala._ import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.apache.flink.types.Row import org.junit.Test import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import java.lang.{Boolean => JBool} class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { Review comment: But why is it possible to push down the calc in one test and not in the other? Also, it's not very clear to me how/why those two tests (and their results) differ from `testCalcTransposeUpsertToRetraction`.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249056249 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { Review comment: Generally speaking, I would prefer this to happen in this PR (it could be implemented as the first commit of this PR), since otherwise we are agreeing to merge instant technical debt.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249057349 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { +val input = upsertToRetraction.getInput.accept(this) +val relTypes = input.getRowType.getFieldList.map(_.getType) +val timeIndicatorIndexes = relTypes.zipWithIndex + .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) + .map(_._2).toSet + +val rewrittenInput = input.copy(input.getTraitSet, input.getInputs) +val newInput = + materializerUtils.projectAndMaterializeFields(rewrittenInput, timeIndicatorIndexes) Review comment: You could extract lines 222 and 223 into:
```
val materializedInput = materializerUtils.projectAndMaterializeAllFields(rewrittenInput)
```
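The suggested `projectAndMaterializeAllFields` can be pictured with a deliberately simplified model in which a row type is just a list of field kinds. Everything below (`FieldKind`, `MaterializerSketch` and its methods) is hypothetical and is not Flink's `materializerUtils` API; it only shows that an all-fields helper can compute the time-indicator indexes itself, so the `visit` method would no longer repeat the `zipWithIndex`/`filter`/`map` chain.

```scala
// Toy model: a row type is a sequence of field kinds (hypothetical, not Flink's API).
sealed trait FieldKind
case object TimeIndicator extends FieldKind // an unmaterialized rowtime/proctime field
case object RegularField extends FieldKind  // any other (or already materialized) field

object MaterializerSketch {
  // Indexes of all fields that are still time indicators.
  def timeIndicatorIndexes(fields: Seq[FieldKind]): Set[Int] =
    fields.zipWithIndex.collect { case (TimeIndicator, index) => index }.toSet

  // Materializes only the fields at the given indexes.
  def projectAndMaterializeFields(fields: Seq[FieldKind], indexes: Set[Int]): Seq[FieldKind] =
    fields.zipWithIndex.map { case (kind, index) =>
      if (indexes.contains(index)) RegularField else kind
    }

  // Hypothetical all-fields variant: callers no longer compute the indexes themselves.
  def projectAndMaterializeAllFields(fields: Seq[FieldKind]): Seq[FieldKind] =
    projectAndMaterializeFields(fields, timeIndicatorIndexes(fields))
}
```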
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247926320 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { Review comment: I think this code is a bit incorrect. First, you didn't recursively call the `RelTimeIndicatorConverter` on the `upsertToRetraction` input:
```
val rewrittenInput = upsertToRetraction.getInput.accept(this)
```
Secondly, we should solve this in a more generic way. Not only do `upsertToRetraction` nodes need to materialize their output; basically every node that produces updates/retractions needs to do that. I would propose introducing common logic for this, maybe as a separate `RelShuttle` that we would execute at the end of planning (probably by checking the `DataStreamRel#producesUpdates` flag). What do you think? Also, this requires some test coverage (that `rowtime` was materialized).
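The two suggestions in this comment, recursing into the input first and handling materialization in one generic pass keyed on `producesUpdates`, can be sketched on a toy plan tree. The node types and `MaterializeUpdatesShuttle` are hypothetical stand-ins for Calcite `RelNode`s, a `RelShuttle` and `DataStreamRel#producesUpdates`; a real implementation would insert a materializing projection rather than a `Materialize` marker.

```scala
// Toy plan tree; hypothetical stand-ins for Calcite RelNodes / DataStreamRel.
sealed trait PlanNode {
  // Mirrors the idea of DataStreamRel#producesUpdates.
  def producesUpdates: Boolean = false
}
final case class Scan(fields: Seq[String]) extends PlanNode
final case class Filter(input: PlanNode) extends PlanNode
final case class UpsertToRetraction(input: PlanNode) extends PlanNode {
  override def producesUpdates: Boolean = true
}
// Marker meaning "time indicators in the child's output are materialized here".
final case class Materialize(input: PlanNode) extends PlanNode

object MaterializeUpdatesShuttle {
  // Post-planning pass: rewrite the inputs first (the recursive step), then wrap
  // every node that produces updates/retractions in a Materialize node.
  def rewrite(node: PlanNode): PlanNode = {
    val withRewrittenInputs = node match {
      case Filter(input)             => Filter(rewrite(input))
      case UpsertToRetraction(input) => UpsertToRetraction(rewrite(input))
      // Already materialized subtrees are left as is, which keeps the pass idempotent.
      case alreadyMaterialized: Materialize => alreadyMaterialized
      case scan: Scan                => scan
    }
    if (withRewrittenInputs.producesUpdates) Materialize(withRewrittenInputs)
    else withRewrittenInputs
  }
}
```

Forgetting the recursive `rewrite(input)` calls would leave the inner `UpsertToRetraction` unwrapped, which is the toy equivalent of the missing `accept(this)` on the input.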
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247962668 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala ## @@ -95,4 +95,41 @@ object StreamTestData { data.+=(((3, 3), "three")) env.fromCollection(data) } + + def getSmall3TupleUpsertStream(env: StreamExecutionEnvironment): + DataStream[(Boolean, (Int, Long, String))] = { +val data = new mutable.MutableList[(Boolean, (Int, Long, String))] +data.+=((true, (1, 1L, "Hi"))) +data.+=((true, (2, 2L, "Hello"))) +data.+=((true, (3, 2L, "Hello world"))) +env.fromCollection(data) + } + + def get3TupleUpsertStream(env: StreamExecutionEnvironment): Review comment: Is this method used anywhere?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247961118 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT b as b1, c, proctime as proctime1, rowtime as rowtime1 FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +unaryNode( + "DataStreamUpsertToRetraction", + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ), + term("keys", "b"), + term("select", "a", "b", "c", "proctime", "rowtime") +), +term("select", "b AS b1", "c", "proctime AS proctime1", "rowtime AS rowtime1")) +streamUtil.verifySql(sql, expected, true) + } + + @Test + def 
testCalcCannotTransposeUpsertToRetraction() = { Review comment: What does this test case test?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247952668 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/UpsertStreamScan.scala ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex.RexNode +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.expressions.Cast +import org.apache.flink.table.plan.schema.{RowSchema, UpsertStreamTable} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo + +/** + * Flink RelNode which matches along with DataStreamSource. Different from [[AppendStreamScan]], + * [[UpsertStreamScan]] is used to handle upsert streams from source. 
+ */ +class UpsertStreamScan( Review comment: This class duplicates code from `AppendStreamScan`. Either we should introduce a common base class, or maybe `UpsertStreamScan` could extend `AppendStreamScan`?
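One way to read the common-base-class option is sketched below with heavily simplified, hypothetical classes (the real nodes extend Calcite's `TableScan` and implement `DataStreamRel`, and their shared logic is far larger): shared behavior, here just rendering an explain string, lives once in the base class, and each scan overrides only what actually differs.

```scala
// Hypothetical, simplified stand-ins for AppendStreamScan / UpsertStreamScan.
abstract class StreamScanSketch(val fieldNames: Seq[String]) {
  protected def nodeName: String
  // Extension point for the parts that differ between the scans.
  protected def extraTerms: Seq[String] = Seq.empty

  // Shared logic, written exactly once.
  def explain: String =
    (s"fields: ${fieldNames.mkString(", ")}" +: extraTerms).mkString(s"$nodeName(", "; ", ")")
}

final class AppendStreamScanSketch(fields: Seq[String]) extends StreamScanSketch(fields) {
  override protected def nodeName: String = "AppendStreamScan"
}

final class UpsertStreamScanSketch(fields: Seq[String], val keyNames: Seq[String])
    extends StreamScanSketch(fields) {
  override protected def nodeName: String = "UpsertStreamScan"
  override protected def extraTerms: Seq[String] = Seq(s"keys: ${keyNames.mkString(", ")}")
}
```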
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247954067 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala ## @@ -183,6 +183,17 @@ object UpdatingPlanChecker { lJoinKeys.zip(rJoinKeys) ) } + +case l: DataStreamUpsertToRetraction => Review comment: Please avoid one-letter variable names (`l`, `e`, etc.).
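As an illustration of the naming point, the `keyIndexes` computation used elsewhere in this PR (`zipWithIndex.filter(e => keyNames.contains(e._1)).map(_._2)`) can be written with a pattern-matching `collect` so that every value has a descriptive name. This is a standalone sketch: `fieldNames` stands in for `getRowType.getFieldNames`, and `KeyIndexes` is a hypothetical wrapper object.

```scala
object KeyIndexes {
  // Positions of the key fields within the row, named explicitly
  // instead of `e`, `e._1` and `_._2`.
  def keyIndexes(fieldNames: Seq[String], keyNames: Seq[String]): Array[Int] =
    fieldNames.zipWithIndex
      .collect { case (fieldName, fieldIndex) if keyNames.contains(fieldName) => fieldIndex }
      .toArray
}
```

The same applies to pattern variables such as `case l: DataStreamUpsertToRetraction =>`, which reads better as `case upsertToRetraction: DataStreamUpsertToRetraction =>`.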
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247934064 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. 
+ */ +class DataStreamUpsertToRetraction( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamUpsertToRetraction( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"UpsertToRetractionConverter(${ + if (keyNames.nonEmpty) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) + +val needRetraction = DataStreamRetractionRules.isAccRetract(this) + +if (!needRetraction) { Review comment: Just `checkState(DataStreamRetractionRules.isAccRetract(this))`?
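The `checkState` suggestion replaces the hand-written `if (!needRetraction) { throw ... }` block with a one-line precondition. To keep the sketch self-contained it uses a local stand-in, assumed to mirror `org.apache.flink.util.Preconditions.checkState(boolean, Object)`; note that such a precondition throws an `IllegalStateException` rather than a `TableException`, which seems fitting for a condition that can only fail because of a planner bug. `translateSketch` is a hypothetical placeholder for `translateToPlan`.

```scala
object PreconditionSketch {
  // Local stand-in, assumed to behave like Flink's
  // org.apache.flink.util.Preconditions.checkState(boolean, Object):
  // throws IllegalStateException with the given message if the condition is false.
  def checkState(condition: Boolean, errorMessage: String): Unit =
    if (!condition) throw new IllegalStateException(errorMessage)

  // Hypothetical placeholder for translateToPlan: the retraction-mode check
  // collapses into a single precondition call.
  def translateSketch(isAccRetract: Boolean): String = {
    checkState(isAccRetract, "DataStreamUpsertToRetraction should always generate retractions")
    "translated"
  }
}
```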
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247960761 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { Review comment: Why do the results of `testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose` and `testCalcCannotTransposeUpsertToRetraction` differ? I would expect all of them to have the same basic plan structure (the same plan tree, modulo the returned columns).
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247962174 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala ## @@ -46,6 +46,20 @@ class TableSinkValidationTest extends TableTestBase { env.execute() } + @Test(expected = classOf[TableException]) + def testAppendSinkOnUpdatingTable2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) + +val t = tEnv.fromUpsertStream( + StreamTestData.getSmall3TupleUpsertStream(env), 'id.key, 'num, 'text) + +t.writeToSink(new TestAppendSink) + +// must fail because table is not append-only +env.execute() Review comment: Isn't `tEnv.explain(t)` enough? Do we need a full `ITCase` for that?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247926320 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize) } + def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = { Review comment: I think this code is incorrect. First, you didn't recursively call the `RelTimeIndicatorConverter` on the `upsertToRetraction` input:
```
val rewrittenInput = upsertToRetraction.getInput.accept(this)
```
Secondly, `LogicalUpsertToRetraction` doesn't have to materialize any fields: it doesn't invalidate the watermark guarantees (all records after the watermark have a rowtime value above the watermark). In other words, it preserves the rowtime field/watermark guarantees of its input. A rowtime field must be materialized if this contract between rowtime and watermark is violated, for example in non-windowed joins (rowtime fields in the join result can be older than the watermark). That's not the case for `LogicalUpsertToRetraction`, is it? If I'm correct (and please correct me if I'm wrong), the code should look just like this:
```
def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
  val rewrittenInput = upsertToRetraction.getInput.accept(this)
  upsertToRetraction.copy(upsertToRetraction.getTraitSet, Seq(rewrittenInput))
}
```
I think the second issue would be caught by a test that uses a time-windowed join/aggregation on the upsert source (this should work, but with the current version of this PR I would expect it to fail):
```
SELECT key, max(value)
FROM UpsertTable
GROUP BY TUMBLE(rowTime1, INTERVAL '1' DAY), key
```
The first issue is probably difficult to test/trigger, since at the moment when `RelTimeIndicatorConverter` is executed, `LogicalUpsertToRetraction` always comes directly after the `TableScan` node. However, if we changed the order of `CalcUpsertToRetractionTransposeRule` and `RelTimeIndicatorConverter`, the current code would fail to materialize fields for queries like:
```
SELECT key, max(value)
FROM (
  SELECT rowTime1 + 1 as rowTime2, key, value
  FROM UpsertTable)
GROUP BY TUMBLE(rowTime2, INTERVAL '1' DAY), key
```
Even if I'm wrong, could you add those two unit tests?
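The rowtime/watermark contract described in this comment can be made concrete with a small checker over a toy stream of records and watermarks. The types and `RowtimeContract.holds` are hypothetical; the check encodes the stated rule that every record following a watermark must carry a rowtime above it, so an operator whose output can violate it (such as a non-windowed join emitting an older rowtime) would need its rowtime fields materialized.

```scala
// Hypothetical toy stream: records carry a rowtime, watermarks carry a timestamp.
sealed trait StreamElement
final case class TimedRecord(rowtime: Long) extends StreamElement
final case class Watermark(timestamp: Long) extends StreamElement

object RowtimeContract {
  // True if every record has a rowtime above the highest watermark seen before it.
  def holds(stream: Seq[StreamElement]): Boolean = {
    val (_, contractHolds) = stream.foldLeft((Long.MinValue, true)) {
      case ((currentWatermark, holdsSoFar), Watermark(timestamp)) =>
        (math.max(currentWatermark, timestamp), holdsSoFar)
      case ((currentWatermark, holdsSoFar), TimedRecord(rowtime)) =>
        (currentWatermark, holdsSoFar && rowtime > currentWatermark)
    }
    contractHolds
  }
}
```

For example, `Seq(TimedRecord(6L), Watermark(5L), TimedRecord(3L))` models a join result that emits an old rowtime after the watermark advanced, and `holds` returns `false` for it.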
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247935469 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. 
+ */ +class DataStreamUpsertToRetraction( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamUpsertToRetraction( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"UpsertToRetractionConverter(${ + if (keyNames.nonEmpty) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) Review comment: Nitty nit: you do not need those variables in this commit, do you?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247933734 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. 
+ */ +class DataStreamUpsertToRetraction( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamUpsertToRetraction( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"UpsertToRetractionConverter(${ + if (keyNames.nonEmpty) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) + +val needRetraction = DataStreamRetractionRules.isAccRetract(this) + +if (!needRetraction) { + throw new TableException("DataStreamUpsertToRetraction should always generate retractions, " + +"this should be a bug!") +} + +// todo: return inputDS for plan test. The detailed code will be ready in the next commit Review comment: `throw new NotImplementedError()`
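A minimal sketch of the suggested placeholder, using a hypothetical `translateToPlanSketch` rather than the real `translateToPlan` signature: until the follow-up commit provides the translation, throwing `scala.NotImplementedError` (the same error Scala's `???` throws) fails loudly instead of silently returning the unconverted input stream.

```scala
object TranslationStub {
  // Hypothetical placeholder for translateToPlan in an intermediate commit.
  // Failing loudly is safer than returning the unconverted input.
  def translateToPlanSketch(input: Seq[Int]): Seq[Int] =
    throw new NotImplementedError(
      "UpsertToRetraction translation is implemented in a follow-up commit")
}
```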
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247935469 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. 
+ */ +class DataStreamUpsertToRetraction( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamUpsertToRetraction( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"UpsertToRetractionConverter(${ + if (keyNames.nonEmpty) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) Review comment: nitty nit: you do not need those variables in this commit
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247904188 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -808,8 +910,10 @@ abstract class StreamTableEnvironment( val convSubQueryPlan = optimizeConvertSubQueries(relNode) val expandedPlan = optimizeExpandPlan(convSubQueryPlan) val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan) +val upsertToRetractPlan = Review comment: `planWithConvertedUpsertToRetraction`?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247954694 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala ## @@ -183,6 +183,17 @@ object UpdatingPlanChecker { lJoinKeys.zip(rJoinKeys) ) } + +case l: DataStreamUpsertToRetraction => + val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex +.filter(e => l.keyIndexes.contains(e._2)) +.map(_._1) + Some(uniqueKeyNames.map(e => (e, e))) + +case scan: UpsertStreamScan => Review comment: This big `match`/`case` looks very strange... Why isn't it solved in a more object-oriented way, for example via a visitor pattern?
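To make the visitor suggestion concrete, here is a minimal sketch of how unique-key derivation could live in a dedicated visitor instead of one large `match`. All classes (`Node`, `NodeVisitor`, `UpsertNode`, `CalcNode`, `SourceNode`, `UniqueKeyVisitor`) are hypothetical simplifications, not Flink or Calcite types; `UpsertNode` plays the role of `DataStreamUpsertToRetraction` and returns the same `(name, name)` pairs as the diff.

```scala
// Hypothetical, heavily simplified plan nodes; NOT the Flink/Calcite classes.
sealed trait Node {
  def fieldNames: Seq[String]
  def accept[R](v: NodeVisitor[R]): R
}

// One visit overload per node type; each analysis becomes its own visitor.
trait NodeVisitor[R] {
  def visit(n: UpsertNode): R
  def visit(n: CalcNode): R
  def visit(n: SourceNode): R
}

final case class SourceNode(fieldNames: Seq[String]) extends Node {
  def accept[R](v: NodeVisitor[R]): R = v.visit(this)
}

// Plays the role of DataStreamUpsertToRetraction: knows its key field indexes.
final case class UpsertNode(input: Node, keyIndexes: Seq[Int]) extends Node {
  def fieldNames: Seq[String] = input.fieldNames
  def accept[R](v: NodeVisitor[R]): R = v.visit(this)
}

// A projection keeping only the listed input fields (renaming is ignored here).
final case class CalcNode(input: Node, kept: Seq[String]) extends Node {
  def fieldNames: Seq[String] = kept
  def accept[R](v: NodeVisitor[R]): R = v.visit(this)
}

// Unique-key derivation lives in one visitor instead of one large match.
object UniqueKeyVisitor extends NodeVisitor[Option[Seq[(String, String)]]] {
  def visit(n: UpsertNode): Option[Seq[(String, String)]] = {
    val keyNames = n.fieldNames.zipWithIndex.collect {
      case (name, i) if n.keyIndexes.contains(i) => name
    }
    Some(keyNames.map(k => (k, k)))
  }

  // Keys survive the projection only if every key field is still present.
  def visit(n: CalcNode): Option[Seq[(String, String)]] =
    n.input.accept(this).filter(_.forall { case (k, _) => n.kept.contains(k) })

  // A plain source has no known unique key.
  def visit(n: SourceNode): Option[Seq[(String, String)]] = None
}
```

One design benefit: adding a node type means adding a `visit` overload, which every visitor must then implement before the code compiles, whereas a large `match` with a default `case _` silently absorbs the new type.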
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247955886 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -170,6 +170,19 @@ class StreamTableEnvironmentTest extends TableTestBase { jTEnv.fromAppendStream(ds, "rt.rowtime, b, c, d, e, pt.proctime") } + @Test + def testAddTableFromUpsert(): Unit = { Review comment: What does this test check/assert?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247957706 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase { (jTEnv, ds) } + private def prepareKeyedSchemaExpressionParser: +(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]) = { + +val jStreamExecEnv = mock(classOf[JStreamExecEnv]) Review comment: Please do not use Mockito here; implement a proper mock or use the actual implementation. There are already at least a couple of dummy mocks for this class in our code base. You could deduplicate them and put one `DummyStreamExecutionEnvironment` into the test jar of `flink-streaming-java`, or something along those lines.
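A hand-written, reusable dummy along the lines the reviewer suggests might look like the sketch below. Since Flink is not on the classpath here, `TimeMode`, `ExecEnv` and `DummyExecEnv` are hypothetical stand-ins that model only what this test touches (a time-characteristic getter/setter and a job-execution method); they are not Flink's `TimeCharacteristic` or `StreamExecutionEnvironment`.

```scala
// Hypothetical stand-ins for Flink's TimeCharacteristic and
// StreamExecutionEnvironment; only what this test needs is modelled.
sealed trait TimeMode
case object ProcessingTimeMode extends TimeMode
case object EventTimeMode extends TimeMode

abstract class ExecEnv {
  private var timeMode: TimeMode = ProcessingTimeMode
  def setTimeMode(mode: TimeMode): Unit = timeMode = mode
  def getTimeMode: TimeMode = timeMode
  // Left abstract, so any test double has to decide what running a job means.
  def execute(jobName: String): Unit
}

// Shared, hand-written dummy: real state handling, but it refuses to run jobs.
class DummyExecEnv extends ExecEnv {
  override def execute(jobName: String): Unit =
    throw new UnsupportedOperationException("DummyExecEnv cannot execute jobs")
}
```

Compared with a Mockito mock, every method has real behaviour, so nothing silently returns a default value the way an unstubbed mock method does, and the test sets the time characteristic through the normal setter instead of `when(...).thenReturn(...)`.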
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247958035 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala ## @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase { (jTEnv, ds) } + private def prepareKeyedSchemaExpressionParser: +(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]) = { + +val jStreamExecEnv = mock(classOf[JStreamExecEnv]) + when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime) +val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv) + +val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG) + .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]] +val dsType = new TupleTypeInfo(Types.BOOLEAN, sType) + .asInstanceOf[TupleTypeInfo[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]] +val ds = mock(classOf[DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, JInt, JLong]]]]) +when(ds.getType).thenReturn(dsType) Review comment: Ditto about Mockito, and about the abbreviated variable names (`sType` vs `dsType`?)
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247932397 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ## @@ -357,7 +358,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { // function call must always be at the end suffixFunctionCall | suffixFunctionCallOneArg | // rowtime or proctime -timeIndicator +timeIndicator | key Review comment: `key` on a new line?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r246322448 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalToEnumerableTableScan.scala ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.logical.LogicalTableScan + +/** + * Rule that converts a LogicalTableScan into an EnumerableTableScan. We need this rule to Review comment: If calling a rule only once is impossible, I would be in favour of implementing a separate step similar to `RelTimeIndicatorConverter`. The current approach is a bit hacky:
1. converting back and forth between `EnumerableTableScan` and `LogicalTableScan` is not clean on its own;
2. it interconnects with the already existing `EnumerableToLogicalTableScan`, which is a kind of hack on its own;
3. `EnumerableToLogicalTableScan` previously had a different purpose, and it's better to have two smaller independent components (the old `EnumerableToLogicalTableScan` and a new `UpsertToRetractionConverter` (?)) that have nothing to do with one another, than to squeeze them together.
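The separate step the reviewer has in mind could look roughly like the single recursive pass below: walk the plan once and wrap every upsert scan exactly once, similar in spirit to how `RelTimeIndicatorConverter` rewrites a plan as a `RelShuttle`. This is a sketch under simplifying assumptions; `Rel`, `Scan`, `Filter`, `Join`, `UpsertToRetraction` and `UpsertToRetractionPass` are hypothetical types, not Calcite's `RelNode` API.

```scala
// Hypothetical, simplified logical plan; not Calcite's RelNode hierarchy.
sealed trait Rel
final case class Scan(table: String, upsertKeys: Option[Seq[String]]) extends Rel
final case class Filter(input: Rel, condition: String) extends Rel
final case class Join(left: Rel, right: Rel) extends Rel
final case class UpsertToRetraction(input: Rel, keys: Seq[String]) extends Rel

// One dedicated pass: walk the tree once and wrap every upsert scan once.
object UpsertToRetractionPass {
  def convert(rel: Rel): Rel = rel match {
    case s @ Scan(_, Some(keys)) => UpsertToRetraction(s, keys)
    case s: Scan                 => s
    case Filter(in, c)           => Filter(convert(in), c)
    case Join(l, r)              => Join(convert(l), convert(r))
    // Already converted: never wrap a second time.
    case u: UpsertToRetraction   => u
  }
}
```

Because the pass never descends into an existing `UpsertToRetraction`, running it twice is harmless, which sidesteps the question of how often an optimizer rule fires.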
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241816667 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala ## @@ -38,7 +41,25 @@ class EnumerableToLogicalTableScan( val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan] val table = oldRel.getTable val newRel = LogicalTableScan.create(oldRel.getCluster, table) -call.transformTo(newRel) + +val streamTable = table.unwrap(classOf[UpsertStreamTable[_]]) +val isUpsertTable = streamTable match { + case _: UpsertStreamTable[_] => +true + case _ => +false +} + +if (isUpsertTable) { Review comment: You could merge this `if` check with the `match` above to simplify the code a bit.
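Merging the two steps could look like the sketch below, assuming (the diff is cut off at this point) that the `if` branch wraps the new scan in a `LogicalUpsertToRetraction` and the other branch keeps the plain scan. `StreamTable`, `UpsertStreamTable`, `AppendStreamTable`, `LogicalScan`, `UpsertToRetractionNode` and `ScanConversion` are hypothetical stand-ins, not the Flink classes.

```scala
// Hypothetical stand-ins for the Calcite/Flink classes used by the rule.
trait StreamTable
final class UpsertStreamTable extends StreamTable
final class AppendStreamTable extends StreamTable

sealed trait LogicalNode
final case class LogicalScan(table: StreamTable) extends LogicalNode
final case class UpsertToRetractionNode(input: LogicalNode) extends LogicalNode

object ScanConversion {
  // The type test and the branch it guards collapse into one match,
  // so no intermediate `isUpsertTable` Boolean is needed.
  def convert(table: StreamTable): LogicalNode = {
    val newRel = LogicalScan(table)
    table match {
      case _: UpsertStreamTable => UpsertToRetractionNode(newRel)
      // Type patterns never match null, so a null `unwrap` result lands here too.
      case _ => newRel
    }
  }
}
```

In the real rule, each branch would pass its result to `call.transformTo(...)` instead of returning it.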
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241811123 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala ## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma } } + /** +* Converts the [[DataStream]] with upsert messages into a [[Table]] with keys. +* +* The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second +* field holds the record. A true [[Boolean]] flag indicates an update message, a false flag +* indicates a delete message. +* +* The field name and key of the new [[Table]] can be specified like this: +* +* {{{ +* val env = StreamExecutionEnvironment.getExecutionEnvironment +* val tEnv = TableEnvironment.getTableEnvironment(env) +* +* val stream: DataStream[(Boolean, (String, Int))] = ... +* val table = stream.toKeyedTable(tEnv, 'name.key, 'amount) +* }}} +* +* If field names are not explicitly specified, names are automatically extracted from the type +* of the [[DataStream]]. +* If keys are not explicitly specified, an empty key will be used and the table will be a +* single row table. +* +* @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created. +* @param fields The field names of the new [[Table]] (optional). +* @return The resulting [[Table]]. +*/ + def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = { Review comment:
```
Methods in DataStreamConversions:
toTableFromAppendStream
toTableFromUpsertStream
toTableFromRetractStream
```
I think this makes sense. @twalthr, do you think so as well?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241820886 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala ## @@ -65,21 +64,6 @@ class ExplainTest extends AbstractTestBase { assertEquals(expect, result) } - @Test Review comment: Why have you removed this test? Was it moved/superseded by something?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241817741 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala ## @@ -22,13 +22,16 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan import org.apache.calcite.plan.RelOptRule.{any, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.schema.UpsertStreamTable /** * Rule that converts an EnumerableTableScan into a LogicalTableScan. * We need this rule because Calcite creates an EnumerableTableScan * when parsing a SQL query. We convert it into a LogicalTableScan * so we can merge the optimization process with any plan that might be created - * by the Table API. + * by the Table API. The rule also checks whether the source is an upsert source and adds Review comment: This rule is only applied to `EnumerableTableScan` nodes. This comment suggests that `EnumerableTableScan` appears only in SQL, so something is wrong here. If we want to keep it as it is now, the comment needs updating (please also check my comment below in `LogicalToEnumerableTableScan`).
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241820550 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalToEnumerableTableScan.scala ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.logical.LogicalTableScan + +/** + * Rule that converts a LogicalTableScan into an EnumerableTableScan. We need this rule to Review comment: What's the problem? That we cannot force Calcite to fire a rule only once? I would expect a single-pass rule, fired only once (around the expand-plan optimisation phase), that goes through the plan, checks whether the source is an upsert source and, if so, inserts `LogicalUpsertToRetraction`. Alternatively, we could think about doing this one step earlier: in the Table API somebody is creating a `LogicalTableScan`, right? We could inject the logic that checks whether we need `LogicalUpsertToRetraction` there.
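The "one step earlier" alternative moves the decision to the point where the Table API creates the scan, so no optimizer rule has to discover upsert sources afterwards. A minimal sketch, with hypothetical `TableSource`, `ScanNode`, `UpsertToRetractionNode` and `ScanFactory` types standing in for Flink's Table API internals (the thread does not show the actual code path that creates the `LogicalTableScan`):

```scala
// Hypothetical Table API scan factory; not Flink's actual code path.
sealed trait TableSource
final case class AppendSource(name: String) extends TableSource
final case class UpsertSource(name: String, keys: Seq[String]) extends TableSource

sealed trait LogicalPlan
final case class ScanNode(source: TableSource) extends LogicalPlan
final case class UpsertToRetractionNode(input: LogicalPlan, keys: Seq[String]) extends LogicalPlan

object ScanFactory {
  // Decide once, when the scan is created, whether the conversion node is needed.
  def createScan(source: TableSource): LogicalPlan = source match {
    case u: UpsertSource => UpsertToRetractionNode(ScanNode(u), u.keys)
    case a: AppendSource => ScanNode(a)
  }
}
```

Queries parsed from SQL would still need separate handling, since (per the rule's Scaladoc) Calcite creates those scans itself.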
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241807984 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -573,6 +573,17 @@ abstract class StreamTableEnvironment( registerTableInternal(name, dataStreamTable) } + def getTypeFromUpsertStream[T](dataStream: DataStream[T]): TypeInformation[T] = { Review comment: private?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241809269 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ## @@ -224,14 +226,14 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) { override def toString: String = s"proctime($child)" } -case class Key(child: Expression) extends UnaryExpression { - override private[flink] def resultType: TypeInformation[_] = child.resultType - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { -throw new UnsupportedOperationException( - s"Key Expression can only be used during table initialization.") - } -} +//case class Key(child: Expression) extends UnaryExpression { Review comment: Some leftovers to remove?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241815618 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/RemoveDataStreamUpsertToRetractionRule.scala ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.flink.table.plan.nodes.datastream.{AccMode, DataStreamUpsertToRetraction} + +/** + * Rule to remove [[DataStreamUpsertToRetraction]] under [[AccMode]]. In this case, it is a no-op Review comment: `under [[AccMode.AccRetract]].`?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r237008957 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.logical.rel + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, SingleRel} + +/** + * Represent an upsert source. + * + * @param keyNames The upsert key names. + */ +class LogicalLastRow( Review comment: (I've responded to this above)
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r237004073 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala ## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma } } + /** +* Converts the [[DataStream]] with upsert messages into a [[Table]] with keys. +* +* The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second +* field holds the record. A true [[Boolean]] flag indicates an update message, a false flag +* indicates a delete message. +* +* The field name and key of the new [[Table]] can be specified like this: +* +* {{{ +* val env = StreamExecutionEnvironment.getExecutionEnvironment +* val tEnv = TableEnvironment.getTableEnvironment(env) +* +* val stream: DataStream[(Boolean, (String, Int))] = ... +* val table = stream.toKeyedTable(tEnv, 'name.key, 'amount) +* }}} +* +* If field names are not explicitly specified, names are automatically extracted from the type +* of the [[DataStream]]. +* If keys are not explicitly specified, an empty key will be used and the table will be a +* single row table. +* +* @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created. +* @param fields The field names of the new [[Table]] (optional). +* @return The resulting [[Table]]. +*/ + def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = { Review comment: Generally speaking, `toTable`, `toTableFromUpsertStream` and `toTableFromRetractStream` sound good to me, but what's the difference between `toTableFromRetractStream` and `toTable`? Secondly, would `toTableFromRetractStream` call `tableEnv.fromAppendStream()`? If so, there would be yet another naming inconsistency. Maybe renaming `fromDataStream` to `fromAppendStream` was a bit incorrect?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r237018239 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala ## @@ -36,25 +36,32 @@ class DataStreamScanRule override def matches(call: RelOptRuleCall): Boolean = { val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan] -val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]]) -dataSetTable match { - case _: DataStreamTable[Any] => -true - case _ => -false -} +val appendTable = scan.getTable.unwrap(classOf[AppendStreamTable[Any]]) +val upsertTable = scan.getTable.unwrap(classOf[UpsertStreamTable[Any]]) + +appendTable != null || upsertTable != null Review comment: Can we avoid those nulls (in multiple places), for example by using `isInstanceOf` or `match`? Handling null is dangerous and error-prone.
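One way to keep nulls out of the rule is to convert them at the boundary with `Option(...)`, which yields `None` for `null`. The sketch assumes, as the `!= null` checks in the diff imply, that `unwrap` returns null when the table is not of the requested class; `Table`, `ScanTable` and `ScanMatching` are hypothetical stand-ins for the Calcite/Flink types.

```scala
// Hypothetical stand-ins. `unwrap` mimics the null-returning contract
// implied by the `!= null` checks in the diff.
trait Table
final class AppendStreamTable extends Table
final class UpsertStreamTable extends Table
final class OtherTable extends Table

final class ScanTable(underlying: Table) {
  def unwrap[C](clazz: Class[C]): C =
    if (clazz.isInstance(underlying)) clazz.cast(underlying) else null.asInstanceOf[C]
}

object ScanMatching {
  // Option(...) converts a possible null into None right at the boundary.
  def unwrapOpt[C](table: ScanTable, clazz: Class[C]): Option[C] =
    Option(table.unwrap(clazz))

  def matches(table: ScanTable): Boolean =
    unwrapOpt(table, classOf[AppendStreamTable]).isDefined ||
      unwrapOpt(table, classOf[UpsertStreamTable]).isDefined
}
```

The `match` route the reviewer mentions works as well, because a type pattern such as `case _: UpsertStreamTable` never matches `null`.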
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r237017454 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalLastRow +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalLastRow( +cluster: RelOptCluster, +traitSet: RelTraitSet, +child: RelNode, +val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, child) + with FlinkLogicalRel { + + override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = { +new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +val child = this.getInput +val rowCnt = mq.getRowCount(child) +// take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow +// after Calc. +planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0) Review comment: As mentioned above (in the no-op/renaming discussion), this cost computation is not (or will not be) correct when the node is a no-op.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r237008338 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(exchange: LogicalExchange): RelNode = throw new TableException("Logical exchange in a stream environment is not supported yet.") - override def visit(scan: TableScan): RelNode = scan + override def visit(scan: TableScan): RelNode = { +val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]]) +if (upsertStreamTable != null) { + val relTypes = scan.getRowType.getFieldList.map(_.getType) + val timeIndicatorIndexes = relTypes.zipWithIndex +.filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) +.map(_._2) + val input = if (timeIndicatorIndexes.nonEmpty) { +// materialize time indicator +val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs) +materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet) + } else { +scan + } + + LogicalLastRow.create( Review comment: I have revisited our previous discussion in the design/google doc and I still have the same concern as I had then: > In that case this is strange. It's like having filter/projection/aggregation nodes that after pruning/removing/pushing them down are sometimes NO-OPs and sometimes not. Besides being strange it can have some negative side effects: > - presence of a no-op node (when it doesn't need to be there) can block/mess with other optimisations or make them more complex > - it will pollute printed explain plan to the user. 
This is important, since this will be a very costly node and it will be very hard to explain to users when this node requires huge state and when not > - it would make cost computations more complicated Back then it sparked a discussion, where you were saying that it will never be a no-op and you wanted `LastRow` to always have a state and a purpose besides the upsert to retraction conversion (filtering out empty deletes). However, that was resolved and filtering out empty deletes was dropped. Thus it brings me back to the issue of having this as a no-op node in the plan. This connects with your other response regarding naming `LastRow`: > The LastRow node may be a no-op node in the case such as upsert source -> calc -> upsert sink. While LastRow will convert upsert stream to retract stream if a downstream node needs it to, such as upsert source -> calc -> retract sink. Whether convert to retract stream will be decided by RetractionRules. I would still argue that it should be named `UpsertToRetractionConverter` (or something along those lines) and, if not needed, it should not be in the plan. Maybe this means that our `RetractionRules` are not sufficient and need some refactoring. I can see a couple of solutions to that, but probably the best would be to deduce whether we need to insert `UpsertToRetractionConverter` or not inside the rule that is supposed to create it. Further optimisations/rewrites would have to correctly handle/preserve the semantics/traits of upserts vs retractions.
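The suggestion to decide, inside the rule that creates it, whether the conversion node is needed at all can be illustrated with a toy model. This is not Flink's planner or its `RetractionRules`. A linear plan is a list of operators, and an upsert-to-retraction converter is inserted right after the upsert source only if some downstream operator consumes retractions. All type names here are hypothetical.

```scala
// Toy model of a linear plan; none of these types exist in Flink (hypothetical).
sealed trait Op { def needsRetractions: Boolean }
case object UpsertSource extends Op { val needsRetractions = false }
case object Calc extends Op { val needsRetractions = false }
case object UpsertSink extends Op { val needsRetractions = false }
case object RetractSink extends Op { val needsRetractions = true }
case object UpsertToRetractionConverter extends Op { val needsRetractions = false }

object ConverterPlacement {
  // Insert the converter directly after the source only when a downstream operator
  // requires retractions. Otherwise return the plan unchanged, so no no-op node
  // appears in the plan, its cost, or the printed explain output.
  def plan(ops: List[Op]): List[Op] = ops match {
    case UpsertSource :: rest if rest.exists(_.needsRetractions) =>
      UpsertSource :: UpsertToRetractionConverter :: rest
    case other => other
  }
}
```

With this shape, `upsert source -> calc -> upsert sink` stays free of the converter, while `upsert source -> calc -> retract sink` gets it. A real rule would have to derive `needsRetractions` from traits rather than hard-coding it per operator.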
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236606838 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.logical.rel + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, SingleRel} + +/** + * Represent an upsert source. + * + * @param keyNames The upsert key names. + */ +class LogicalLastRow( Review comment: > We can't name it UpsertToRetractionsConverter since we don't always convert upsert to retractions in this node. Can you elaborate on that?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236606505 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case f ~ _ ~ _ => RowtimeAttribute(f) } + // key + + lazy val key: PackratParser[Expression] = +(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY ^^ { Review comment: Yes, I meant that this `(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference)` is duplicated a couple of times here. If it is possible to deduplicate it into: ``` lazy val aliasOrFieldReference = aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference ``` I think it would be better to do so.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236601205 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment( s"But is: ${execEnv.getStreamTimeCharacteristic}") } +// Can not apply key on append stream +if (extractUniqueKeys(fields).nonEmpty) { + throw new TableException( +s"Can not apply key on append stream, use fromUpsertStream instead.") Review comment: I meant that this check doesn't prevent any bugs or any other exceptions. It's just there to block the user from specifying a no-op key definition, right? That's why I'm not sure we should block it. If we decide to keep this check/exception, I would refrain from mentioning `fromUpsertStream` in the exception message. Maybe rephrase it to something like: > Defining a key on an append stream has no effect.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236601884 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment( s"But is: ${execEnv.getStreamTimeCharacteristic}") } +// Can not apply key on append stream +if (extractUniqueKeys(fields).nonEmpty) { + throw new TableException( +s"Can not apply key on append stream, use fromUpsertStream instead.") +} + // adjust field indexes and field names val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime) val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime) -val dataStreamTable = new DataStreamTable[T]( +val dataStreamTable = new AppendStreamTable[T]( dataStream, indexesWithIndicatorFields, namesWithIndicatorFields ) registerTableInternal(name, dataStreamTable) } + /** +* Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s +* catalog. +* +* @param name The name under which the table is registered in the catalog. +* @param dataStream The [[DataStream]] to register as table in the catalog. +* @tparam T the type of the [[DataStream]]. +*/ + protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = { Review comment: OK, I get it: upsert streams without a primary key are single-row tables. But what do you mean by: > The primary key would be necessary. ?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236604746 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala ## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma } } + /** +* Converts the [[DataStream]] with upsert messages into a [[Table]] with keys. +* +* The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second +* field holds the record. A true [[Boolean]] flag indicates an update message, a false flag +* indicates a delete message. +* +* The field name and key of the new [[Table]] can be specified like this: +* +* {{{ +* val env = StreamExecutionEnvironment.getExecutionEnvironment +* val tEnv = TableEnvironment.getTableEnvironment(env) +* +* val stream: DataStream[(Boolean, (String, Int))] = ... +* val table = stream.toKeyedTable(tEnv, 'name.key, 'amount) +* }}} +* +* If field names are not explicitly specified, names are automatically extracted from the type +* of the [[DataStream]]. +* If keys are not explicitly specified, an empty key will be used and the table will be a +* single row table. +* +* @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created. +* @param fields The field names of the new [[Table]] (optional). +* @return The resulting [[Table]]. +*/ + def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = { Review comment: Maybe `upsertStreamToTable`?
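A toy model of the upsert semantics described in the `toKeyedTable` doc comment above. Each message is a `(Boolean, record)` pair, where `true` means insert/update and `false` means delete. Materializing the stream into a map keyed by the user-defined key shows why an empty key gives a single-row table: every record maps to the same key, so each upsert overwrites the previous one. `Order` and `materialize` are hypothetical and not part of Flink.

```scala
// Hypothetical record type, only for illustration.
final case class Order(user: String, product: String, amount: Int)

object UpsertSemantics {
  // Materializes (flag, record) upsert messages into a key -> row map.
  // flag == true: insert or overwrite the row for that key; flag == false: delete it.
  def materialize[K, R](messages: Seq[(Boolean, R)], key: R => K): Map[K, R] =
    messages.foldLeft(Map.empty[K, R]) {
      case (table, (true, row))  => table.updated(key(row), row)
      case (table, (false, row)) => table - key(row)
    }
}
```

For example, keying by `(o: Order) => o.user` keeps one row per user, while keying by the empty key `(_: Order) => ()` keeps only the most recent order. That is the single-row table the doc comment mentions.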
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236597321 ## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala ## @@ -54,9 +54,9 @@ object StreamSQLExample { Order(4L, "beer", 1))) // convert DataStream to Table -var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount) Review comment: I would prefer to document this rename in this PR, but if you want to do it all together with the upsert sources documentation, then OK; let's just not forget about it.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236605473 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala ## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma } } + /** +* Converts the [[DataStream]] with upsert messages into a [[Table]] with keys. +* +* The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second Review comment: Will the messages in the table be encoded as `Tuple2`, or are the incoming messages from the source `DataStream` expected to be encoded as `Tuple2`?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236196242 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(exchange: LogicalExchange): RelNode = throw new TableException("Logical exchange in a stream environment is not supported yet.") - override def visit(scan: TableScan): RelNode = scan + override def visit(scan: TableScan): RelNode = { +val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]]) +if (upsertStreamTable != null) { + val relTypes = scan.getRowType.getFieldList.map(_.getType) + val timeIndicatorIndexes = relTypes.zipWithIndex +.filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) +.map(_._2) + val input = if (timeIndicatorIndexes.nonEmpty) { +// materialize time indicator +val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs) +materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet) + } else { +scan + } + + LogicalLastRow.create( Review comment: Do we ALWAYS convert upserts to retractions? Even for pipelines like `upsert source -> filter -> upsert sink`?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236197534 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case f ~ _ ~ _ => RowtimeAttribute(f) } + // key + + lazy val key: PackratParser[Expression] = +(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY ^^ { Review comment: Can we deduplicate `(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference)`? I would expect so, but I'm not familiar with the Scala parser-combinator magic in `ExpressionParser`.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236202082 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.logical.rel + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, SingleRel} + +/** + * Represent an upsert source. + * + * @param keyNames The upsert key names. + */ +class LogicalLastRow( Review comment: I still think this is a bad and misleading name for this operator. `LastRow` explains neither the purpose nor the actual thing that it's doing. If someone were asked, without prior knowledge, what the `LastRow` RelNode/operator does, it would be impossible to guess the correct answer. It's even more confusing when you bind `LastRow` to the "upsert" name in the doc comment: > Represent an upsert source. > The upsert key names.
Please either change the name to something like `UpsertToRetractionsConverter` that explicitly states the purpose of this class (then you can keep the `upsert` references in the doc comment) or, if you would like to keep naming it after the thing that it does, it would have to meet the following requirements: - `LastRow` is independent of the upsert -> retraction conversion logic - there is a reasonable chance that it will be used somewhere else outside of the upsert -> retraction conversion context It's a bit as if we renamed `LogicalJoin` to `HashTable`: kind of true, but misleading. It could make sense if: - `HashTable` is extracted to a separate class - the `LogicalJoin` relnode/operator/concept still exists, but is just using `HashTable` - `HashTable` is reused somewhere else
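A toy model of what the node actually does may help with the naming question. For each upsert message it retracts the row previously emitted for the same key, if any, and then emits the new row, so a retraction-based consumer sees a consistent stream. Both input and output use a `(Boolean, row)` encoding. In the output, `false` means "retract this row". `UpsertToRetractions.convert` is a hypothetical sketch, not Flink's `LastRow` implementation. A delete for a key that was never seen simply produces no output here; that choice is an assumption of the sketch.

```scala
import scala.collection.mutable

object UpsertToRetractions {
  // Input (upsert) encoding:       true = insert/update, false = delete.
  // Output (retraction) encoding:  true = add row,       false = retract row.
  def convert[K, R](messages: Seq[(Boolean, R)], key: R => K): Seq[(Boolean, R)] = {
    val last = mutable.Map.empty[K, R] // last row emitted per key
    messages.flatMap { case (isUpsert, row) =>
      val k = key(row)
      // Retract whatever was previously emitted for this key.
      val retraction = last.get(k).map(prev => (false, prev)).toList
      if (isUpsert) {
        last(k) = row
        retraction :+ ((true, row))
      } else {
        last.remove(k)
        retraction
      }
    }
  }
}
```

The state held per key (`last`) is exactly what makes this node expensive, which is why it matters whether it is in the plan only when a retraction consumer needs it.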
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236177333 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ## @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: TableConfig) { referenceByName(name, p).map((_, name)) case Alias(UnresolvedFieldReference(origName), name: String, _) => referenceByName(origName, p).map((_, name)) + case (Key(UnresolvedFieldReference(name: String))) => Review comment: To be honest, at this point I'm not sure that adding `Key` as a nested case class was a good idea. It adds quite a lot of boilerplate and special handling in multiple places, which goes against clean code. I wonder if `UnresolvedFieldReference` shouldn't have some kind of "trait" marking it as a primary key, where by a "trait" I mean any way to determine whether it is a primary key or not: for example, either a boolean flag or an `UnresolvedKeyFieldReference` that extends `UnresolvedFieldReference`. I think both of those solutions would allow us to avoid this code/handling duplication in most places and to handle `Key` specially only where it is necessary.
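A minimal sketch of the "boolean flag" variant suggested above, using a simplified, hypothetical expression type rather than Flink's `UnresolvedFieldReference`. The key marker becomes a field of the reference itself. Code that only needs the field name matches a single case, and only key-aware code looks at the flag. Treating an aliased key's output name as the key name is an assumption of this sketch.

```scala
// Hypothetical simplified expressions; not Flink's classes.
sealed trait Expr
final case class FieldRef(name: String, isKey: Boolean = false) extends Expr
final case class Aliased(child: Expr, name: String) extends Expr

object KeyFlag {
  // Name resolution ignores the key flag: keyed and non-keyed refs share one case.
  def outputName(e: Expr): String = e match {
    case FieldRef(name, _)  => name
    case Aliased(_, alias)  => alias
  }

  // Only key extraction inspects the flag.
  def keyNames(fields: Seq[Expr]): Seq[String] = fields.collect {
    case FieldRef(name, true)              => name
    case Aliased(FieldRef(_, true), alias) => alias
  }
}
```

Compared with a wrapping `Key(...)` case class, no existing `match` needs an extra `Key(UnresolvedFieldReference(...))` branch, because the flag is simply ignored where it does not matter.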
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236197832 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/AppendStreamScan.scala ## @@ -36,20 +36,20 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo * It ensures that types without deterministic field order (e.g. POJOs) are not part of * the plan translation. */ -class DataStreamScan( +class AppendStreamScan( Review comment: Shouldn't this belong to the previous commit?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236202531 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.logical.rel + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, SingleRel} + +/** + * Represent an upsert source. + * + * @param keyNames The upsert key names. Review comment: If `keyNames` are "upsert key names", then just name them `upsertKeyNames` and drop the comment.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236168761 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment( s"But is: ${execEnv.getStreamTimeCharacteristic}") } +// Can not apply key on append stream +if (extractUniqueKeys(fields).nonEmpty) { + throw new TableException( +s"Can not apply key on append stream, use fromUpsertStream instead.") +} + // adjust field indexes and field names val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime) val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime) -val dataStreamTable = new DataStreamTable[T]( +val dataStreamTable = new AppendStreamTable[T]( dataStream, indexesWithIndicatorFields, namesWithIndicatorFields ) registerTableInternal(name, dataStreamTable) } + /** +* Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s +* catalog. +* +* @param name The name under which the table is registered in the catalog. +* @param dataStream The [[DataStream]] to register as table in the catalog. +* @tparam T the type of the [[DataStream]]. +*/ + protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = { Review comment: Aren't we missing the user-defined primary key here? What's the value of an upsert stream without a defined primary key?
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236174343 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ## @@ -1089,6 +1089,19 @@ abstract class TableEnvironment(val config: TableConfig) { } else { referenceByName(origName, t).map((_, name)) } + case (Key(UnresolvedFieldReference(name: String)), idx) => +if (isRefByPos) { Review comment: Please deduplicate this `if` and the one below with the non-keyed versions. Either extract those `if`s into separate functions or convert the whole match into a recursive function, something like this: ``` def foo(expr: Expression, idx: Int): Option[(Int, String)] = expr match { case Key(keyExpr) => foo(keyExpr, idx) case UnresolvedFieldReference(name) => ... case Alias(UnresolvedFieldReference(origName), name, _) => ... } ```
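A runnable version of the recursive-match idea, using simplified stand-ins for the expression classes named in the review. The real Flink `UnresolvedFieldReference`, `Alias` and `Key` have different signatures, and the real code also handles by-position references. `Key(...)` is unwrapped by recursion, so the by-name and alias handling is written only once. `FieldResolution.resolve` is a hypothetical helper.

```scala
// Simplified stand-ins for the expression classes in the review (hypothetical signatures).
sealed trait Expression
final case class UnresolvedFieldReference(name: String) extends Expression
final case class Alias(child: Expression, name: String) extends Expression
final case class Key(child: Expression) extends Expression

object FieldResolution {
  // Resolves a field expression by name against the input field names and returns
  // (input index, output name). Key(...) is simply unwrapped, so keyed and
  // non-keyed expressions share the same code path.
  def resolve(expr: Expression, inputNames: Seq[String]): Option[(Int, String)] = {
    def indexOf(name: String): Option[Int] = Some(inputNames.indexOf(name)).filter(_ >= 0)
    expr match {
      case Key(child)                                      => resolve(child, inputNames)
      case UnresolvedFieldReference(name)                  => indexOf(name).map(i => (i, name))
      case Alias(UnresolvedFieldReference(origName), name) => indexOf(origName).map(i => (i, name))
      case _                                               => None
    }
  }
}
```

Any key-specific bookkeeping (such as collecting the key names) can then live in a separate pass, instead of being repeated in every branch.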
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236196723 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case f ~ _ ~ _ => RowtimeAttribute(f) } + // key Review comment: nit: this comment is not helpful; it just duplicates the field name.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236195894

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    val relTypes = scan.getRowType.getFieldList.map(_.getType)
+    val getTimeIndicatorIndexes = relTypes.zipWithIndex
+      .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+      .map(_._2)
+    if (upsertStreamTable != null) {
+      val input = if (getTimeIndicatorIndexes.size > 0) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
But you could probably add this node when changing convention. We assume that the `TableScan` rel node represents all kinds of table scans and can always produce retractions, while at some stage (`FlinkLogical`?) we freeze it to either retractions or upserts. Regardless of that, `RelTimeIndicatorConverter.scala` is not the right place to create `LogicalLastRow`. You could also add a single-pass rule, applied only once, that inserts the upsert -> retraction conversion.
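The single-pass alternative suggested in this comment can be sketched on a toy plan tree. This is not Calcite's `RelOptRule` API: `PlanNode`, `Scan`, `LastRow`, `Filter`, `Join` and `InsertLastRowOnce` are hypothetical names, and `LastRow` only stands in for an upsert -> retraction conversion node such as `LogicalLastRow`.

```scala
// Hypothetical, simplified plan nodes; not Calcite's RelNode hierarchy.
sealed trait PlanNode
final case class Scan(table: String, isUpsert: Boolean) extends PlanNode
// Stand-in for an upsert -> retraction conversion node such as LogicalLastRow.
final case class LastRow(input: PlanNode) extends PlanNode
final case class Filter(condition: String, input: PlanNode) extends PlanNode
final case class Join(left: PlanNode, right: PlanNode) extends PlanNode

// A pass meant to run exactly once: it wraps every upsert scan in LastRow,
// so downstream nodes only ever see retraction streams.
object InsertLastRowOnce {
  def apply(node: PlanNode): PlanNode = node match {
    case scan @ Scan(_, true)     => LastRow(scan)
    case scan @ Scan(_, false)    => scan
    case converted: LastRow       => converted // already converted, leave as is
    case Filter(condition, input) => Filter(condition, apply(input))
    case Join(left, right)        => Join(apply(left), apply(right))
  }
}

val plan = Join(Filter("amount > 2", Scan("orders", isUpsert = true)), Scan("rates", isUpsert = false))
val rewritten = InsertLastRowOnce(plan)
```

Because already-wrapped scans are left untouched, applying the pass a second time yields the same plan, and the time-indicator converter does not need to know about upserts at all.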
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177806

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -1112,13 +1129,17 @@ abstract class TableEnvironment(val config: TableConfig) {
         exprs flatMap {
           case _: TimeAttribute => None
-          case UnresolvedFieldReference(_) if referenced =>
+          case UnresolvedFieldReference(_) | Key(UnresolvedFieldReference(_)) if referenced =>
             // only accept the first field for an atomic type
             throw new TableException("Only the first field can reference an atomic type.")
           case UnresolvedFieldReference(name: String) =>
             referenced = true
             // first field reference is mapped to atomic type
             Some((0, name))
+          case Key(UnresolvedFieldReference(name: String)) =>
+            referenced = true

Review comment:
ditto: another code duplication.
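One way to remove the duplicated `UnresolvedFieldReference(...)` / `Key(UnresolvedFieldReference(...))` cases is a custom extractor that matches a field reference whether or not it is wrapped in `Key`. The sketch assumes simplified, hypothetical stand-ins for the Table API expression classes and throws `IllegalArgumentException` instead of `TableException`; `FieldRef` and `atomicFieldInfo` are illustrative names.

```scala
// Hypothetical stand-ins for the Table API expression classes in the diff.
sealed trait Expr
final case class UnresolvedFieldReference(name: String) extends Expr
final case class Key(child: Expr) extends Expr
final case class TimeAttribute(child: Expr) extends Expr

// Extractor matching a field reference whether or not it is wrapped in Key(...).
object FieldRef {
  def unapply(e: Expr): Option[String] = e match {
    case UnresolvedFieldReference(name)      => Some(name)
    case Key(UnresolvedFieldReference(name)) => Some(name)
    case _                                   => None
  }
}

def atomicFieldInfo(exprs: Seq[Expr]): Seq[(Int, String)] = {
  var referenced = false
  exprs.flatMap {
    case _: TimeAttribute => None
    case FieldRef(_) if referenced =>
      // only accept the first field for an atomic type
      throw new IllegalArgumentException("Only the first field can reference an atomic type.")
    case FieldRef(name) =>
      referenced = true
      // first field reference (keyed or not) is mapped to the atomic type
      Some((0, name))
    case other =>
      throw new IllegalArgumentException(s"Unsupported expression: $other")
  }
}
```

Each branch now appears once, and adding another wrapper later only touches `FieldRef.unapply`.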
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236169075

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    val streamType: TypeInformation[T] = dataStream.getType match {

Review comment:
Lines L586:L593 seem to be duplicated with L619:L626.
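The duplicated lines appear to be the `dataStream.getType match` that extracts the record type from a `Tuple2` stream type; moving it into one helper that every upsert code path calls removes the copy. A minimal sketch with hypothetical, simplified type descriptors standing in for Flink's `TypeInformation`, `CaseClassTypeInfo` and `TupleTypeInfo`; `recordTypeOfUpsertStream`, `registerUpsertStream` and `fromUpsertStream` here are illustrative. Checking that the first field is a Boolean is an assumption based on the documented `(flag, record)` encoding.

```scala
// Hypothetical, simplified type descriptors standing in for Flink's TypeInformation.
sealed trait TypeInfo
case object BooleanType extends TypeInfo
case object StringType extends TypeInfo
case object IntType extends TypeInfo
final case class ScalaTuple2Type(first: TypeInfo, second: TypeInfo) extends TypeInfo
final case class JavaTuple2Type(first: TypeInfo, second: TypeInfo) extends TypeInfo
final case class RowType(fields: Seq[TypeInfo]) extends TypeInfo

// Single helper shared by every upsert code path instead of
// repeating the same match block in each method.
def recordTypeOfUpsertStream(streamType: TypeInfo): TypeInfo = streamType match {
  case ScalaTuple2Type(BooleanType, record) => record
  case JavaTuple2Type(BooleanType, record)  => record
  case other =>
    throw new IllegalArgumentException(
      s"An upsert stream must be of type Tuple2[Boolean, T], but is: $other")
}

// Hypothetical callers: both reuse the helper rather than duplicating the match.
def registerUpsertStream(name: String, streamType: TypeInfo): (String, TypeInfo) =
  (name, recordTypeOfUpsertStream(streamType))

def fromUpsertStream(streamType: TypeInfo): TypeInfo =
  recordTypeOfUpsertStream(streamType)
```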
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236198600

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.

Review comment:
I think this comment is incorrect. This is not a source on its own, but only a conversion class.
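To illustrate why `LogicalLastRow` is a conversion rather than a source: it turns upsert messages into retraction messages by remembering the last row per key. A plain-Scala sketch of that conversion on a finite sequence, assuming the `(flag, row)` encoding documented for `toKeyedTable` (true = upsert, false = delete); `Retraction` and `upsertToRetraction` are hypothetical names, not Flink classes.

```scala
import scala.collection.mutable

// Retraction message: isAccumulate = true adds a row, false retracts a previously emitted row.
final case class Retraction[R](isAccumulate: Boolean, row: R)

// Upsert -> retraction conversion, sketched on a finite sequence.
// Each message is (flag, row): true = upsert, false = delete. keyOf extracts the row's key.
def upsertToRetraction[K, R](messages: Seq[(Boolean, R)], keyOf: R => K): Seq[Retraction[R]] = {
  val lastRow = mutable.Map.empty[K, R] // last emitted row per key
  messages.flatMap { case (isUpsert, row) =>
    val key = keyOf(row)
    // retract the previous version of this key, if one was emitted
    val retractPrevious = lastRow.get(key).toSeq.map(old => Retraction(isAccumulate = false, old))
    if (isUpsert) {
      lastRow.update(key, row)
      retractPrevious :+ Retraction(isAccumulate = true, row)
    } else {
      lastRow.remove(key)
      retractPrevious
    }
  }
}
```

The state kept per key is exactly what the node needs to emit the retraction for an overwritten or deleted row; it reads from its input and produces nothing on its own.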
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236187717

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
This is a naming inconsistency: `toKeyedTable` vs. `fromUpsertStream`. Rename `keyed` to `upsert`?
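The encoding described in this Scaladoc can be checked by materializing a finite upsert stream into the table it describes. A minimal sketch: `materializeUpsertStream` and `sampleMessages` are hypothetical, `keyOf` stands in for the declared key fields, and an empty key (here `Seq.empty`) sends every message to the same key, which yields the single-row table the Scaladoc mentions.

```scala
// Materializes a finite stream of upsert messages into the table they describe.
// Each message is (flag, row): true = upsert (insert or update), false = delete.
// keyOf stands in for the declared key fields.
def materializeUpsertStream[K, R](messages: Seq[(Boolean, R)], keyOf: R => K): Map[K, R] =
  messages.foldLeft(Map.empty[K, R]) { case (table, (isUpsert, row)) =>
    if (isUpsert) table.updated(keyOf(row), row) // last write per key wins
    else table - keyOf(row)                      // delete removes the key
  }

val sampleMessages = Seq((true, ("alice", 10)), (true, ("bob", 5)), (true, ("alice", 12)), (false, ("bob", 5)))
```

Keyed by name, the sample ends with only Alice's latest row; with an empty key, every upsert overwrites the single row.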
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236171634

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")

Review comment:
I'm not sure whether we need or want this check. For example, for temporal joins, a user might want to interpret an upsert stream as an `AppendStreamTable`: https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit#heading=h.yqq0cgo927fp
There might also be other use cases where specifying a primary key on an `AppendTable` makes sense.
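A primary key on an append-only stream can still be meaningful: each appended row can be read as a new version of the row with that key, which is what a temporal join looks up. A plain-Scala sketch of that interpretation, not Flink's temporal table API; `RateChange`, `rateAsOf` and `rateHistory` are hypothetical names.

```scala
// One appended row: a new version of the rate for `currency`, valid from `time` on.
final case class RateChange(currency: String, rate: Int, time: Long)

// Reads an append-only history keyed by currency as a versioned table and returns
// the rate valid at `asOf`: the latest version with time <= asOf, if any.
def rateAsOf(history: Seq[RateChange], currency: String, asOf: Long): Option[Int] =
  history
    .filter(change => change.currency == currency && change.time <= asOf)
    .sortBy(_.time)
    .lastOption
    .map(_.rate)

val rateHistory = Seq(
  RateChange("EUR", 114, 1L),
  RateChange("YEN", 1, 1L),
  RateChange("EUR", 116, 5L),
  RateChange("EUR", 119, 9L))
```

The stream itself never retracts anything; the key only tells the join which appended rows are versions of the same logical row, so rejecting keys on append streams would rule this use out.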