[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4532




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r133037759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

What would happen if the leftmost table does not have a time attribute (or if it is projected out)? I just think that the semantics of the `StreamRecord` timestamps are too important to have an implicit behavior that is hard to explain and reason about for users. IMO, an exception that asks for explicit user input is the better choice compared to a behavior that depends on non-obvious query characteristics and is hard to predict.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132971979
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

@fhueske With leftmost table I mean the first time indicator in the select statement (from the left). I think even join reordering does not change the column ordering. I agree that at least `TableSink`s should deal with it implicitly.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132889590
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

@wuchong let's rework the restrictions for a `TableSink` in a follow-up issue.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132889039
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

See https://travis-ci.org/apache/flink/jobs/263806347
It only happens when using Scala 2.10.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132863952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
 ---
@@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 
 /**
-  * Function that materializes a time attribute to the metadata timestamp. After materialization
-  * the result can be used in regular arithmetical calculations.
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical calculations.
   */
-object TimeMaterializationSqlFunction
+object ProctimeSqlFunction
--- End diff --

Should we move this object to `org.apache.flink.table.functions.sql` 
package? 




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132854814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

In fact, the data type is changed in order to keep the externally visible data type unchanged. A time indicator is externally represented and treated as a `TIMESTAMP` field and only internally handled as `LONG`. Therefore, we need to convert it into a `TIMESTAMP` once the result is converted into a `DataStream`.

You are right that we need to convert all time indicators to `TIMESTAMP` and not only one. This is currently enforced by the exception that you observed. Currently, users have to cast all but one time indicator attribute to `TIMESTAMP`. That cast also converts them from `long` to `Timestamp`.
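A minimal sketch of that workaround (field names `productA`, `rtA`, `rtB` are taken from the query discussed elsewhere in this thread; `Types.SQL_TIMESTAMP` from the Table API is assumed):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Cast all but one rowtime indicator to TIMESTAMP: 'rtB' loses its time
// indicator property and is converted from the internal Long to a Timestamp.
def toSingleRowtimeStream(t: Table): DataStream[Row] = {
  val result = t.select('productA, 'rtA, 'rtB.cast(Types.SQL_TIMESTAMP))
  // 'rtA' is now the only time indicator left and becomes the
  // StreamRecord timestamp during the conversion.
  result.toAppendStream[Row]
}
```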




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132854238
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I think we should avoid implicit defaults like using the timestamp attribute of the leftmost table (the leftmost table might not have a time indicator attribute, and join order optimization would change the order of tables) and special cases for queries like `SELECT *`.

When a `Table` is converted into a `DataStream`, it is likely that the resulting stream is further processed by logic that cannot be expressed in SQL / Table API. If a `Table` has multiple timestamp attributes, IMO a user should be forced to make a choice for the `StreamRecord` timestamp, because the semantics of any subsequent time-based operations will depend on that. I see two ways to do that:
- ensure that only one attribute is a time indicator by casting the others to `TIMESTAMP`
- let the user specify which field should be used as timestamp as an additional parameter of the `toAppendStream` and `toRetractStream` methods (see the sketch below).

We could also do both.

I agree with @wuchong that we do not need this restriction when we emit a `Table` to a `TableSink`.
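A hedged sketch of the second option; the `rowtimeField` parameter is purely hypothetical and for discussion only, it does not exist in the current API:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table

// HYPOTHETICAL: conversion methods that take the rowtime attribute to use
// for the StreamRecord timestamp as an explicit extra parameter.
trait ExplicitRowtimeConversions {
  def toAppendStream[T: TypeInformation](table: Table, rowtimeField: String): DataStream[T]
  def toRetractStream[T: TypeInformation](table: Table, rowtimeField: String): DataStream[(Boolean, T)]
}
// usage could then look like: toAppendStream[Row](result, "rtA")
```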




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132842811
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not 
converted to array
--- End diff --

In my local environment, `toArray` also seems to be redundant.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132840719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not 
converted to array
--- End diff --

It builds successfully when I remove the `toArray` in my local environment. 




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132841043
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I'm fine with both the current approach and using the time indicator of the left table as the default. But I think no exception should be thrown when writing a `Table` to a `TableSink`. Currently, however, they share the same exception code path.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132834552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
+s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+  s"the table that should be converted to a DataStream.\n" +
+  s"Please select the rowtime field that should be used as 
event-time timestamp for the " +
+  s"DataStream by casting all other fields to TIMESTAMP.")
+} else if (rowtimeFields.size == 1) {
+  val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
+  val convFieldTypes = origRowType.getFieldTypes.map { t =>
+if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
--- End diff --

.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132839934
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
 ---
@@ -16,32 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.conversion
 
 import java.lang.{Boolean => JBool}
 
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
-  * Convert [[CRow]] to a [[JTuple2]]
+  * Convert [[CRow]] to a [[JTuple2]].
   */
-class CRowInputJavaTupleOutputMapRunner(
+class CRowToJavaTupleMapRunner(
 name: String,
 code: String,
 @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
   extends RichMapFunction[CRow, Any]
   with ResultTypeQueryable[JTuple2[JBool, Any]]
   with Compiler[MapFunction[Row, Any]] {
--- End diff --

indent




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132833668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

It seems that this function only changes the data type of the rowtime field 
from Long to Timestamp. Shall we consider making the `rowtimeIdx` an array? 
Besides, as @wuchong suggested, I also think a query should keep the data type 
unchanged.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132822158
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

@xccui You just described exactly what this PR does :D
We allow multiple rowtimes and store them in the record. Each operator picks what is needed. The exception that you got and the problem that I described concern the final conversion from the Table API back to the DataStream API. The DataStream API only allows one timestamp per record.
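For context, a minimal sketch of that constraint (standard DataStream API usage, not code from this PR): every record exposes at most one timestamp.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// A StreamRecord carries at most ONE timestamp, exposed via ctx.timestamp();
// a Row, in contrast, can hold several Long-encoded rowtime fields.
class ShowSingleTimestamp extends ProcessFunction[Row, String] {
  override def processElement(
      row: Row,
      ctx: ProcessFunction[Row, String]#Context,
      out: Collector[String]): Unit = {
    out.collect(s"row=$row, streamRecordTimestamp=${ctx.timestamp()}")
  }
}
```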




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132820853
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I got an idea, but I am not sure if it's applicable. We allow multiple rowtime fields in a stream but only activate one in an operator. Since the timestamps are stored in the records, the other, inactive rowtime fields can just be treated as common fields. Any change to a rowtime field would render it invalid for rowtime use. IMO, there are not too many queries (maybe only over aggregates and joins) that depend on the rowtime, thus the optimizer may be able to deduce which rowtime field should be activated in an operator. However, some existing logic may be affected by that.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132819240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

Maybe we should treat `SELECT *` as a special case with no following time indicator.




[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132819174
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

This is a very good question. I don't know how we want to solve the `SELECT *` problem. My initial idea was to just use the attribute of the left table as the time indicator. Another idea would be to remove all time indicators and explicitly choose one attribute: `SELECT TIME(OrderA.rowtime)`. Still not perfect. We need to discuss this. @fhueske what do you think?





[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132817252
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

Thanks for the PR, @fhueske and @twalthr. I tried to rebase my rowtime join code on this branch, but encountered this exception. The test SQL is `SELECT * FROM OrderA, OrderB WHERE OrderA.productA = OrderB.productB AND OrderB.rtB BETWEEN OrderA.rtA AND OrderA.rtA + INTERVAL '2' SECOND`. What should I do to *cast all other fields to TIMESTAMP*?
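Based on the casting workaround described in fhueske's reply quoted earlier in this digest, a hedged sketch of how the test query could be rewritten so that only `OrderA.rtA` remains a time indicator (a `tEnv` with `OrderA`/`OrderB` registered is assumed):

```scala
// Cast OrderB.rtB to TIMESTAMP in the select list; the rowtime join
// predicate itself can keep using the original attributes.
val result = tEnv.sql(
  """
    |SELECT OrderA.productA, OrderA.rtA, CAST(OrderB.rtB AS TIMESTAMP) AS rtB
    |FROM OrderA, OrderB
    |WHERE OrderA.productA = OrderB.productB
    |  AND OrderB.rtB BETWEEN OrderA.rtA AND OrderA.rtA + INTERVAL '2' SECOND
  """.stripMargin)
```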


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/4532

[FLINK-7337] [table] Refactor internal handling of time indicator attributes and efficiency

## What is the purpose of the change

*This PR is an improvement of @fhueske's PR #4488. For a description see #4488. This PR improves efficiency by not creating objects for every timestamp, but instead representing rowtime timestamps as Long values and serializing them with the LongSerializer. It also contains code clean-up changes.*
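As a rough illustration of the efficiency point (a sketch, not code from this PR): a rowtime timestamp can round-trip through Flink's `LongSerializer` as a plain long value, without allocating a `java.sql.Timestamp` per record.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

// Write the rowtime as a plain long value...
val out = new ByteArrayOutputStream()
LongSerializer.INSTANCE.serialize(1502546400000L, new DataOutputViewStreamWrapper(out))

// ...and read it back, still as a long, with no Timestamp object involved.
val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
val rowtime: Long = LongSerializer.INSTANCE.deserialize(in)
```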



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-7337

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4532.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4532


commit 03ca69505e14aea5452a1b6e77d942ecc2440de4
Author: Fabian Hueske 
Date:   2017-08-04T00:20:56Z

[FLINK-7337] [table] Refactor internal handling of time indicator 
attributes.

- Expand physical Row schema for time indicators.
- Refactor computation of logical schema of tables to import.
- Refactor operators to use time attribute in Row instead of StreamRecord timestamp.

commit 876369d41019b3ec5ba824553b31fb2f3b44a18d
Author: Fabian Hueske 
Date:   2017-08-07T21:39:48Z

Addressed review feedback

commit 54bfed8debbe29784e0de7f07ebb277df68a4eb5
Author: Fabian Hueske 
Date:   2017-08-11T14:43:07Z

minor improvement

commit b0b24011e7c0444e3e1ebaba810edc06e9c85ad6
Author: twalthr 
Date:   2017-08-12T11:51:42Z

Efficient handling of rowtime timestamps



