[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-08 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r254982253
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
 ##
 @@ -44,12 +44,12 @@ object StreamTableExample {
 val orderA = env.fromCollection(Seq(
 
 Review comment:
   nit: 2nd commit
   > [FLINK-8577][table] Rename methods name to toTableFromAppendStream and 
toTableFromUpsertStream in DataStreamConversions
   
   has no code changes related to `toTableFromUpsertStream`. Probably commit 
should be renamed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-08 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r254983164
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,102 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Defining key on append stream has not been supported yet, use 
fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  private def getTypeFromUpsertStream[T](dataStream: DataStream[T]): 
TypeInformation[T] = {
+dataStream.getType match {
+  case c: CaseClassTypeInfo[_]
+if (c.getTypeClass.equals(classOf[Tuple2[_, _]])) => c.getTypeAt(1)
+  case t: TupleTypeInfo[_]
+if (t.getTypeClass.equals(classOf[JTuple2[_, _]])) => t.getTypeAt(1)
+  case _ =>
+throw new TableException("You can only upsert from a datastream with 
type of Tuple2!")
+}
+  }
+
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
+
+val streamType: TypeInformation[T] = getTypeFromUpsertStream(dataStream)
+
+val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType)
+val dataStreamTable = new UpsertStreamTable[T](
+  dataStream,
+  fieldIndexes,
+  fieldNames
+)
+registerTableInternal(name, dataStreamTable)
+  }
+
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name with 
field names as specified
+* by field expressions in the [[TableEnvironment]]'s catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @param fields The field expressions to define the field names of the 
table.
+* @tparam T The type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](
+  name: String,
+  dataStream: DataStream[T],
+  fields: Array[Expression])
+  : Unit = {
+
+val streamType: TypeInformation[T] = getTypeFromUpsertStream(dataStream)
+
+// get field names and types for all non-replaced fields
+val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
+
+// validate and extract time attributes
+val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, 
fields)
 
 Review comment:
   I think rowtime columns shouldn't be allowed for upsert streams at the 
moment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-05 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253802557
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Function used to convert upsert to retractions.
+  *
+  * @param rowTypeInfo the output row type info.
+  * @param queryConfig the configuration for the query.
+  */
+class UpsertToRetractionProcessFunction(
 
 Review comment:
   Ok, but then we can provide `LAST_NULLABLE_VALUE()` function and still 
re-use aggregation operator code. Even if we keep `LAST_NULLABLE_VALUE()` as an 
internal function that we do not expose anywhere. I would vote for exposing it 
(maybe in a follow up PR).
   
   My main concern is again code duplication with aggregation operator. 
   
   Now that I'm thinking about it, did we even need to implement our own 
UpsertToRetraction rules and RelNodes? Couldn't we re-use `Aggregate` `RelNode` 
with `LAST_NULLABLE_VALUE()` aggregation functions? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-05 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253795843
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
 
 Review comment:
   Ok, I talked also a little bit more with @twalthr and now I understand why 
do we need this `switch/case`.
   
   Regarding the code duplication, I'm not sure but maybe it could be 
deduplicated by modifying already existing `convertToInternalRow` and adding 
two more parameters:
   1. `Row getRow(T inputRecord)`
   2. `boolean getChange(T inputRecord)`
   
   and using those two parameters:
   ```
   new RichMapFunction[T, CRow] {
   ...
   
   override def map(inputRecord: T): CRow = {
 outCRow.row = getRow(inputRecord)
 outCRow.change = getChange(inputRecord)
 outCRow
   }
   }
   ```
   
   and this modified `convertToInternalRow` method could be called three times:
   1. for the current use case of append only data stream with `getRow(record) 
: record`, `getChange(record) : true`
   2. for upsert scala: with `getRow(record) : record._2`, `getChange(record) : 
record._1`
   3. for upsert java: with `getRow(record) : record.f1`, `getChange(record) : 
record.f0`
   
   or something along those lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-05 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253546583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
 
 Review comment:
   Couple of remarks regarding this method:
   1. Why does it have this `switch/case` for scala/java tuples while the 
`convertToInternalRow` doesn't?
   2. I'm not sure, but It looks like it's duplicating logic from 
`convertToInternalRow`, maybe they should be deduplicated into one method and 
passing the `upsert/append` mode as a parameter for a couple of `if` statements?
   3. Please split it into smaller methods and you could do the same for 
`convertToInternalRow`. More or less where ever there is a comment like `// 
Scala tuple` or `// input is already of correct type. Only need to wrap it as 
CRow` extract the matching piece of code to a named method like 
`convertScalaTuple(...)` or `wrapAsCRow(...)`
   4. are all of the branches covered by the tests? Like the both branches of 
`if` under the java tuple?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-05 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253778196
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
 converterFunction match {
 
   case Some(func) =>
-new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+new CRowToExternalTypeMapRunner[OUT](func.name, func.code, 
func.returnType)
 
 Review comment:
   You can keep this rename and class moving as a separate `[hotfix]` commit 
either at the beginning or at the end of this PR or as separate PR :) We do not 
have to drop those changes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-05 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253776804
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   They have responded to me and basically there is no contract... as long as 
the tests are passing we are fine. I don't like it, but if you want we can 
leave those methods not implemented.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253463228
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
 converterFunction match {
 
   case Some(func) =>
-new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+new CRowToExternalTypeMapRunner[OUT](func.name, func.code, 
func.returnType)
 
 Review comment:
   As a general rule please do not mix simple refactorings like class renames 
and moving classes around with functional changes. Embedding those two things 
inside one commit makes sense only if the refactoring/rename is tightly coupled 
with the functional change and here it doesn't seem so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253559564
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   I'm asking on Calcite dev mailing list, trying to understand those methods. 
Until we get a clear response from them we can leave this as it is now. 
`Aggregate` node (which is almost identical from the semantics perspective as 
this one) also doesn't implement those methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253559564
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   I'm syncing with Calcite dev mailing list, trying to understand those 
methods. Until we get a clear response from them lets leave this as it is now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253554512
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 ##
 @@ -19,18 +19,49 @@
 package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMdUtil
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 trait CommonCalc {
 
+  /**
+* Returns empty if output field is not forwarded from the input for the 
calc.
+*/
+  private[flink] def getInputFromOutputName(calc: Calc, outputFieldName: 
String): Option[String] = {
 
 Review comment:
   Could you add a simple unit test for this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253551875
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Function used to convert upsert to retractions.
+  *
+  * @param rowTypeInfo the output row type info.
+  * @param queryConfig the configuration for the query.
+  */
+class UpsertToRetractionProcessFunction(
 
 Review comment:
   Why not using `GroupAggProcessFunction` with `LAST_VALUE()` aggregate 
function? It would allow us to share the logic and adding `LAST_VALUE()` is 
valuable addition to Flink SQL on it's own.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253546583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
 
 Review comment:
   Couple of remarks regarding this method:
   1. Why does it have this `switch/case` for scala/java tuples while the 
`convertToInternalRow` doesn't?
   2. I'm not sure, but It looks like it's duplicating logic from 
`convertToInternalRow`, maybe they should be deduplicated into one method and 
passing the `upsert/append` mode as a parameter for a couple of `if` statements?
   3. Please split it into smaller methods and you could do the same for 
`convertToInternalRow`. More or less were ever there is a comment like `// 
Scala tuple` or `// input is already of correct type. Only need to wrap it as 
CRow` extract the matching piece of code to a named method like 
`convertScalaTuple(...)` or `wrapAsCRow(...)`
   4. are all of the branches covered by the tests? Like the both branches of 
`if` under the java tuple?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253465887
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
+  schema: RowSchema,
+  input: DataStream[Any],
+  fieldIdxs: Array[Int],
+  config: TableConfig,
+  rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+val internalType = schema.typeInfo
+val cRowType = CRowTypeInfo(internalType)
+
+val hasTimeIndicator = fieldIdxs.exists(f =>
+  f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
+
+val dsType = input.getType
+
+dsType match {
+// Scala tuple
+  case t: CaseClassTypeInfo[_]
+if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == 
Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[(Boolean, Row)]]
+.map(new RichMapFunction[(Boolean, Row), CRow] {
+  @transient private var outCRow: CRow = _
+  override def open(parameters: Configuration): Unit = {
+outCRow = new CRow(null, change = true)
+  }
+
+  override def map(v: (Boolean, Row)): CRow = {
+outCRow.row = v._2
+outCRow.change = v._1
+outCRow
+  }
+}).returns(cRowType)
+
+} else {
+  // input needs to be converted and wrapped as CRow or time 
indicators need to be generated
+
+  val function = generateConversionProcessFunction(
+config,
+inputType.asInstanceOf[TypeInformation[Any]],
+internalType,
+"UpsertStreamSourceConversion",
+schema.fieldNames,
+fieldIdxs,
+rowtimeExpression
+  )
+
+  val processFunc = new ScalaTupleToCRowProcessRunner(
+function.name,
+function.code,
+cRowType)
+
+  val opName = s"from: (${schema.fieldNames.mkString(", ")})"
+
+  input
+.asInstanceOf[DataStream[(Boolean, Any)]]
+.process(processFunc).name(opName).returns(cRowType)
+}
+
+  // Java tuple
+  case t: TupleTypeInfo[_]
+if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == 
Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[JTuple2[JBool, Row]]]
+.map(new RichMapFunction[JTuple2[JBool, Row], CRow] {
+  @transient private var outCRow: CRow = _
+  override def open(parameters: Configuration): Unit = {
+outCRow = new CRow(null, change = true)
+  }
+
+  override def map(v: JTuple2[JBool, Row]): CRow = {
+outCRow.row = v.f1
+outCRow.change = v.f0
+outCRow
+  }
+}).returns(cRowType)
+
+} else {
+  // input needs to be converted and wrapped as CRow or time 
indicators 

[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253468143
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
+  schema: RowSchema,
+  input: DataStream[Any],
+  fieldIdxs: Array[Int],
+  config: TableConfig,
+  rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+val internalType = schema.typeInfo
+val cRowType = CRowTypeInfo(internalType)
+
+val hasTimeIndicator = fieldIdxs.exists(f =>
 
 Review comment:
   rename `f` -> `fieldIndex` `fieldIdxs` -> `fieldIndexes`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253463228
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
 converterFunction match {
 
   case Some(func) =>
-new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+new CRowToExternalTypeMapRunner[OUT](func.name, func.code, 
func.returnType)
 
 Review comment:
   Please, as a general rule do not mix simple refactorings like class renames 
and moving classes around with functional changes. Embedding those two things 
inside one commit makes sense only if the refactoring/rename is tightly coupled 
with the functional change and here it doesn't seem so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253557485
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+call.transformTo(getNewUpsertToRetraction(calc, upsertToRetraction))
+  }
+
+  private def getNewUpsertToRetraction(
+calc: FlinkLogicalCalc,
+upsertToRetraction: FlinkLogicalUpsertToRetraction): 
FlinkLogicalUpsertToRetraction = {
+
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+calc.getRowType.getFieldNames
+  .flatMap(calc.getInputFromOutputName(calc, _))
 
 Review comment:
   [newbie scala question]
   How does it work? `flatMap` here is unwrapping `Option` returned from 
`getInputFromOutputName`? If `Option` is empty, the result list will have null, 
will it be empty or will it throw an exception? (two first options are 
acceptable ;) )
   [/newbie scala question]


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-30 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r252326504
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   `FlinkLogicalUpsertToRetraction` has hidden `RexNode` reference - in form of 
`keyNames`.
   
   I think currently calcite is only using `accept()` or `getChildExps()` for 
decorrelating the plan. So currently there is no problem, because 
`FlinkLogicalUpsertToRetraction.keyNames` should never have anything to do with 
corellated subqueries. But my point is that if someone would use those methods 
for something else, like renaming variables, looking which fields are 
referenced, etc..., it would brake very silently and the failure would be very 
hard to debug.
   
   Look at the all of the implementations of 
`org.apache.calcite.rel.RelNode#accept(org.apache.calcite.rex.RexShuttle)`. 
Basically every node that has some some input references (or other 
expressions), is implementing this method.
   
   Unfortunately you might be right, that throwing an exception will not work 
here :/


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-29 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r251924624
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   I think that this comment of mine might be partially invalid. The only 
optimisation rule that we support around this logical node is 
`CalcUpsertToRetractionTransposeRule` which doesn't use this accept. But I 
guess for the sake of the consistency, it should be implemented one way or 
another, for example by simply `throw new UnsupportedOperationException()`. 
Otherwise this is a "silent" land mine waiting for someone to step in?
   
   Or am I missing something?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-29 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r251922093
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   As far as I can see `FlinkLogicalNativeTableScan` is the same to what I 
proposed in the comment above, right? The only difference is 
`estimateRowSize(child.getRowType)` vs `estimateRowSize(getRowType)`, which 
probably both are equivalent in this case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249795484
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   Shouldn't you use `RelMetadataQuery#getAverageRowSize()` or at least 
duplicate the logic of `FlinkLogicalJoinBase#computeSelfCost` instead of using 
`getFieldCount`?
   
   Also Isn't the io cost missing? To be consistent with join cost this should 
return:
   ```
   planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(child.getRowType))
   ```
   
   edit: probably a better idea is to use `estimateRowSize` and maybe in the 
future update it to use `RelMetadataQuery#getAverageRowSize()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249797875
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   Isn't this class missing a proper implementation of 
`AbstractRelNode#accept(org.apache.calcite.rex.RexShuttle)` method? It is used 
by calcite for example for gathering the "used" columns by this node. If 
`FlinkLogicalUpsertToRetraction#accept` do not visit primary key, I would be 
afraid that primary key might be pruned from the input of 
`FlinkLogicalUpsertToRetraction`.
   
   Also it is used for renaming fields etc. After optimisation your `keyNames: 
Seq[String]` can be not valid any more.
   
   As far as I (vaguely) recall from when I was implementing Temporal Table 
Joins, storing fields in form of `String` references was not very useful and 
using `RexNode` was a better option. 
   
   Probably this needs more investigation and some additional unit test(s).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249795484
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   Shouldn't you use `RelMetadataQuery#getAverageRowSize()` or at least 
duplicate the logic of `FlinkLogicalJoinBase#computeSelfCost` instead of using 
`getFieldCount`?
   
   Also Isn't the io cost missing? To be consistent with join cost this should 
return:
   ```
   planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(child.getRowType))
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249790833
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   Yes, that was my point. If you drop the first condition it will also 
simplify this test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249788943
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   Also ditto :) I think here you shouldn't care if the field is duplicated or 
not, so if you decide not to use `mapInputToOutputName` method, implementing 
`CommonCalc#getInputToOutputNamesMapping()` that return a map or multimap is 
probably a better option then `Seq[String]`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249790503
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
+calc.getProgram.expandLocalRef(p.left) match {
+  // output field is forwarded input field
+  case r: RexInputRef => (r.getIndex, p.right)
+  // output field is renamed input field
+  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+a.getOperands.get(0) match {
+  case ref: RexInputRef =>
+(ref.getIndex, p.right)
+  case _ =>
+(-1, p.right)
 
 Review comment:
   What do you mean? 
   
   Maybe to rephrase my comment: I think that instead of having this `match 
case` construct a better idea might to implement it's logic as 
`RexVisitorImpl`, probably as a private static nested class inside 
`FlinkLogicalCalc` or `CommonCalc`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249786791
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   Good point with `CommonCalc`. Probably it's a better place (btw, doesn't 
Calcite have some util method for that that works on `Calc`?).
   
   Regarding `getInputFromOutputName`. I don't mind which method you would add, 
both `getInputFromOutputName` and `mapInputToOutputNames` are valuable 
addition. However will the `getInputFromOutputName` help you? Don't you need 
the mapping in the opposite direction? With `calc` following the 
`UpsertToRetraction` node, you want to convert key names after transposing 
those two nodes. So you need to perform an operation like this:
   ```
   val keyNameAfterTransposition = 
calc.mapInputToOutputName(upsertToRetraction.key)
   ```
   or more precisely:
   ```
   val newKeys = upsertToRetraction.keyNames.map(calc.mapInputToOutputName) // 
plus code to checkstate that `Option` is present
   ```
   Right? 
   
   Secondly:
   > A field may contain multi-output. For example, `select key as key1, key as 
key2 from T`
   
   1. Does it matter? I guess one possibility would be to return the first 
encountered of those fields
   2. `Seq[String] mapInputToOutputNames(String input)` is also an option. It 
might be more general. Even you could implement: 
   ```
   Option[String] mapInputToOutputName(String input) {
 return mapInputToOutputNames.collectFirst(input);
   }
   ``` 


This is an automated 

[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249726595
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
 
 Review comment:
   Either rename the method or drop the `// key fields should not be changed` 
comment. Usually something is wrong if you need to write a comment explaining 
method name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249729097
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249729279
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   `getInputToOutputNamesMapping` and drop the comment? This method seems more 
generic and that it could be used in other places other then this rule 
`CalcUpsertToRetractionTransposeRule ` here. What do you think about placing it 
inside `FlinkLogicalCalc`? Maybe even refactoring it into something like this:
   ```
   class FlinkLogicalCalc:
 /**
  * Returns empty if field is not forwarded.
  */
 Option[String] mapInputToOutputName(String input)
   ```
   
   `Seq[Option[String]] mapInputToOutputNames(Seq[String] inputs)` would be 
probably more efficient, but maybe less readable?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249735236
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   Some kind of `map` usually better expresses mappings between two things 
compared to list of tuples. Is there a reason that I do not see why have you 
chosen list of tuples? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728531
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
 
 Review comment:
   drop this comment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249735123
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
+calc.getProgram.expandLocalRef(p.left) match {
+  // output field is forwarded input field
+  case r: RexInputRef => (r.getIndex, p.right)
+  // output field is renamed input field
+  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+a.getOperands.get(0) match {
+  case ref: RexInputRef =>
+(ref.getIndex, p.right)
+  case _ =>
+(-1, p.right)
 
 Review comment:
   Why do you create a tuples `(int, string)` and later filtering out based on 
integers instead of for example inserting or not something to an array or a 
map? Another remark, I think that maybe implementing `RexVisitorImpl` (like 
`InputRefVisitor`) would be a better, more re-usable approach which would for 
example solve the problem of duplicating the code for `case r: RexInputRef`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249729068
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
 
 Review comment:
   drop comment it duplicates method name


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249731317
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
 
 Review comment:
   Please do not use single letter/abbreviated variables  `io`,`p`, `r`, `a`, 
`ref`, it makes code cryptic. Also naming what `p.right` and `p.left` means by 
assigning those values to a named local variable would be useful.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728818
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
 
 Review comment:
   Extract to method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249726595
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
 
 Review comment:
   Either rename the method or drop the `// key fields should not be changed` 
comment. Something is wrong if you need to write a comment explaining method 
name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249727908
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
 
 Review comment:
   However I see one more issue here. Does this condition make sense? Shouldn't 
this be resolved/decided by cost base optimiser? Why do we need this heuristic? 
What if calc adds column(s) while also being a very selective filter? IMO this 
condition should be completely dropped and only conditions that guards the 
correctness should be tested here. Whether `calc` should or shouldn't be pushed 
down through `UpsertToRetraction` should be decided by the cost of the plan. 
Cost of `UpsertToRetraction` should reflect both the number of rows and size of 
the rows. Is this happening? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249725951
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   Generally speaking naming of the those tests is not perfect (I'm really 
struggling to understand what are they doing), but I first wanted to understand 
what are you testing for.
   
   But `testCalcTransposeUpsertToRetraction` is a subset of 
`testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose`. They both 
test exact same code paths/branches, aren't they? On the other hand you are not 
testing for the condition:
   ```
   calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
   ```
   The last test checks only for the:
   ```
   // key fields should not be changed
 fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728417
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
 
 Review comment:
   Same here. Instead of writing a comment explaining 
`calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount` 
encapsulate this logic inside a simple method which name would explain it.
   
   And ditto in other places. There are comments that do not explain anything 
or comments that should be replaced by extracted method names.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-18 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249056854
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
+val input = upsertToRetraction.getInput.accept(this)
+val relTypes = input.getRowType.getFieldList.map(_.getType)
+val timeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2).toSet
+
+val rewrittenInput = input.copy(input.getTraitSet, input.getInputs)
 
 Review comment:
   `input` is already a `rewrittenInput`. `val rewrittenInput = 
input.copy(input.getTraitSet, input.getInputs)` looks to me as a no-op:
   1. rename `input` -> `rewrittenInput`
   2. drop copy?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-18 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249065571
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ##
 @@ -183,6 +183,17 @@ object UpdatingPlanChecker {
   lJoinKeys.zip(rJoinKeys)
 )
   }
+
+case l: DataStreamUpsertToRetraction =>
+  val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex
+.filter(e => l.keyIndexes.contains(e._2))
+.map(_._1)
+  Some(uniqueKeyNames.map(e => (e, e)))
+
+case scan: UpsertStreamScan =>
 
 Review comment:
   Let's leave it as it is now, but we could implement our own 
`DataStreamRelShuttle` (equivalent of `RelShuttle`) that would work on all of 
our `DataStream*` nodes (and adding appropriate `visit(DataStreamRelShuttle)` 
methods to all of our `DataStream*` nodes.
   
   I once wanted to write something like this already some time ago to solve 
some other problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-18 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r24905
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/UpsertStreamScan.scala
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.expressions.Cast
+import org.apache.flink.table.plan.schema.{RowSchema, UpsertStreamTable}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+/**
+  * Flink RelNode which matches along with DataStreamSource. Different from 
[[AppendStreamScan]],
+  * [[UpsertStreamScan]] is used to handle upsert streams from source.
+  */
+class UpsertStreamScan(
 
 Review comment:
   Probably -  as you prefer. (I think it looks good now)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-18 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249072493
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   But why is it possible to push down the calc in one and not in the other? 
Also it's not very for me how/why those two tests (and results) differ from 
`testCalcTransposeUpsertToRetraction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-18 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249056249
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
 
 Review comment:
   Generally speaking I would prefer it to happen in this PR (it could be 
implemented as a first commit of this PR), since otherwise we are agreeing on 
merging an instant technological debt.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-18 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249057349
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
+val input = upsertToRetraction.getInput.accept(this)
+val relTypes = input.getRowType.getFieldList.map(_.getType)
+val timeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2).toSet
+
+val rewrittenInput = input.copy(input.getTraitSet, input.getInputs)
+val newInput =
+  materializerUtils.projectAndMaterializeFields(rewrittenInput, 
timeIndicatorIndexes)
 
 Review comment:
   you could extract the lines 222 and 223 to:
   
   ```
   val materializedInput =
 materializerUtils.projectAndMaterializeAllFields(rewrittenInput)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-16 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247926320
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
 
 Review comment:
   I think this code is a bit incorrect. 
   
   First, you didn't recursively call the `RelTimeIndicatorConverter` on the 
`upsertToRetraction` input:
   ```
   val rewrittenInput = upsertToRetraction.accept(this)
   ```
   
   Secondly, we should solve this in some more generic way. Not only 
`upsertToRetraction` nodes needs to materialize it's output, but basically 
every node that produces updates/retractions needs to do that.
   
   I would propose to introduce a common logic to do that, maybe as a separate 
`RelShuttle` that we would execute at the end of the planning (probably by 
checking `DataStreamRel#producesUpdates` flag). What do you think?
   
   Also this requires some tests coverage (that `rowtTime` was materialised).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247962668
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala
 ##
 @@ -95,4 +95,41 @@ object StreamTestData {
 data.+=(((3, 3), "three"))
 env.fromCollection(data)
   }
+
+  def getSmall3TupleUpsertStream(env: StreamExecutionEnvironment):
+  DataStream[(Boolean, (Int, Long, String))] = {
+val data = new mutable.MutableList[(Boolean, (Int, Long, String))]
+data.+=((true, (1, 1L, "Hi")))
+data.+=((true, (2, 2L, "Hello")))
+data.+=((true, (3, 2L, "Hello world")))
+env.fromCollection(data)
+  }
+
+  def get3TupleUpsertStream(env: StreamExecutionEnvironment):
 
 Review comment:
   Is this method used somewhere?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247961118
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT b as b1, c, proctime as proctime1, rowtime as rowtime1 
FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamUpsertToRetraction",
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  ),
+  term("keys", "b"),
+  term("select", "a", "b", "c", "proctime", "rowtime")
+),
+term("select", "b AS b1", "c", "proctime AS proctime1", "rowtime AS 
rowtime1"))
+streamUtil.verifySql(sql, expected, true)
+  }
+
+  @Test
+  def testCalcCannotTransposeUpsertToRetraction() = {
 
 Review comment:
   What does this test case test? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247952668
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/UpsertStreamScan.scala
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.expressions.Cast
+import org.apache.flink.table.plan.schema.{RowSchema, UpsertStreamTable}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+/**
+  * Flink RelNode which matches along with DataStreamSource. Different from 
[[AppendStreamScan]],
+  * [[UpsertStreamScan]] is used to handle upsert streams from source.
+  */
+class UpsertStreamScan(
 
 Review comment:
   This class duplicates code with `AppendStreamScan`. Either we should 
introduce a common base class or maybe `UpsertStreamScan` can extend from 
`AppendStreamScan`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247954067
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ##
 @@ -183,6 +183,17 @@ object UpdatingPlanChecker {
   lJoinKeys.zip(rJoinKeys)
 )
   }
+
+case l: DataStreamUpsertToRetraction =>
 
 Review comment:
   please no one letter variable names (`l`, `e` etc)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247934064
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableException}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamUpsertToRetraction(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamUpsertToRetraction(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"UpsertToRetractionConverter(${
+  if (keyNames.nonEmpty) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+
+val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+
+if (!needRetraction) {
 
 Review comment:
   Just `checkState(DataStreamRetractionRules.isAccRetract(this))`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247960761
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   Why does the results of 
`testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose` and 
`testCalcCannotTransposeUpsertToRetraction` differ? I would expect all of them 
to have the same basic plan structure/same plan tree % returned columns.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247962174
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
 ##
 @@ -46,6 +46,20 @@ class TableSinkValidationTest extends TableTestBase {
 env.execute()
   }
 
+  @Test(expected = classOf[TableException])
+  def testAppendSinkOnUpdatingTable2(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val t = tEnv.fromUpsertStream(
+  StreamTestData.getSmall3TupleUpsertStream(env), 'id.key, 'num, 'text)
+
+t.writeToSink(new TestAppendSink)
+
+// must fail because table is not append-only
+env.execute()
 
 Review comment:
   Isn't `tEnv.explain(t)` enough? Do we need full `ITCase` for that?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247926320
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
 
 Review comment:
   I think this code is incorrect. 
   
   First, you didn't recursively call the `RelTimeIndicatorConverter` on the 
`upsertToRetraction` input:
   ```
   val rewrittenInput = upsertToRetraction.accept(this)
   ```
   
   Secondly, `LogicalUpsertToRetraction` doesn't have to materialize any 
fields: it doesn't invalidate watermarks guarantees (all records after the 
watermark have a rowtime value above the watermark). In other words, it 
preserves rowtime fields/watermark guarantees from it's input. A rowtime field 
must be materialized if this contract between rowtime & watermark is violated, 
like for example in the non windowed joins (rowtime fields in the join result 
can be older than the watermark).
   
   For `LogicalUpsertToRetraction` that's not the case, isn't it? If I'm 
correct (and please correct me if I'm wrong), the code should look just like 
this:
   ```
   def visit(...):
 val rewrittenInput = upsertToRetraction.accept(this)
 return upsertToRetraction.copy(upsertToRetraction.getTraitSet, 
Seq(rewrittenInput))
   ```
   
   I think the second issues would be caught by a test that uses time windowed 
join/aggregation on the upsert source (this should work, but in the current 
version of this PR I would expect it to fail). 
   ```
   SELECT 
 key, max(value) 
   FROM
 UpsertTable
   GROUP BY 
 TUMBLE(rowTime1, INTERVAL '1' DAY), key
   ```
   
   First issue is probably difficult to test/trigger since at the moment when 
`RelTimeIndicatorConverter` is executed, `LogicalUpsertToRetraction` is always 
just after the `TableScan` node. However if we changed the order of 
`CalcUpsertToRetractionTransposeRule` and `RelTimeIndicatorConverter` current 
code would fail to materialize fields for queries like:
   
   ```
   SELECT 
 key, max(value) 
   FROM (
 SELECT rowTime1 + 1 as rowTime2, key, value FROM UpsertTable)
   GROUP BY 
 TUMBLE(rowTime2, INTERVAL '1' DAY), key
   ```
   
   Even if I'm wrong, could you add those two unit tests?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247935469
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableException}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamUpsertToRetraction(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamUpsertToRetraction(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"UpsertToRetractionConverter(${
+  if (keyNames.nonEmpty) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
 
 Review comment:
   nitty nit: you do not need those variables in this commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247933734
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableException}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamUpsertToRetraction(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamUpsertToRetraction(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"UpsertToRetractionConverter(${
+  if (keyNames.nonEmpty) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+
+val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+
+if (!needRetraction) {
+  throw new TableException("DataStreamUpsertToRetraction should always 
generate retractions, " +
+"this should be a bug!")
+}
+
+// todo: return inputDS for plan test. The detailed code will be ready in 
the next commit
 
 Review comment:
   `throw new NotImplementedError()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247935469
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUpsertToRetraction.scala
 ##
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableException}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamUpsertToRetraction(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamUpsertToRetraction(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.nonEmpty)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"UpsertToRetractionConverter(${
+  if (keyNames.nonEmpty) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
 
 Review comment:
   nitty nit: you do not need those variables in this commit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247904188
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -808,8 +910,10 @@ abstract class StreamTableEnvironment(
 val convSubQueryPlan = optimizeConvertSubQueries(relNode)
 val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
 val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
+val upsertToRetractPlan =
 
 Review comment:
   `planWithConvertedUpsertToRetraction`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247954694
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ##
 @@ -183,6 +183,17 @@ object UpdatingPlanChecker {
   lJoinKeys.zip(rJoinKeys)
 )
   }
+
+case l: DataStreamUpsertToRetraction =>
+  val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex
+.filter(e => l.keyIndexes.contains(e._2))
+.map(_._1)
+  Some(uniqueKeyNames.map(e => (e, e)))
+
+case scan: UpsertStreamScan =>
 
 Review comment:
   This big `match` `case` looks veerryy strange... Why isn't it solved in some 
more object oriented way like via a visitor pattern?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247955886
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -170,6 +170,19 @@ class StreamTableEnvironmentTest extends TableTestBase {
 jTEnv.fromAppendStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
   }
 
+  @Test
+  def testAddTableFromUpsert(): Unit = {
 
 Review comment:
   what does this test check/asserts?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247957706
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase {
 (jTEnv, ds)
   }
 
+  private def prepareKeyedSchemaExpressionParser:
+(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, 
JInt, JLong]]]) = {
+
+val jStreamExecEnv = mock(classOf[JStreamExecEnv])
 
 Review comment:
   Do not use mockito here, please implement a proper mock or use actual 
implementation. There are already at least couple of dummy mocks for this class 
in our code base - you could deduplicate them and put one 
`DummyStreamExecutionEnvironment` test jar of `flink-streaming-java` or 
something along those lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247958035
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 ##
 @@ -185,4 +198,20 @@ class StreamTableEnvironmentTest extends TableTestBase {
 (jTEnv, ds)
   }
 
+  private def prepareKeyedSchemaExpressionParser:
+(JStreamTableEnv, DataStream[JTuple2[JBool, JTuple5[JLong, JInt, String, 
JInt, JLong]]]) = {
+
+val jStreamExecEnv = mock(classOf[JStreamExecEnv])
+
when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
+
+val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, 
Types.INT, Types.LONG)
+  .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
+val dsType = new TupleTypeInfo(Types.BOOLEAN, sType)
+  .asInstanceOf[TupleTypeInfo[JTuple2[JBool, JTuple5[JLong, JInt, String, 
JInt, JLong
+val ds = mock(classOf[DataStream[JTuple2[JBool, JTuple5[JLong, JInt, 
String, JInt, JLong)
+when(ds.getType).thenReturn(dsType)
 
 Review comment:
   ditto about the mockito and one abbreviated variables (`sType` vs `dsType`?)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-15 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247932397
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -357,7 +358,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 // function call must always be at the end
 suffixFunctionCall | suffixFunctionCallOneArg |
 // rowtime or proctime
-timeIndicator
+timeIndicator | key
 
 Review comment:
   `key` in a new line?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-09 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r246322448
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalToEnumerableTableScan.scala
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts a LogicalTableScan into an EnumerableTableScan. We need 
this rule to
 
 Review comment:
   If calling a rule only once is impossible, I would be in favour of 
implementing a separate step similar to `RelTimeIndicatorConverter`. Current 
approach is a bit hacky:
   1. converting back and forth `EnumerableTableScan` & `LogicalTableScan` is 
not clean on it's own
   2. it interconnects with already existing `EnumerableToLogicalTableScan` 
which is a kind of hack on it's own. 
   3. `EnumerableToLogicalTableScan` previously had a different purpose and 
it's better to have two smaller independent components (old 
`EnumerableToLogicalTableScan` & new `UpsertToRetractionConverter` (?)) that 
have nothing to do with another, then squeezing them together.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241816667
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
 ##
 @@ -38,7 +41,25 @@ class EnumerableToLogicalTableScan(
 val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
 val table = oldRel.getTable
 val newRel = LogicalTableScan.create(oldRel.getCluster, table)
-call.transformTo(newRel)
+
+val streamTable = table.unwrap(classOf[UpsertStreamTable[_]])
+val isUpsertTable = streamTable match {
+  case _: UpsertStreamTable[_] =>
+true
+  case _ =>
+false
+}
+
+if (isUpsertTable) {
 
 Review comment:
   you could merge this if check with match above to simplify code a bit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241811123
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   ```
   Methods in DataStreamConversions:
   
   toTableFromAppendStream
   toTableFromUpsertStream
   toTableFromRetractStream
   ```
   
   I think this makes sense. @twalthr do you think as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241820886
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
 ##
 @@ -65,21 +64,6 @@ class ExplainTest extends AbstractTestBase {
 assertEquals(expect, result)
   }
 
-  @Test
 
 Review comment:
   Why have you removed this test? Was it moved/superseded by something?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241817741
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
 ##
 @@ -22,13 +22,16 @@ import 
org.apache.calcite.adapter.enumerable.EnumerableTableScan
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
 import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.schema.UpsertStreamTable
 
 /**
  * Rule that converts an EnumerableTableScan into a LogicalTableScan.
  * We need this rule because Calcite creates an EnumerableTableScan
  * when parsing a SQL query. We convert it into a LogicalTableScan
  * so we can merge the optimization process with any plan that might be created
- * by the Table API.
+ * by the Table API. The rule also checks whether the source is an upsert 
source and adds
 
 Review comment:
   This rule is only applied on the `EnumerableTableScan` nodes. This comment 
suggests, that `EnumerableTableScan` appears only in sql, so something is wrong 
here. If we want to keep it as it is now, the comment need updating.
   
   (please check also my comment below in `LogicalToEnumerableTableScan`)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241820550
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalToEnumerableTableScan.scala
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts a LogicalTableScan into an EnumerableTableScan. We need 
this rule to
 
 Review comment:
   What's the problem? That we can not force calcite to fire a rule only once? 
Like I would expect to have a single pass rule, that's fired only once (around 
expand plan optimisation phase), that goes through the plan, checks if the 
sources is upsert and if so, inserts `LogicalUpsertToRetraction`. 
   
   Alternatively, we can think about doing this one step earlier. Like in Table 
API somebody is creating a `LogicalTableScan`, right? We could inject logic 
that checks if we need `LogicalUpsertToRetraction` or not there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241807984
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -573,6 +573,17 @@ abstract class StreamTableEnvironment(
 registerTableInternal(name, dataStreamTable)
   }
 
+  def getTypeFromUpsertStream[T](dataStream: DataStream[T]): 
TypeInformation[T] = {
 
 Review comment:
   private?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241809269
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -224,14 +226,14 @@ case class ProctimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
   override def toString: String = s"proctime($child)"
 }
 
-case class Key(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-throw new UnsupportedOperationException(
-  s"Key Expression can only be used during table initialization.")
-  }
-}
+//case class Key(child: Expression) extends UnaryExpression {
 
 Review comment:
   Some left overs to remove?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241815618
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/RemoveDataStreamUpsertToRetractionRule.scala
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.flink.table.plan.nodes.datastream.{AccMode, 
DataStreamUpsertToRetraction}
+
+/**
+  * Rule to remove [[DataStreamUpsertToRetraction]] under [[AccMode]]. In this 
case, it is a no-op
 
 Review comment:
   `under [[AccMode.AccRetract]].`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-28 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008957
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   (I've to this responded above)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-28 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237004073
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   Generally speaking `toTable`, `toTableFromUpsertStream`, 
`toTableFromRetractStream` sounds good to me, but what's the difference between 
`toTableFromRetractStream` vs `toTable`? Also secondly, would 
`toTableFromRetractStream` call `tableEnv.fromAppendStream()`? If so, there 
would be one another naming inconsistency. 
   
   Maybe renaming `fromDataStream ` to `fromAppendStream` was a bit incorrect? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-28 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237018239
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
 ##
 @@ -36,25 +36,32 @@ class DataStreamScanRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
 val scan: FlinkLogicalNativeTableScan = 
call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
-val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
-dataSetTable match {
-  case _: DataStreamTable[Any] =>
-true
-  case _ =>
-false
-}
+val appendTable = scan.getTable.unwrap(classOf[AppendStreamTable[Any]])
+val upsertTable = scan.getTable.unwrap(classOf[UpsertStreamTable[Any]])
+
+appendTable != null || upsertTable != null
 
 Review comment:
   can we avoid those nulls (in multiple places)? For example by using 
`isInstanceOf` or match? Handling null is dangerous and error prone.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-28 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237017454
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate LastRow
+// after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   as mentioned above (in the no-op/renaming discussion), this cost computation 
is not/will not be correct in case of no-op


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-28 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008338
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+if (upsertStreamTable != null) {
+  val relTypes = scan.getRowType.getFieldList.map(_.getType)
+  val timeIndicatorIndexes = relTypes.zipWithIndex
+.filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+.map(_._2)
+  val input = if (timeIndicatorIndexes.nonEmpty) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
timeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   I have revisited our previous discussion in the design/google doc and I 
still have the same concern as I had then:
   > In that case this is strange. It's like having 
filter/projection/aggregation nodes that after pruning/removing/pushing them 
down are sometimes NO-OPs and sometimes not. Besides being strange it can have 
some negative side effects:
   > - presence of a no-op node (when it doesn't need to be there) can 
block/mess with other optimisations or make them more complex
   > - it will pollute printed explain plan to the user. This is important, 
since this will be very costly node and it will be very hard to explain users 
when this node requires huge state and when not
   > - it would make cost computations more complicated
   
   Back then it sparked a discussion, where you were saying that it will never 
be a no-op and you wanted to `LastRow` always have a state and purpose besides 
upsert to retraction conversion (filtering out empty deletes). However that was 
resolved and filtering out empty deletes was dropped. Thus it brings me back to 
the issue of having this as a no-op node in the plan.
   
   This connects with your other response regarding naming `LastRow`
   > The LastRow node may be a no-op node in the case such as upsert source -> 
calc -> upsert sink. While LastRow will convert upsert stream to retract stream 
if a downstream node needs it to, such as upsert source -> calc -> retract 
sink. Whether convert to retract stream will be decided by RetractionRules.
   
   I would still argue that it should be named `UpsertToRetractionConverter` 
(or sth along those lines) and if not needed, it should be not in the plan. 
Maybe this means that our `RetractionRules` are not sufficient and needs some 
refactoring.
   
   I could see couple of solutions to that, but probably the best would be to 
deduce whether we need to insert `UpsertToRetractionConverter` or not inside 
the rule that is supposed to create it. Further optimisations/rewrites would 
have to correctly handle/preserve semantic/trait of upserts vs retractions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236606838
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   >  We can't name it UpsertToRetractionsConverter since we don't always 
convert upsert to retractions in this node. 
   
   can you elaborate on that?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236606505
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
+
+  lazy val key: PackratParser[Expression] =
+(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY 
^^ {
 
 Review comment:
   Yes, I meant that this `(aliasMapping | "(" ~> aliasMapping <~ ")" | 
fieldReference)` is duplicated couple of times here. If it is possible to 
deduplicate it to:
   ```
   lazy val aliasOrFieldReference = aliasMapping | "(" ~> aliasMapping <~ ")" | 
fieldReference
   ```
   I think it would be better to do so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236601205
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   I meant that this check doesn't prevent from any bugs or from any other 
exceptions. It's just to block user from specifying a no-op key definition, 
right? That's why I'm not sure if we should block it.
   
   If we decide to keep this check/exception, I would refrain from naming 
`fromUpsertStream` in the exception. Maybe rephrasing it to:
   > Defining key on append stream do not have any effects
   
   ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236601884
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
 
 Review comment:
   Ok, I get it - upsert streams without a primary key are single row tables. 
But what do you mean by:
   > The primary key would be necessary.
   
   ? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236604746
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   Maybe `upsertStreamToTable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236597321
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 ##
 @@ -54,9 +54,9 @@ object StreamSQLExample {
   Order(4L, "beer", 1)))
 
 // convert DataStream to Table
-var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
 
 Review comment:
   I would prefer to document this rename in this PR, but if you want it all do 
together with upsert sources documentation then OK, but let's not forget about 
it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-27 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236605473
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
 
 Review comment:
   The messages in the table will be encoded as `Tuple2`? Or the incoming 
messages from the source `DataStream` are expected to be encoded as `Tuple2`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236196242
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+if (upsertStreamTable != null) {
+  val relTypes = scan.getRowType.getFieldList.map(_.getType)
+  val timeIndicatorIndexes = relTypes.zipWithIndex
+.filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+.map(_._2)
+  val input = if (timeIndicatorIndexes.nonEmpty) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
timeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   Do we ALWAYS convert upserts to retractions? Even for pipelines `upsert 
source -> filter -> upsert sink` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236197534
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
+
+  lazy val key: PackratParser[Expression] =
+(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY 
^^ {
 
 Review comment:
   can we deduplicate `(aliasMapping | "(" ~> aliasMapping <~ ")" | 
fieldReference)`? I would expect so but I'm not familiar with this scala magic 
`ExpressionParser`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236202082
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   I still think this is a bad and misleading name for this operator. `LastRow` 
doesn't explain neither the purpose, nor the actual thing that it's doing. If 
someone was asked without prior knowledge what `LastRow` RelNode/operator does, 
it would be impossible to guess the correct answer. It's even more confusing 
when you are binding `LastRow` with "upsert" name in the java doc:
   
   > Represent an upsert source.
   
   > The upsert key names.
   
   Please either change the name to something like 
`UpsertToRetractionsConverter` that explicitly states the purpose of this class 
(than you can keep `upsert` references in the java docs) or if you would like 
to keep naming by the thing that it does it would have to meet the following 
requirements:
   - `LastRow` is independent of the upsert -> retraction conversion logic
   - there is a reasonable chance that it will be used somewhere else outside 
of the upsert -> retraction conversion context 
   
   It's a bit like we would rename `LogicalJoin` to `HashTable`. Kind of true 
but misleading. It could make sense if:
   - `HashTable` is extracted to separate class
   - `LogicalJoin` relnode/operator/concept still exists, but is just using 
`HashTable`
   - `HashTable` is reused somewhere else


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177333
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 referenceByName(name, p).map((_, name))
   case Alias(UnresolvedFieldReference(origName), name: String, _) =>
 referenceByName(origName, p).map((_, name))
+  case (Key(UnresolvedFieldReference(name: String))) =>
 
 Review comment:
   Tbh, at this moment I'm not sure if adding `Key` as nested case class was a 
good idea. It adds quite a lot of boiler plate and special handling in multiple 
places, which contradicts a clean code. 
   
   I wonder if `UnresolvedFieldReference` shouldn't have some kind of "trait" 
that's a primary key. Where by a "trait" I mean any way to determine if that's 
a primary key or not. For example either some boolean flag or maybe introducing 
an `UnresolvedKeyFieldReference` that extends from `UnresolvedFieldReference`? 
I think both of those solutions would allow avoid this code/handling 
duplication in most of the places and special handle  `Key` only where it is 
necessary. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236197832
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/AppendStreamScan.scala
 ##
 @@ -36,20 +36,20 @@ import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
   * It ensures that types without deterministic field order (e.g. POJOs) are 
not part of
   * the plan translation.
   */
-class DataStreamScan(
+class AppendStreamScan(
 
 Review comment:
   Shouldn't this belong to the previous commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236202531
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
 
 Review comment:
   if `keyNames` are "upsert key names" then just name it `upsertKeyNames` and 
drop the comment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236168761
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
 
 Review comment:
   aren't we missing here the user defined primary key? What's the value of 
upsert stream without a defined primary key?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236174343
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1089,6 +1089,19 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 } else {
   referenceByName(origName, t).map((_, name))
 }
+  case (Key(UnresolvedFieldReference(name: String)), idx) =>
+if (isRefByPos) {
 
 Review comment:
   Please deduplicate this if and one if below, with non keyed versions. Either 
extract those if's to separate functions or convert whole match into a function 
sth like this:
   ```
   foo(expr, index) {
 match expr {
   case Key(keyExpr) => foo(keyExpr, index)
   case UnresolvedFieldReference(...) => ...
   case Alias(UnresolvedFieldReference(...)) => ...
 }
   }


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236196723
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
 
 Review comment:
   nit: this comment is not helpful it just duplicates the field name


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236195894
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   But you could probably add this node when changing convention. We assume 
that `TabelScan` rel node represents all kinds of table scans and it always can 
produce retractions, while for some stage (`FlinkLogical`?) we freeze it to 
either retractions/upserts.
   
   Regardless of that, `RelTimeIndicatorConverter.scala` is not the right place 
to create `LogicalLastRow`. You could also add a single pass rule, that is 
applied only once, that inserts upsert -> retraction conversion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177806
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1112,13 +1129,17 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 exprs flatMap {
   case _: TimeAttribute =>
 None
-  case UnresolvedFieldReference(_) if referenced =>
+  case UnresolvedFieldReference(_) | Key(UnresolvedFieldReference(_)) 
if referenced =>
 // only accept the first field for an atomic type
 throw new TableException("Only the first field can reference an 
atomic type.")
   case UnresolvedFieldReference(name: String) =>
 referenced = true
 // first field reference is mapped to atomic type
 Some((0, name))
+  case Key(UnresolvedFieldReference(name: String)) =>
+referenced = true
 
 Review comment:
   ditto: another code duplication. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236169075
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
+
+val streamType: TypeInformation[T] = dataStream.getType match {
 
 Review comment:
   lines L586:L593 seems to be duplicated with L619:L626


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236198600
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
 
 Review comment:
   ? I think this comment is incorrect. This is not the source on it's own, but 
only a conversion class. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236187717
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   This is some naming inconsistency: `toKeyedTable` vs `fromUpsertStream`. 
Rename `keyed` to `upsert`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-11-26 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236171634
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   I'm not sure if we need/want this check. For example for Temporal Joins, 
user might want to interpret upsert stream as an `AppendStreamTable`:
   
   
https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit#heading=h.yqq0cgo927fp
   
   also there might be other use cases where specifing primary key on an 
`AppendTable` makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   >