[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224913#comment-16224913
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4894


> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a 
> rowtime field, but does not support extracting watermarks from the rowtime field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods: {{getRowtimeAttribute}} (which can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in 
> strategies, needs further discussion.
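
To make the proposal in the issue description concrete, here is a minimal sketch of what a `DefinedWatermark` interface with a periodic strategy could look like. Only the names `DefinedWatermark`, `getRowtimeAttribute` and `getWatermarkGenerator` come from the issue; `WatermarkGenerator`, `BoundedOutOfOrder` and `ClickSource` are hypothetical stand-ins, and the method names `nextTimestamp` and `getWatermark` merely mirror the calls visible in the review diff. This is not the interface that was merged.

```scala
// Hypothetical stand-ins for illustration; none of these types are Flink classes.

/** Receives rowtime timestamps (epoch millis) and derives a watermark from them. */
trait WatermarkGenerator {
  def nextTimestamp(timestamp: Long): Unit
  def getWatermark: Long
}

/** Periodic strategy: the watermark trails the highest timestamp seen so far by a fixed delay. */
class BoundedOutOfOrder(delayMs: Long) extends WatermarkGenerator {
  require(delayMs >= 0, "delayMs must not be negative")

  // Start low enough that subtracting the delay cannot underflow.
  private var maxTimestamp: Long = Long.MinValue + delayMs

  override def nextTimestamp(timestamp: Long): Unit = {
    maxTimestamp = math.max(maxTimestamp, timestamp)
  }

  override def getWatermark: Long = maxTimestamp - delayMs
}

/** The interface proposed in the issue: a rowtime field plus a watermark generator. */
trait DefinedWatermark {
  def getRowtimeAttribute: String
  def getWatermarkGenerator: WatermarkGenerator
}

/** Example table source stand-in that declares `clickTime` as its rowtime field. */
class ClickSource extends DefinedWatermark {
  override def getRowtimeAttribute: String = "clickTime"
  override def getWatermarkGenerator: WatermarkGenerator = new BoundedOutOfOrder(5000L)
}
```

With a delay of 5000 ms, rows stamped 10000 and 7000 would leave the watermark at 5000, so the late row at 7000 is still accepted by downstream windows.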



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224884#comment-16224884
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Merging




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224616#comment-16224616
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147661477
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
 ---
@@ -38,23 +38,26 @@ import scala.collection.mutable
   * @param path The path to the CSV file.
   * @param fieldNames The names of the table fields.
   * @param fieldTypes The types of the table fields.
+  * @param selectedFields The fields which will be read and returned by the table source.
+  *   If None, all fields are returned.
   * @param fieldDelim The field delimiter, "," by default.
   * @param rowDelim The row delimiter, "\n" by default.
   * @param quoteCharacter An optional quote character for String values, null by default.
   * @param ignoreFirstLine Flag to ignore the first line, false by default.
   * @param ignoreComments An optional prefix to indicate comments, null by default.
   * @param lenient Flag to skip records with parse error instead to fail, false by default.
   */
-class CsvTableSource(
+class CsvTableSource private (
 private val path: String,
 private val fieldNames: Array[String],
 private val fieldTypes: Array[TypeInformation[_]],
-private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-private val quoteCharacter: Character = null,
-private val ignoreFirstLine: Boolean = false,
-private val ignoreComments: String = null,
-private val lenient: Boolean = false)
+private val selectedFields: Array[Int],
+private val fieldDelim: String,
+private val rowDelim: String,
+private val quoteCharacter: Character,
+private val ignoreFirstLine: Boolean,
+private val ignoreComments: String,
+private val lenient: Boolean)
   extends BatchTableSource[Row]
--- End diff --

The definitions of the schema and the return type depend on the TableSource and the 
encoding. Many checks are already done in the constructors of classes like 
`TableSchema` and `RowTypeInfo`. 

I'm currently working on a consistent builder pattern for table sources 
that can take care of this issue as well.
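
As an illustration of the builder idea mentioned in this comment, the sketch below pairs each field name with its type in a single call, so the two lists cannot get out of sync, and defers the remaining checks to `build()`. `SimpleCsvSource` and its builder methods are hypothetical names chosen for this example and are not the Flink API; field types are plain strings to keep the sketch self-contained.

```scala
// Hypothetical sketch; SimpleCsvSource and its Builder are not Flink classes.
final class SimpleCsvSource private (
    val path: String,
    val fieldNames: Vector[String],
    val fieldTypes: Vector[String],
    val fieldDelim: String,
    val ignoreFirstLine: Boolean)

object SimpleCsvSource {

  def builder(): Builder = new Builder

  final class Builder {
    private var pathOpt: Option[String] = None
    private var fields: Vector[(String, String)] = Vector.empty
    private var delim: String = ","
    private var skipFirst: Boolean = false

    def withPath(path: String): Builder = { pathOpt = Some(path); this }

    /** Adds a field; name and type are always supplied together. */
    def field(name: String, tpe: String): Builder = {
      require(!fields.exists(_._1 == name), s"Duplicate field name: $name")
      fields = fields :+ (name -> tpe)
      this
    }

    def fieldDelimiter(d: String): Builder = { delim = d; this }

    def ignoreFirstLine(): Builder = { skipFirst = true; this }

    /** Validates the collected settings and creates the source. */
    def build(): SimpleCsvSource = {
      require(pathOpt.isDefined, "Path must be set.")
      require(fields.nonEmpty, "At least one field must be defined.")
      new SimpleCsvSource(pathOpt.get, fields.map(_._1), fields.map(_._2), delim, skipFirst)
    }
  }
}
```

A call such as `SimpleCsvSource.builder().withPath("/tmp/in.csv").field("id", "Long").build()` then replaces the long positional constructor, while the private constructor keeps callers from bypassing the checks.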





[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224613#comment-16224613
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147661037
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  tableSource
+  tableSource,
+  selectedFields
 )
   }
 
   override def copy(
   traitSet: RelTraitSet,
-  newTableSource: TableSource[_])
-: PhysicalTableSourceScan = {
+  newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
 new StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  newTableSource.asInstanceOf[StreamTableSource[_]]
+  newTableSource.asInstanceOf[StreamTableSource[_]],
+  selectedFields
 )
   }
 
   override def translateToPlan(
   tableEnv: StreamTableEnvironment,
   queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
+val fieldIndexes = TableSourceUtil.computeIndexMapping(
+  tableSource,
+  isStreamTable = true,
+  selectedFields)
+
 val config = tableEnv.getConfig
 val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-convertToInternalRow(
-  new RowSchema(getRowType),
+val outputSchema = new RowSchema(this.getRowType)
+
+// check that declared and actual type of table source DataStream are identical
+if (inputDataStream.getType != tableSource.getReturnType) {
+  throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
+s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+s"Please validate the implementation of the TableSource.")
+}
+
+// get expression to extract rowtime attribute
+val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+  tableSource,
+  selectedFields,
+  cluster,
+  tableEnv.getRelBuilder,
+  TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+)
+
+// ingest table and convert and extract time attributes if necessary
+val ingestedTable = convertToInternalRow(
+  outputSchema,
   inputDataStream,
-  new StreamTableSourceTable(tableSource),
-  config)
+  fieldIndexes,
+  config,
+  rowtimeExpression)
+
+// generate watermarks for rowtime indicator
+val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+  TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
+
+val withWatermarks = if (rowtimeDesc.isDefined) {
+  val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+  val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+  watermarkStrategy match {
+case p: PeriodicWatermarkAssigner =>
+  val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+case p: PunctuatedWatermarkAssigner =>
+  val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+  }
+} else {
+  // No need to generate watermarks if no rowtime attribute is specified.
+  ingestedTable
+}
+
+withWatermarks
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+timeFieldIdx: Int,
+assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
+val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+assigner.nextTimestamp(timestamp)
+0L
--- End diff --

I think this does not really matter. We can always erase the timestamp 

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224583#comment-16224583
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147658004
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 ---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+getProctimeAttributes(tableSource).nonEmpty
+
+  /**
+* Validates a TableSource.
+*
+* - checks that all fields of the schema can be resolved
+* - checks that resolved fields have the correct type
+* - checks that the time attributes are correctly configured.
+*
+* @param tableSource The [[TableSource]] for which the time attributes are checked.
+*/
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+val schema = tableSource.getTableSchema
+val tableFieldNames = schema.getColumnNames
+val tableFieldTypes = schema.getTypes
+
+// get rowtime and proctime attributes
+val rowtimeAttributes = getRowtimeAttributes(tableSource)
+val proctimeAttributes = getProctimeAttributes(tableSource)
+
+// validate that schema fields can be resolved to a return type field of correct type
+var mappedFieldCnt = 0
+tableFieldTypes.zip(tableFieldNames).foreach {
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
+// OK, field was mapped to proctime attribute
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
+// OK, field was mapped to rowtime attribute
+  case (t: TypeInformation[_], name) =>
+// check if field is registered as time indicator
+if (getProctimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+}
+if (getRowtimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+}
+// check that field can be resolved in input type
+val (physicalName, _, tpe) 

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224581#comment-16224581
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147657884
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
+
+/**
+  * The [[FieldComputer]] interface returns an expression to compute the field of the table schema
+  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type.
+  *
+  * @tparam T The result type of the provided expression.
+  */
+abstract class FieldComputer[T] {
--- End diff --

Constant attributes can be passed via the constructor (see `ExistingField`).
For example, you could define your example as

```
class ParseStringField(field: String, format: String) extends TimestampExtractor
```

The format string is directly embedded into the generated expression.
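
The following sketch shows how a constructor argument can end up inside the generated expression. Only `ParseStringField(field: String, format: String) extends TimestampExtractor` is taken from the comment above. The small expression tree (`Expr`, `FieldRef`, `Literal`, `Call`), the method names `getArgumentFields` and `getExpression`, and the function name `toTimestamp` are assumptions made for illustration and do not reproduce Flink's classes.

```scala
// Hypothetical expression tree; these are not Flink's Expression classes.
sealed trait Expr
final case class FieldRef(name: String) extends Expr
final case class Literal(value: String) extends Expr
final case class Call(function: String, args: Seq[Expr]) extends Expr

/** Stand-in for the extractor base class referenced in the comment. */
abstract class TimestampExtractor {
  /** Names of the physical fields the expression reads. */
  def getArgumentFields: Array[String]

  /** Builds the expression that computes the timestamp from the resolved fields. */
  def getExpression(fields: Array[FieldRef]): Expr
}

/** The constant `format` is a constructor argument, not a field of the input type. */
class ParseStringField(field: String, format: String) extends TimestampExtractor {

  override def getArgumentFields: Array[String] = Array(field)

  // The format string is embedded as a literal in the generated expression.
  override def getExpression(fields: Array[FieldRef]): Expr =
    Call("toTimestamp", Seq(fields.head, Literal(format)))
}
```

Because the pattern travels with the extractor instance, no additional `getArguments` hook is needed on the base class.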




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224027#comment-16224027
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147574828
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 ---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+getProctimeAttributes(tableSource).nonEmpty
+
+  /**
+* Validates a TableSource.
+*
+* - checks that all fields of the schema can be resolved
+* - checks that resolved fields have the correct type
+* - checks that the time attributes are correctly configured.
+*
+* @param tableSource The [[TableSource]] for which the time attributes are checked.
+*/
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+val schema = tableSource.getTableSchema
+val tableFieldNames = schema.getColumnNames
+val tableFieldTypes = schema.getTypes
+
+// get rowtime and proctime attributes
+val rowtimeAttributes = getRowtimeAttributes(tableSource)
+val proctimeAttributes = getProctimeAttributes(tableSource)
+
+// validate that schema fields can be resolved to a return type field of correct type
+var mappedFieldCnt = 0
+tableFieldTypes.zip(tableFieldNames).foreach {
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
+// OK, field was mapped to proctime attribute
+  case (t: SqlTimeTypeInfo[_], name: String)
+if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
+// OK, field was mapped to rowtime attribute
+  case (t: TypeInformation[_], name) =>
+// check if field is registered as time indicator
+if (getProctimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+}
+if (getRowtimeAttributes(tableSource).contains(name)) {
+  throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+}
+// check that field can be resolved in input type
+val (physicalName, _, tpe) = 

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224025#comment-16224025
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147582469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
 ---
@@ -38,23 +38,26 @@ import scala.collection.mutable
   * @param path The path to the CSV file.
   * @param fieldNames The names of the table fields.
   * @param fieldTypes The types of the table fields.
+  * @param selectedFields The fields which will be read and returned by the table source.
+  *   If None, all fields are returned.
   * @param fieldDelim The field delimiter, "," by default.
   * @param rowDelim The row delimiter, "\n" by default.
   * @param quoteCharacter An optional quote character for String values, null by default.
   * @param ignoreFirstLine Flag to ignore the first line, false by default.
   * @param ignoreComments An optional prefix to indicate comments, null by default.
   * @param lenient Flag to skip records with parse error instead to fail, false by default.
   */
-class CsvTableSource(
+class CsvTableSource private (
 private val path: String,
 private val fieldNames: Array[String],
 private val fieldTypes: Array[TypeInformation[_]],
-private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-private val quoteCharacter: Character = null,
-private val ignoreFirstLine: Boolean = false,
-private val ignoreComments: String = null,
-private val lenient: Boolean = false)
+private val selectedFields: Array[Int],
+private val fieldDelim: String,
+private val rowDelim: String,
+private val quoteCharacter: Character,
+private val ignoreFirstLine: Boolean,
+private val ignoreComments: String,
+private val lenient: Boolean)
   extends BatchTableSource[Row]
--- End diff --

Maybe we need a base class instead of traits, so that common checks, such as 
verifying that the numbers of field names and field types are equal, can be done there.
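
A rough sketch of this suggestion: unlike a Scala 2 trait, an abstract class can take constructor parameters and validate them once for every subclass. `SchemaCheckedSource` and `InMemorySource` are hypothetical names for this example and are not part of the pull request.

```scala
// Hypothetical base class; not part of Flink.
abstract class SchemaCheckedSource(
    val fieldNames: Seq[String],
    val fieldTypes: Seq[Class[_]]) {

  // These checks run in the base constructor, before any subclass code.
  require(
    fieldNames.length == fieldTypes.length,
    s"Number of field names (${fieldNames.length}) and " +
      s"field types (${fieldTypes.length}) must be equal.")

  require(
    fieldNames.distinct.length == fieldNames.length,
    "Field names must be unique.")
}

/** A concrete source inherits the validation without repeating it. */
class InMemorySource(names: Seq[String], types: Seq[Class[_]])
  extends SchemaCheckedSource(names, types)
```

Constructing `new InMemorySource(Seq("id"), Seq(classOf[Long], classOf[String]))` would fail with an `IllegalArgumentException`, because `require` rejects the mismatched lengths.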




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224024#comment-16224024
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147583120
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
+
+/**
+  * The [[FieldComputer]] interface returns an expression to compute the field of the table schema
+  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type.
+  *
+  * @tparam T The result type of the provided expression.
+  */
+abstract class FieldComputer[T] {
--- End diff --

I think we could add a `getArguments: Array[_]` function that provides extra 
arguments besides the existing fields. For example, when event time is 
represented as a `String`, the argument would be the time pattern.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224026#comment-16224026
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147580191
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -56,33 +63,123 @@ class StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  tableSource
+  tableSource,
+  selectedFields
 )
   }
 
   override def copy(
   traitSet: RelTraitSet,
-  newTableSource: TableSource[_])
-: PhysicalTableSourceScan = {
+  newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
 new StreamTableSourceScan(
   cluster,
   traitSet,
   getTable,
-  newTableSource.asInstanceOf[StreamTableSource[_]]
+  newTableSource.asInstanceOf[StreamTableSource[_]],
+  selectedFields
 )
   }
 
   override def translateToPlan(
   tableEnv: StreamTableEnvironment,
   queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
+val fieldIndexes = TableSourceUtil.computeIndexMapping(
+  tableSource,
+  isStreamTable = true,
+  selectedFields)
+
 val config = tableEnv.getConfig
 val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-convertToInternalRow(
-  new RowSchema(getRowType),
+val outputSchema = new RowSchema(this.getRowType)
+
+// check that declared and actual type of table source DataStream are identical
+if (inputDataStream.getType != tableSource.getReturnType) {
+  throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
+s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+s"Please validate the implementation of the TableSource.")
+}
+
+// get expression to extract rowtime attribute
+val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+  tableSource,
+  selectedFields,
+  cluster,
+  tableEnv.getRelBuilder,
+  TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+)
+
+// ingest table and convert and extract time attributes if necessary
+val ingestedTable = convertToInternalRow(
+  outputSchema,
   inputDataStream,
-  new StreamTableSourceTable(tableSource),
-  config)
+  fieldIndexes,
+  config,
+  rowtimeExpression)
+
+// generate watermarks for rowtime indicator
+val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+  TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
+
+val withWatermarks = if (rowtimeDesc.isDefined) {
+  val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+  val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+  watermarkStrategy match {
+case p: PeriodicWatermarkAssigner =>
+  val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+case p: PunctuatedWatermarkAssigner =>
+  val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+  ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+  }
+} else {
+  // No need to generate watermarks if no rowtime attribute is specified.
+  ingestedTable
+}
+
+withWatermarks
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+timeFieldIdx: Int,
+assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
+val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+assigner.nextTimestamp(timestamp)
+0L
--- End diff --

I know the timestamp in the `StreamRecord` is useless for the Table/SQL API 

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223885#comment-16223885
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
I'm planning to merge this PR before the feature freeze on Tuesday.

Cheers, Fabian




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222519#comment-16222519
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Added documentation for the new TableSource interfaces.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222013#comment-16222013
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4894
  
Thanks for the feedback @twalthr.
I pushed an update.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221986#comment-16221986
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147358782
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -84,6 +86,13 @@
return typeInfo;
}
 
+   @Override
+   public TableSchema getTableSchema() {
+   return new TableSchema(
+   ((RowTypeInfo) typeInfo).getFieldNames(),
--- End diff --

What do you think about adding a `fromTypeInfo` method to the companion 
object that creates a `TableSchema`?
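A minimal sketch of what such a factory on the companion object could look like. It uses simplified stand-in classes rather than Flink's real `TableSchema` and `RowTypeInfo`; the names `SimpleSchema` and `SimpleRowType` and the exact `fromTypeInfo` signature are assumptions for illustration only.

```scala
// Simplified stand-ins for illustration; these are NOT Flink's classes.
final case class SimpleRowType(fieldNames: Array[String], fieldTypes: Array[String])

class SimpleSchema(val columnNames: Array[String], val columnTypes: Array[String]) {
  require(
    columnNames.length == columnTypes.length,
    "Number of column names and types must match.")
}

object SimpleSchema {
  /** Hypothetical factory that derives a schema from a row type. */
  def fromTypeInfo(rowType: SimpleRowType): SimpleSchema =
    // Clone the arrays so the schema does not share mutable state with the row type.
    new SimpleSchema(rowType.fieldNames.clone(), rowType.fieldTypes.clone())
}
```

A table source could then return `SimpleSchema.fromTypeInfo(rowType)` from its schema method instead of unpacking field names and types by hand.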




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221869#comment-16221869
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147339907
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 ---
@@ -18,55 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.Function
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.types.Row
-
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan[T] {
-
-  /**
-* We check if the input type is exactly the same as the internal row type.
-* A conversion is necessary if types differ.
-*/
-  private[flink] def needsConversion(
-  externalTypeInfo: TypeInformation[Any],
-  internalTypeInfo: TypeInformation[T]): Boolean =
-externalTypeInfo != internalTypeInfo
-
-  private[flink] def generatedConversionFunction[F <: Function](
-  config: TableConfig,
-  functionClass: Class[F],
-  inputType: TypeInformation[Any],
-  expectedType: TypeInformation[Row],
-  conversionOperatorName: String,
-  fieldNames: Seq[String],
-  inputFieldMapping: Option[Array[Int]] = None)
-: GeneratedFunction[F, Row] = {
-
-val generator = new FunctionCodeGenerator(
-  config,
-  false,
-  inputType,
-  None,
-  inputFieldMapping)
-val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
-val body =
-  s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
-generator.generateFunction(
-  conversionOperatorName,
-  functionClass,
-  body,
-  expectedType)
-  }
-
-}
+trait CommonScan[T]
--- End diff --

I kept it for consistency. The other operators have a CommonX trait as well.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221867#comment-16221867
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147339820
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1332,6 +1322,11 @@ abstract class CodeGenerator(
 GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
   }
 
+  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
+val rexBuilder = new RexBuilder(new FlinkTypeFactory(new FlinkTypeSystem))
+generateExpression(rexBuilder.makeCall(CURRENT_TIMESTAMP))
--- End diff --

Ah, thanks. I was looking for something like that.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221865#comment-16221865
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r147339574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
   } else {
 FlinkStatistic.UNKNOWN
   }
+
   convertedTableSource match {
-case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
-case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
+case s: StreamTableSource[_] =>
--- End diff --

In that case a `StreamTableSourceTable` is registered. I kept the previous 
behavior. 
We can fix it in a follow-up issue and make the decision based on the 
execution environment.
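The behavior described here follows from how a Scala `match` works: cases are tried in order and the first matching one wins. A small sketch with stand-in traits (`StreamSource`, `BatchSource`, and `Registration` are hypothetical names, not Flink's types):

```scala
// Stand-in traits for illustration; NOT Flink's StreamTableSource / BatchTableSource.
trait StreamSource
trait BatchSource

class StreamAndBatchSource extends StreamSource with BatchSource
class BatchOnlySource extends BatchSource

object Registration {
  /** Returns which kind of table would be registered for the given source. */
  def tableKind(source: AnyRef): String = source match {
    // Tried first, so a source implementing both traits ends up here.
    case _: StreamSource => "stream"
    case _: BatchSource => "batch"
    case _ => "unknown"
  }
}
```

With this ordering, `Registration.tableKind(new StreamAndBatchSource)` yields `"stream"`. Choosing by execution environment instead would need the environment as an extra input to the decision.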




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221812#comment-16221812
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146585641
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1332,6 +1322,11 @@ abstract class CodeGenerator(
 GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
   }
 
+  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
+val rexBuilder = new RexBuilder(new FlinkTypeFactory(new FlinkTypeSystem))
+generateExpression(rexBuilder.makeCall(CURRENT_TIMESTAMP))
--- End diff --

Use `new CurrentTimePointCallGen().generate()` instead of creating a whole 
new type environment ;)




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221813#comment-16221813
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146575110
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -89,34 +98,50 @@ public void addColumn(String family, String qualifier, Class clazz) {
 * @param charset Name of the charset to use.
 */
public void setCharset(String charset) {
-   this.schema.setCharset(charset);
+   this.hBaseSchema.setCharset(charset);
}
 
@Override
public TypeInformation getReturnType() {
-   String[] famNames = schema.getFamilyNames();
-   TypeInformation[] typeInfos = new TypeInformation[famNames.length];
+   return new RowTypeInfo(getFieldTypes(), getFieldNames());
+   }
+
+   public TableSchema getTableSchema() {
--- End diff --

Add `@Override` annotation.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221811#comment-16221811
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146583860
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -246,40 +245,31 @@ abstract class CodeGenerator(
 */
   def generateConverterResultExpression(
   returnType: TypeInformation[_ <: Any],
-  resultFieldNames: Seq[String])
+  resultFieldNames: Seq[String],
+  rowtimeExpression: Option[RexNode] = None)
--- End diff --

Add the Scala doc for the parameter.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221808#comment-16221808
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146576347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
 ---
@@ -50,6 +50,11 @@ class TableSchema(
 
   val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
 
+  /** Returns a copy of the TableSchema */
+  def copy: TableSchema = {
--- End diff --

Maybe mention the deep copy behavior.
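As a sketch of what a doc comment that spells out the copy depth might look like, using a stand-in class (`CopyableSchema` is a hypothetical name, and this is not the actual `TableSchema` implementation, whose `copy` body is not shown in the diff):

```scala
// Stand-in for illustration; NOT Flink's TableSchema.
class CopyableSchema(val columnNames: Array[String], val columnTypes: Array[String]) {

  /**
    * Returns a copy of this schema.
    *
    * The name and type arrays are cloned, so modifying the arrays of the copy
    * does not affect this instance. The array elements themselves are shared,
    * which is harmless here because they are immutable.
    */
  def copy: CopyableSchema =
    new CopyableSchema(columnNames.clone(), columnTypes.clone())
}
```

Stating the depth in the comment tells callers whether they may mutate the returned arrays safely.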




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221805#comment-16221805
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146587166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
@@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
 abstract class PhysicalTableSourceScan(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
 table: RelOptTable,
-val tableSource: TableSource[_])
+val tableSource: TableSource[_],
+val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table) {
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildLogicalRowType(
-  TableEnvironment.getFieldNames(tableSource),
-  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+  case _: StreamTableSourceTable[_] => true
+  case _: BatchTableSourceTable[_] => false
+  case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+}
+
+TableSourceUtil.getTableSchema(tableSource, selectedFields, streamingTable, flinkTypeFactory)
--- End diff --

I would rename this method to `getRelDataType`, because it does not return 
our `TableSchema`.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221810#comment-16221810
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146582532
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
   } else {
 FlinkStatistic.UNKNOWN
   }
+
   convertedTableSource match {
-case s : StreamTableSource[_] => new StreamTableSourceTable(s, flinkStatistic)
-case _ => new TableSourceTable(convertedTableSource, flinkStatistic)
+case s: StreamTableSource[_] =>
--- End diff --

What happens if the table source implements both interfaces?




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221807#comment-16221807
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146584051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -246,40 +245,31 @@ abstract class CodeGenerator(
 */
   def generateConverterResultExpression(
   returnType: TypeInformation[_ <: Any],
-  resultFieldNames: Seq[String])
+  resultFieldNames: Seq[String],
+  rowtimeExpression: Option[RexNode] = None)
 : GeneratedExpression = {
 
 val input1AccessExprs = input1Mapping.map {
-  case TimeIndicatorTypeInfo.ROWTIME_MARKER =>
-// attribute is a rowtime indicator. Access event-time timestamp in StreamRecord.
-generateRowtimeAccess()
-  case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
+  case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
+   TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
+// attribute is a rowtime indicator.
+if (rowtimeExpression.isDefined) {
--- End diff --

We could use pattern matching here.
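A sketch of the suggested style, replacing an `isDefined` check followed by `get` with a `match` on the `Option`. The diff does not show what either branch does, so both results below are placeholders, and `RowtimeAccess` with a `String` expression is a hypothetical simplification:

```scala
object RowtimeAccess {
  /**
    * Picks how the rowtime attribute would be accessed.
    * Both branch results are placeholders for illustration.
    */
  def accessFor(rowtimeExpression: Option[String]): String =
    rowtimeExpression match {
      // An extraction expression was provided: use it.
      case Some(expr) => s"expression:$expr"
      // No expression provided: take the other branch.
      case None => "no-expression"
    }
}
```

Matching binds the contained value directly, so there is no separate `.get` call that could be reached by mistake on an empty `Option`.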




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221806#comment-16221806
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146574319
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -84,6 +86,13 @@
return typeInfo;
}
 
+   @Override
+   public TableSchema getTableSchema() {
+   return new TableSchema(
+   ((RowTypeInfo) typeInfo).getFieldNames(),
--- End diff --

Maybe we should add a constructor or method to `TableSchema` that takes a 
TypeInfo. I think this case is very common for table sources.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221809#comment-16221809
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4894#discussion_r146586028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 ---
@@ -18,55 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.Function
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.types.Row
-
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan[T] {
-
-  /**
-* We check if the input type is exactly the same as the internal row type.
-* A conversion is necessary if types differ.
-*/
-  private[flink] def needsConversion(
-  externalTypeInfo: TypeInformation[Any],
-  internalTypeInfo: TypeInformation[T]): Boolean =
-externalTypeInfo != internalTypeInfo
-
-  private[flink] def generatedConversionFunction[F <: Function](
-  config: TableConfig,
-  functionClass: Class[F],
-  inputType: TypeInformation[Any],
-  expectedType: TypeInformation[Row],
-  conversionOperatorName: String,
-  fieldNames: Seq[String],
-  inputFieldMapping: Option[Array[Int]] = None)
-: GeneratedFunction[F, Row] = {
-
-val generator = new FunctionCodeGenerator(
-  config,
-  false,
-  inputType,
-  None,
-  inputFieldMapping)
-val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
-val body =
-  s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
-generator.generateFunction(
-  conversionOperatorName,
-  functionClass,
-  body,
-  expectedType)
-  }
-
-}
+trait CommonScan[T]
--- End diff --

Do we still need this trait if it does not contain anything? Let's keep it 
simple and remove it for now.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216777#comment-16216777
 ] 

ASF GitHub Bot commented on FLINK-7548:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4894

[FLINK-7548] [table] Improve rowtime support of TableSources.

## What is the purpose of the change

This PR refactors the `TableSource` interface and some related interfaces. 
The goal of the refactoring is to:

1. Move timestamp extraction and watermark assignment into the table scan. 
This way it is under the control of the Table API and does not rely on 
extraction and assignment in the `TableSource`.
2. Unify time attribute handling in `BatchTableSource` and 
`StreamTableSource`.
3. Projection push down for table sources with time attributes.
4. Projection push down into table access.
5. Support of time attributes for table source with atomic return type.

This commit also resolves:
- FLINK-6870: Unified handling of time attributes in batch and streaming
- FLINK-7179: Support for projection push down and watermark assigners
- FLINK-7696: Support for projection push down and time attributes

## Brief change log

### API changes

* Added `getTableSchema()` method to `TableSource` to separate logical 
table schema from physical return type (`getReturnType()`).
* Replaced `DefinedFieldNames` interface by `DefinedFieldMapping` 
interface. Field names can be defined by table schema. The 
`DefinedFieldMapping` interface allows providing an explicit mapping from 
table schema fields to return type fields.
* `DefinedRowtimeAttribute` interface was replaced by 
`DefinedRowtimeAttributes` interface. The new interface returns a list of 
`RowtimeAttributeDescriptor` which exposes a `TimestampExtractor` and 
`WatermarkStrategy`. Both are used internally to extract timestamps and assign 
watermarks during the table scan. There are default implementations for common 
timestamp extraction and watermark generation methods.
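
To illustrate how a descriptor that bundles timestamp extraction with watermark generation could be consumed by a scan, here is a minimal, self-contained Java sketch. It is not the code of this PR: `RowtimeScanSketch`, `Descriptor`, and `scan` are hypothetical names, the extractor is modeled as a plain `ToLongFunction`, and the watermark rule (emit `timestamp - 1` on every new maximum) is only an assumed stand-in for an ascending-timestamp strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Illustrative sketch only: these types are hypothetical stand-ins,
// not the classes added by this PR.
final class RowtimeScanSketch {

    /** Bundles the name of a rowtime attribute with the logic that extracts its timestamp. */
    static final class Descriptor<T> {
        final String attributeName;
        final ToLongFunction<T> timestampExtractor;

        Descriptor(String attributeName, ToLongFunction<T> timestampExtractor) {
            this.attributeName = attributeName;
            this.timestampExtractor = timestampExtractor;
        }
    }

    /**
     * Simulates a scan: extracts a timestamp per record and, assuming mostly
     * ascending timestamps, emits a watermark of (timestamp - 1) whenever a
     * new maximum timestamp is seen.
     */
    static <T> List<String> scan(List<T> records, Descriptor<T> descriptor) {
        List<String> output = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (T record : records) {
            long timestamp = descriptor.timestampExtractor.applyAsLong(record);
            output.add(descriptor.attributeName + "=" + timestamp);
            if (timestamp > maxTimestamp) {
                maxTimestamp = timestamp;
                output.add("watermark=" + (timestamp - 1));
            }
        }
        return output;
    }
}
```

The point of the sketch is only the division of labor: the source describes *how* to obtain timestamps and watermarks, while the scan decides *when* to apply them.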

## Verifying this change

This PR adds an extensive set of validation, plan, and end-to-end 
integration tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **YES**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

  - Does this pull request introduce a new feature? **YES**
  - If yes, how is the feature documented? **not yet**, PR will be updated 
soon.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableWatermarks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4894.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4894


commit 5a3b0e02b58b0b5eeece60514b9affaf738c10b5
Author: Fabian Hueske 
Date:   2017-10-05T15:44:35Z

[FLINK-7548] [table] Improve rowtime support of TableSources.

This commit also resolves:
- FLINK-6870: Unified handling of time attributes in batch and streaming
- FLINK-7179: Support for projection push down and watermark assigners
- FLINK-7696: Support for projection push down and time attributes

TODO:
- update documentation






[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215805#comment-16215805
 ] 

Fabian Hueske commented on FLINK-7548:
--

Thanks for the feedback [~wheat9].

I've addressed the issue of projection push down in my WIP branch and think 
I've found a good solution. 
A {{ProjectableTableSource}} only needs to adjust the {{TypeInformation}} and 
the {{DataStream}} or {{DataSet}}. The {{TableSchema}} does not need to be 
adapted. I've also extended the support for projection push down by pushing 
projection into a table scan even if the {{TableSource}} does not implement 
{{ProjectableTableSource}}. In that case, the projection is applied by the 
initial {{Row}} conversion such that a projecting Calc can be removed if it 
only selects or reorders fields. It would also be possible to push expressions 
into the scan, but that should be a follow up issue.
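
As a rough illustration of a projection applied by the initial conversion, the Java sketch below copies only the requested physical fields, in the requested order, into the output row. `ProjectingConversionSketch` and `convert` are hypothetical names and rows are modeled as plain `Object[]` arrays; the actual conversion in the WIP branch is code-generated and is not shown here.

```java
// Illustrative sketch only: rows are modeled as Object[]; names are hypothetical.
final class ProjectingConversionSketch {

    /**
     * Builds the output row by picking physicalRecord[projectedFields[i]] for
     * each output position i. This covers both selecting a subset of fields
     * and reordering them, which is all a pure projection does.
     */
    static Object[] convert(Object[] physicalRecord, int[] projectedFields) {
        Object[] row = new Object[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            row[i] = physicalRecord[projectedFields[i]];
        }
        return row;
    }
}
```

Because the index array alone encodes the projection, a downstream operator that only selects or reorders fields has nothing left to do once the conversion has run.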

I'll open a PR in the next hours. It will be quite large (~2500 LOC changes) 
but it would be great if I could get some feedback, especially for the API 
facing classes.



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-23 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215758#comment-16215758
 ] 

Haohui Mai commented on FLINK-7548:
---

Sorry for the late response. The APIs should work for our use cases as long as 
the timestamps can be extracted through an expression.

I think [~ykt836] brought up a good point -- it might be tricky to implement 
projection push down in this case. What would be our strategies there?



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-18 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209327#comment-16209327
 ] 

Kurt Young commented on FLINK-7548:
---

Actually I was thinking that when projections are pushed down, we should keep 
the TableSource's schema unchanged. For example, when we want to read a csv 
file, the schema of the file is somewhat fixed. Whether we want to read all 
fields or some selected fields from that file will not affect the schema of the 
table. I think we should keep the schema fixed and change the result type after 
projection push down. 

But this depends on how we see or define the {{schema}} of a 
{{TableSource}}. If the schema changes with field projections, it is no 
different from the current result type inference. What do you think?



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209299#comment-16209299
 ] 

Fabian Hueske commented on FLINK-7548:
--

Thanks for the comment [~ykt836]. 

I think the projection push-down behavior is similar to before. When a 
projection is pushed down, a {{TableSource}} is not modified but we create a 
new {{TableSource}} with reduced schema. This is what happens at the moment as 
well. The difference is that right now the schema is inferred by the Table API 
/ SQL internals from the return type and with my change the schema would be 
explicitly given by the {{TableSource}}. This makes it easier to support 
projection push-down also for {{TableSource}} implementations with time fields 
because these would no longer be automatically merged into the schema by the 
Table API internals but instead the {{TableSource}} knows everything about its 
schema.

Nonetheless, you are right. When projections are pushed down, a {{TableSource}} 
needs to generate a new return type and a new schema for the new 
{{TableSource}} (and possibly adjust the rowtime / proctime field information).



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-18 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209188#comment-16209188
 ] 

Kurt Young commented on FLINK-7548:
---

Hi [~fhueske], sorry for the late response. I'm not very familiar with 
streaming-related topics, but I like the idea of separating the schema and the 
actual returned type; it makes the concepts clearer. One question here: do you 
think the table schema should remain immutable once we have defined it? Take a 
CsvTableSource: IMO the schema should be fixed once we decide which file to 
read. But according to your WIP branch, if a projection is pushed down into 
CsvTableSource, it also changes the table schema. I'm not sure this is correct. 
What do you think?



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-18 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209002#comment-16209002
 ] 

Till Rohrmann commented on FLINK-7548:
--

Is this strictly a blocker for 1.4?



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208238#comment-16208238
 ] 

Fabian Hueske commented on FLINK-7548:
--

Thanks for the comments [~twalthr] and [~xccui].

@ Timo: I would not add {{TimestampExtractor}} and {{WatermarkAssigner}} to 
{{TableSchema}}. {{TableSchema}} is also used in other places, and timestamp 
extractors & watermark assigners are not relevant in these contexts. IMO, 
{{TimestampExtractor}} and {{WatermarkAssigner}} belong to a {{TableSource}} 
but not to the schema. The schema could encode that these fields are 
non-nullable and watermarked (or however we want to call this property). I 
think the issue of defining fields twice is rather an issue of a concrete 
{{TableSource}} implementation but not so much of the internal API. Given the 
current API, it is possible to have a {{TableSource}} that requires defining a 
field just once (in whatever way) and generates an appropriate TableSchema and 
proctime field name.

@ Xingcan: 
1. With the current design it shouldn't be too hard to define a 
{{TimestampExtractor}} that parses a string field. I would not add it to the 
{{ExistingField}} extractor because it would need an additional parameter that 
specifies the formatting of the timestamp string.
2. Multiple rowtime attributes might not be possible yet but might be later. 
With this API we have the option to add this feature later without breaking the 
API.
3. It's also possible to add a {{TimestampExtractor}} that assigns the current 
timestamp as event timestamp, i.e., ingestion time.
4. Yes, that was my initial motivation to use RexNode instead of Table API 
Expressions. I was thinking that it would be easier to parse SQL expressions 
from a CREATE TABLE statement into RexNodes (using some Calcite code) than to 
Expressions. OTOH, I think it makes sense to be consistent and not expose 
Calcite API. I'd be happy to hear what others think about the RexNode vs. 
Expression choice.
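
To make point 1 concrete, here is a minimal Java sketch of an extractor that parses a string field into an epoch-millisecond timestamp. It is not part of the WIP branch: `StringTimestampExtractorSketch` and `extract` are hypothetical names, and interpreting the parsed value as UTC is an assumption made only for this example. The constructor argument is the "additional parameter" mentioned above.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative sketch only: a hypothetical extractor, not a Flink class.
final class StringTimestampExtractorSketch {

    private final DateTimeFormatter formatter;

    /** The pattern is the extra parameter that describes the timestamp string format. */
    StringTimestampExtractorSketch(String pattern) {
        this.formatter = DateTimeFormatter.ofPattern(pattern);
    }

    /** Parses the field value and returns epoch milliseconds, assuming the string is in UTC. */
    long extract(String fieldValue) {
        return LocalDateTime.parse(fieldValue, formatter)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }
}
```

For instance, with the pattern `yyyy-MM-dd HH:mm:ss` the string `1970-01-01 00:00:01` would map to 1000 milliseconds under the UTC assumption.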



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-17 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207782#comment-16207782
 ] 

Xingcan Cui commented on FLINK-7548:


Hi [~fhueske], sorry for the late reply. I'll first share some ideas that 
struck me and then keep the focus on this issue.
# Now that the rowtime can be extracted by expressions, shall we consider 
allowing more data types for the original fields? For instance, the users may 
want to extract the timestamps from a {{String}} field or even from multiple 
fields by a UDF. 
# Since the stream API does not support multiple watermarks, maybe it's not 
necessary to define a list of {{RowtimeAttributeDescriptor}}.
# Sometimes the processing time could be inapplicable, while the streams 
lack timestamps. I think it would be convenient to provide an internal 
mechanism that can materialize the ingestion time as the rowtime.
# Using Table API Expression sounds like a good idea. However, I just wonder 
whether it will make the DDL harder to implement.

Best, Xingcan



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-17 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207717#comment-16207717
 ] 

Timo Walther commented on FLINK-7548:
-

Thanks for the proposal [~fhueske]. I like the idea with a separate 
{{TableSchema}} for sources that a user can implement. Maybe we can simplify 
the design even more by extending {{TableSchema}} with additional methods like 
{{setRowtime(TimestampExtractor, WatermarkAssigner), setProctime()}}. Then, we 
would not need the {{DefinedProctimeAttribute}} and 
{{DefinedRowtimeAttributes}} interfaces anymore. If we want to change the logic 
in the future, we don't have to modify interfaces but can deprecate or add 
additional methods to {{TableSchema}}. Having {{setRowtime/setProctime()}} 
would also hide the internal {{TimeIndicatorType}} and would not require 
defining a field name twice (in TableSchema and DefinedProctimeAttribute). What 
do you think?



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207628#comment-16207628
 ] 

Fabian Hueske commented on FLINK-7548:
--

Hi [~jark], [~wheat9], [~twalthr], [~xccui], [~ykt836], I'd really like your 
feedback on this effort.
As recently discussed on the dev mailing list, the plan is to cut the 1.4.0 
release branch in two weeks and I think this change should go in.

In the meantime, I thought about the interface and I'll do some refinements:
- Time indicator attributes will be of type Types.SQL_TIMESTAMP in the 
TableSource (instead of TimeIndicatorType). The reason is that 1) they are 
exposed as TIMESTAMP in the schema 2) this hides the internal representation 
from the user.
- I'll add a DefinedProctimeAttribute interface that returns the name of a 
processing time attribute, i.e., the field must exist in the TableSchema and 
will be used as processing time indicator.
- The DefinedRowtimeAttributes will be changed to return a list of 
RowtimeAttributeDescriptors. The field names of the descriptors are matched to 
the TableSchema and these fields must be of type Timestamp.
- The rowtime extraction expression will be a Flink Table API Expression 
instead of a Calcite RexNode. Until now we have hidden Calcite interfaces from 
the users.
- The BatchTableSourceScan will be adapted to also extract rowtime attributes 
and assign the current timestamp to processing time attributes. This way, we 
have a unified handling if a TableSource implements BatchTableSource and 
StreamTableSource.

Please let me know what you think.
Thanks, Fabian



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-10-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197673#comment-16197673
 ] 

Fabian Hueske commented on FLINK-7548:
--

I've created a proposal for a reworked TableSource API that solves several 
issues. I've pushed a WIP branch to my repository: 
https://github.com/fhueske/flink/tree/tableWatermarks-extended

The main changes are as follows:
- The {{TableSource}} interface gets a new method {{getTableSchema(): 
TableSchema}} which returns the schema with all fields of the resulting table. 
For streaming tables, this also includes rowtime indicators. These fields have 
the corresponding {{TimeIndicatorTypeInformation}} types. This means that the 
{{TypeInformation}} returned by {{getReturnType()}} no longer determines 
the schema and fields of the table. Hence, we need to perform a mapping from 
the physical input type ({{getReturnType()}}) to the logical table schema 
({{getTableSchema()}}). By default, this mapping is done based on field names, 
i.e., a table schema field is mapped to the field of the same name in the 
physical input type. If we cannot find a physical field for a logical field or 
if the types do not match, the {{TableSource}} is rejected. This default 
behavior resembles the current behavior. If fields should not be automatically 
mapped by name, we will allow specifying a manual index-based mapping (similar 
to the previous {{DefinedFieldNames}} interface; note: this is not yet included 
in the WIP branch).
- Processing time fields (i.e., fields in the table schema with type 
{{TimeIndicatorTypeInformation(false)}}) are automatically inserted into the 
schema during the initial conversion.
- A {{StreamTableSource}} with rowtime fields (i.e., fields in the table schema 
with type {{TimeIndicatorTypeInformation(true)}}) requires an additional 
interface {{DefinedRowtimeAttributes}}. The interface provides a 
{{RowtimeAttributeDescriptor}} for each rowtime field (right now only a single 
rowtime field is supported but we are prepared for more). The 
{{RowtimeAttributeDescriptor}} provides a {{TimestampExtractor}} which gives a 
{{RexNode}} expression to extract the timestamp field from the input type. A 
corresponding expression is code-gen'd into the initial conversion and executed 
by the table scan operator. Moreover, {{RowtimeAttributeDescriptor}} gives a 
{{WatermarkStrategy}} which is used to generate watermarks for a rowtime 
attribute. Watermarks are also generated by the table scan operator.
- We can provide built-in implementations for {{TimestampExtractor}} (right 
now only {{ExistingField}} to convert a {{Long}} or {{Timestamp}} attribute 
into a rowtime attribute) and {{WatermarkStrategy}} (right now 
{{AscendingWatermarks}} and {{BoundedOutOfOrderWatermarks}}). Additional 
implementations can be added in later PRs.
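
The default name-based mapping described in the first bullet can be sketched roughly as follows. This is a simplified illustration rather than the code in the WIP branch: `FieldMappingSketch` and `mapByName` are hypothetical names, and field types are modeled as plain strings instead of {{TypeInformation}}.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: types are modeled as strings; names are hypothetical.
final class FieldMappingSketch {

    /**
     * For each logical schema field, finds the index of the physical field with
     * the same name. The source is rejected (here: an exception is thrown) if a
     * logical field has no physical counterpart or if the types differ.
     */
    static int[] mapByName(
            String[] logicalNames, String[] logicalTypes,
            String[] physicalNames, String[] physicalTypes) {
        List<String> physical = Arrays.asList(physicalNames);
        int[] mapping = new int[logicalNames.length];
        for (int i = 0; i < logicalNames.length; i++) {
            int index = physical.indexOf(logicalNames[i]);
            if (index < 0) {
                throw new IllegalArgumentException(
                        "No physical field for logical field '" + logicalNames[i] + "'");
            }
            if (!physicalTypes[index].equals(logicalTypes[i])) {
                throw new IllegalArgumentException(
                        "Type mismatch for field '" + logicalNames[i] + "'");
            }
            mapping[i] = index;
        }
        return mapping;
    }
}
```

A manual index-based mapping would simply replace the `indexOf` lookup with indices supplied by the source, while the type check could stay the same.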

I think the proposal is a good solution because:
- the table schema is explicitly defined by the {{TableSource}} and not by the 
API internals as before (handling of time attributes is currently hidden). This 
will make {{CREATE TABLE}} DDL statements easier.
- timestamp extraction and watermark assignment can be configured by extensible 
interfaces without changing the API internals. The actual timestamp extraction 
and watermark generation happen in the table scan operator, so the 
{{TableSource}} does not have to deal with it.
- Timestamp extraction happens via {{RexNode}} expressions. So this is 
very versatile and it should be possible to call UDFs if these had been 
registered before (need to check this though).
- We can provide built-in implementations for the most common strategies. I 
think all requirements that we had listed in this issue before, should be 
solvable with this design.
- Projection push-down can be enabled for table sources with timestamp 
attributes, which doesn't work right now (not part of the WIP branch).
- I could remove a few weird artifacts / workarounds of the current design that 
change the table's schema internally.

Of course, there is also a downside:
- We are touching the {{TableSource}} interface and related interfaces. This is 
not a big deal for Flink's built-in sources (because there are not so many), 
but might be for users that have more {{TableSource}}s implemented.

Please let me know what you think [~jark], [~wheat9], [~twalthr], [~xccui], 
[~ykt836], and everybody else.



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-26 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181847#comment-16181847
 ] 

Xingcan Cui commented on FLINK-7548:


Thanks for the explanation, [~fhueske]. I was too paranoid about the problem.



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-26 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181234#comment-16181234
 ] 

Fabian Hueske commented on FLINK-7548:
--

I think there might be a misunderstanding of the 
{{TimestampsAndPeriodicWatermarksOperator}}.

bq. About my last question, I actually refer to the 
TimestampsAndPeriodicWatermarksOperator. Here, the "periodic" refers to 
proctime. Considering the time systems for the rowtime and the proctime may not 
be synchronized (i.e., they get different speeds), could we consider providing 
a "rowtime periodic" assigner?

The assigner does not emit watermarks that carry a processing-time _value_; it 
only emits them at regular intervals that are measured in processing time.
So, whenever the interval has passed, the operator asks the 
{{AssignerWithPeriodicWatermarks}} to return the current watermark. The 
watermark should be based on the event-time timestamps that the 
{{AssignerWithPeriodicWatermarks}} has observed (in fact, this depends on the 
implementation of the assigner). So the value of the watermarks is based on 
event time, but the interval at which the watermarks are generated is not. 
AFAIK, this is the most commonly used watermark generation strategy, so it 
seems to suit many users.
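
To illustrate the distinction, here is a minimal, self-contained Java sketch. 
It does not use Flink's classes: {{BoundedLatenessTracker}}, its methods, and 
the {{maxLatenessMs}} parameter are hypothetical names chosen for this example. 
The tracker only looks at event-time timestamps; _when_ {{currentWatermark()}} 
is called is up to the caller, which in Flink would be a processing-time timer.

```java
// Hypothetical stand-in for an AssignerWithPeriodicWatermarks; not Flink code.
final class BoundedLatenessTracker {
    private final long maxLatenessMs;
    // Highest event-time timestamp observed so far.
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxLatenessMs) {
        this.maxLatenessMs = maxLatenessMs;
    }

    // Called once per record with the record's event-time timestamp.
    void observe(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called by the caller whenever its (processing-time) interval has elapsed.
    // The returned value depends only on the observed event-time timestamps.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - maxLatenessMs;
    }
}
```

Replaying a historical queue at full speed changes how many records are 
observed between two calls, but not how the returned value is computed.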



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-25 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180077#comment-16180077
 ] 

Xingcan Cui commented on FLINK-7548:


Thanks for the answers, [~fhueske]. Some extra comments.

About the first question, we must admit that a rowtime field has a dual nature. 
On one hand, it represents the rowtime and should be treated as a {{Long}}. 
On the other hand, it is an ordinary field with its own type ({{Timestamp}} 
or {{Long}}, and maybe more in the future?). We don't want to perform an extra 
type check when using this field as the rowtime field, and we also don't want 
to lose the original data type when using it as an ordinary field (e.g., when 
it is passed to a UDF that formats a timestamp). Of course, if all the types 
are internally represented as {{Long}}, we could just give the fields different 
time indicators so that we can recover the original data type after the 
processing.

The record number bounded out-of-order generation strategy would work roughly 
like this: we don't emit a watermark {{w}} until a specified number of records 
whose timestamps are greater than {{w}} have arrived. Just an idea that 
occurred to me :P
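
A minimal Java sketch of this idea, under my own assumptions: the class name 
{{CountBoundedWatermarks}}, the {{requiredRecords}} parameter, and the 
{{onRecord}} method are hypothetical and not part of Flink. It keeps the 
{{requiredRecords}} largest timestamps seen so far; the smallest of them minus 
one is the largest {{w}} that the required number of records exceed.

```java
import java.util.PriorityQueue;

// Hypothetical sketch of the "record number bounded" idea; not a Flink API.
final class CountBoundedWatermarks {
    private final int requiredRecords;
    // Min-heap holding the `requiredRecords` largest timestamps seen so far.
    private final PriorityQueue<Long> largest = new PriorityQueue<>();
    private long lastWatermark = Long.MIN_VALUE;

    CountBoundedWatermarks(int requiredRecords) {
        if (requiredRecords < 1) {
            throw new IllegalArgumentException("requiredRecords must be >= 1");
        }
        this.requiredRecords = requiredRecords;
    }

    // Returns a new watermark if one can be emitted after this record, else null.
    Long onRecord(long timestamp) {
        largest.add(timestamp);
        if (largest.size() > requiredRecords) {
            largest.poll(); // drop the smallest, keep the N largest
        }
        if (largest.size() < requiredRecords) {
            return null; // not enough records yet
        }
        // All N retained timestamps are greater than the candidate.
        long candidate = largest.peek() - 1;
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            return candidate;
        }
        return null;
    }
}
```

Whether such a strategy is useful in practice is a separate question; the 
sketch only pins down one possible reading of the rule.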

About my last question, I was actually referring to the 
{{TimestampsAndPeriodicWatermarksOperator}}. Here, the "periodic" refers to 
proctime. Considering the time systems for the rowtime and the proctime may not 
be synchronized (i.e., they advance at different speeds), could we consider 
providing a "rowtime periodic" assigner?



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-25 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179836#comment-16179836
 ] 

Fabian Hueske commented on FLINK-7548:
--

Thanks for your thoughts [~xccui].
I added a few comments to your suggestions / questions.

Thanks, Fabian

bq. Considering that the data type should be preserved, it may bring extra 
logic if we do that internally. To keep the consistency, I wonder if it's 
possible to encapsulate the time into a new Rowtime type. It exposes two 
methods, getTime(): Long for logical level use and getValue(): T for physical 
level use.

In fact, {{Long}} and {{Timestamp}} have the same internal representation, 
namely {{Long}}. The issue is more the type that is exposed to SQL or the Table 
API. We would need a new TimeIndicator type that exposes a timestamp as 
{{Long}}.
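
As a small illustration of the "same representation, different exposed type" 
point, the sketch below uses the JDK's {{java.sql.Timestamp}} rather than 
Flink's internal code path. The class and method names are hypothetical.

```java
import java.sql.Timestamp;

// Hypothetical helper; it only shows that both views carry the same long value.
final class TimestampAsLong {
    // Physical value: epoch milliseconds.
    static long toInternal(Timestamp ts) {
        return ts.getTime();
    }

    // Logical view: the same instant exposed as a Timestamp.
    static Timestamp toExternal(long epochMillis) {
        return new Timestamp(epochMillis);
    }
}
```

Converting in either direction does not change the underlying value, so the 
open question is only which type the time indicator exposes to the user.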

bq. Besides, I think the watermark generation should not be bound to rowtime 
extraction. Compared with implementing them in a single scan operator (not sure 
if I understood correctly), I prefer to generate watermarks in extra operators. 
That should be more flexible.

Timestamp extraction and watermark generation would not be tied together. 
First, we would compute timestamps (only necessary if we don't use an existing 
field). The next step would extract watermarks. However, both operations would 
happen in the logical scan operator, because a single logical operator can be 
translated into multiple DataStream operations.

bq. I am thinking of a new record number bounded out-of-order generation 
strategy. Do you think it will be useful in real applications?

How would this strategy work? IMO, built-in strategies should have a concrete 
use case in mind which is common enough to justify a built-in primitive.

bq. I still feel that the machine time is not compatible with the rowtime 
watermark generation. Shall we consider getting rid of it?

Machine time (assuming that you refer to processing time here) does not use 
watermarks. Watermarks are only used for event-time processing. 



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-22 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176020#comment-16176020
 ] 

Xingcan Cui commented on FLINK-7548:


Hi [~fhueske] and [~jark], sorry for the late reply.

Honestly, I don't have much experience with real applications, but I think the 
proposal is quite reasonable. In brief, any (existing or computed) field of 
type {{LONG}} or {{TIMESTAMP}} can be used as a rowtime field, right? Here 
are my thoughts:

1. Considering that the data type should be preserved, it may bring extra logic 
if we do that internally. To keep the consistency, I wonder if it's possible to 
encapsulate the time into a new {{Rowtime}} type. It exposes two methods, 
{{getTime(): Long}} for logical level use and {{getValue(): T}} for physical 
level use.

2. Besides, I think the watermark generation should not be bound to rowtime 
extraction. Compared with implementing them in a single scan operator (not sure 
if I understood correctly), I prefer to generate watermarks in extra operators. 
That should be more flexible.

3. I am thinking of a new record number bounded out-of-order generation 
strategy. Do you think it will be useful in real applications?

4. I still feel that the machine time is not compatible with the rowtime 
watermark generation. Shall we consider getting rid of it?
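
For point 1, a rough Java sketch of what such a {{Rowtime}} wrapper could look 
like. Everything here is an assumption for illustration: no such class exists 
in Flink, and the constructor taking a conversion function is my own choice.

```java
import java.util.function.ToLongFunction;

// Hypothetical wrapper sketching the proposed Rowtime type; not a Flink class.
final class Rowtime<T> {
    private final T value;
    private final long time;

    Rowtime(T value, ToLongFunction<T> toEpochMillis) {
        this.value = value;
        this.time = toEpochMillis.applyAsLong(value);
    }

    // Logical-level view: the timestamp as a long.
    long getTime() {
        return time;
    }

    // Physical-level view: the original value with its original type.
    T getValue() {
        return value;
    }
}
```

For example, {{new Rowtime<>("123", Long::parseLong)}} keeps the original 
{{String}} while exposing {{123}} as the time.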



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174581#comment-16174581
 ] 

Fabian Hueske commented on FLINK-7548:
--

Thanks for the feedback [~jark]!

bq. Regarding the existing field type, should we support the Timestamp type? 
Timestamp is a very common type in traditional databases.

You mean supporting {{TIMESTAMP}} in addition to {{LONG}} as the type of an 
existing timestamp column? I think that's a good idea. I will address that in 
FLINK-7446.

bq. What about making TableSource support "Computed Column" instead of a UDF 
converter? 

Initially, I was also thinking about arbitrary expressions for column 
conversions. The "problem" would be that {{TableSource}} (and the 
{{DefinedRowtimeAttribute}} interface) are shared between the Table API and 
SQL, but Table API and SQL expressions are different. 
The easiest approach would be to use the Table API expression parser, but this 
would be inconsistent with SQL. I proposed using a UDF because this would avoid 
the problem at the cost of implementing a UDF for each conversion. I'm 
definitely open to suggestions here.

bq. I agree with preserving the type.

Yes, I totally see that. I'd solve this in a follow-up issue because I think 
we'd need a different time indicator type for that, and some of the changes 
would be more involved.




[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-19 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171634#comment-16171634
 ] 

Jark Wu commented on FLINK-7548:


Hi [~fhueske], thanks for the proposal. I really like it! It meets most of our 
requirements. I have some additional thoughts: 

Regarding the existing field type, should we support the {{Timestamp}} type? 
Timestamp is a very common type in traditional databases.

The computed timestamp is a very useful feature. We implemented it in Blink SQL, 
where it is called "Computed Column": a new field that is computed from one or 
more existing fields. A computed column can be used as the timestamp field but 
also as a normal field. It is computed using an expression (which may include 
UDFs). What about making TableSource support "Computed Column" instead of a UDF 
converter?

{quote}
preserve type of time indicator fields. At the moment, a field of type 
ROWTIME_INDICATOR is exposed to the user as TIMESTAMP. However, a time field is 
initially of type LONG, so the type exposed to the user is changed. We 
might want to add a time indicator that exposes itself as LONG.
{quote}
I agree with preserving the type. It is very strange that a field of type long 
becomes a {{Timestamp}} when it is used as the timestamp field.

What do you think about this, [~fhueske], [~xccui], [~wheat9]? 



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171469#comment-16171469
 ] 

Fabian Hueske commented on FLINK-7548:
--

At the moment, we expect a {{TableSource}} to assign timestamps and watermarks 
itself.
The goal for this JIRA should be to move the timestamp and watermark generation 
into the Scan operator.
So the Scan operator would receive the {{DataStream}} from the {{TableSource}} 
that has no ({{StreamRecord}}) timestamps and watermarks and generate 
watermarks (and timestamps) based on that.

For this we need to provide information about the timestamp field and the 
generation of watermarks.
I collected a few requirements from discussions I had:

1. the timestamp field can be defined as one of the following:
  - An existing field of type {{LONG}}. The type of the existing field is 
converted to {{ROWTIME_INDICATOR}} (this conversion is only on the logical 
level, as timestamps are internally handled as LONG).
  - An existing field that must be converted using a scalar UDF with a single 
input parameter (of type of the existing field) that returns a {{LONG}}. The 
input field is replaced by the result of the UDF. The name of the input field 
is preserved and the logical type of the field becomes {{ROWTIME_INDICATOR}}.
  - A new field that is computed using a scalar UDF with one or more input 
parameters. The result of the UDF is of type {{LONG}} and appended as a new 
field to the schema of the row. The name of the new field must be provided and 
the logical type is {{ROWTIME_INDICATOR}}.

2. the watermarks are computed based on the (computed) timestamp field.
  - There are some common watermark strategies that should be supported:
 -- (periodic) ascending watermarks
 -- (periodic) bounded out-of-order watermarks
  - Custom watermark logic should be supported as well
 -- periodic watermarks
 -- punctuated watermarks
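
To make the three timestamp-field options in requirement 1 concrete, here is a 
hedged Java sketch. Rows are modelled as plain maps and the "UDFs" as Java 
lambdas; {{RowtimeDefinitions}} and its methods are hypothetical names and say 
nothing about the eventual {{TableSource}} interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch; rows are plain maps and "UDFs" are Java lambdas.
final class RowtimeDefinitions {
    // (a) Existing LONG field: used as-is.
    static long fromExistingField(Map<String, Object> row, String field) {
        return (Long) row.get(field);
    }

    // (b) Existing field converted by a single-argument function; the result
    // replaces the input value under the same field name.
    static Map<String, Object> convertInPlace(
            Map<String, Object> row, String field, ToLongFunction<Object> udf) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.put(field, udf.applyAsLong(row.get(field)));
        return out;
    }

    // (c) New field computed from the row and appended under a new name.
    static Map<String, Object> appendComputed(
            Map<String, Object> row,
            String newField,
            ToLongFunction<Map<String, Object>> udf) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.put(newField, udf.applyAsLong(row));
        return out;
    }
}
```

In all three cases the resulting value is a {{long}}, which is what the 
watermark strategies in requirement 2 would consume.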
 
Additional thoughts / future requirements / open questions: 
- preserve the type of time indicator fields. At the moment, a field of type 
{{ROWTIME_INDICATOR}} is exposed to the user as {{TIMESTAMP}}. However, a time 
field is initially of type {{LONG}}, so the type exposed to the user is 
changed. We might want to add a time indicator that exposes itself as 
{{LONG}}.
- do we need support for time indicator fields in nested fields (with 
replacement)?

What do you think of this [~wheat9], [~jark], [~xccui]? Would that meet your 
requirements?

Once we agree on the requirements (and scope) of this issue, we can continue to 
define the interface.



[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-08-29 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144836#comment-16144836
 ] 

Xingcan Cui commented on FLINK-7548:


Thanks for the issue [~jark]. I'd like to share some ideas about the problem.
1. In principle, each rowtime field should be "guarded" by its own set of 
watermarks. Although we support multiple rowtime fields now, once two streams 
are connected, their watermarks will be forcibly merged. As a consequence, the 
initial watermarks may not be usable in the subsequent calculations. Shall we 
consider re-generating them?
2. The current periodic watermark assigner is based on machine time. I'm not 
sure if it is applicable to rowtime, since the rowtime and machine time may not 
be synchronized. For example, if the stream is sourced from a historical queue, 
it may be fed into the system at maximum speed, so the machine-time-based 
watermark assigner may not work properly (e.g., we may generate a watermark 
that spans 1 hour of rowtime within 5 seconds). How about using a rowtime-based 
periodic assigner with the following framework?
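{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Emits a watermark each time the rowtime has advanced by at least `interval`.
// Assumes Order has a field `rt: Long` that holds the rowtime.
class WatermarksAssigner(interval: Long) extends AssignerWithPunctuatedWatermarks[Order] {
  // The last emitted watermark.
  var currentWatermark: Long = 0

  override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
    element.rt
  }

  override def checkAndGetNextWatermark(lastElement: Order, extractedTimestamp: Long): Watermark = {
    if (extractedTimestamp >= currentWatermark + interval) {
      // Advance the watermark by a whole number of intervals.
      currentWatermark += ((extractedTimestamp - currentWatermark) / interval) * interval
      new Watermark(currentWatermark)
    } else {
      // Not enough rowtime has passed; emit nothing for this element.
      null
    }
  }
}
{code}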
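
Regarding point 1, the merging can be pictured with a small Java sketch. As far 
as I know, a two-input operator follows the minimum of its two input 
watermarks; the class below is a hypothetical model of that rule and not 
Flink's operator code.

```java
// Hypothetical model of how a two-input operator combines watermarks.
final class TwoInputWatermark {
    private long left = Long.MIN_VALUE;
    private long right = Long.MIN_VALUE;

    // Records a watermark from the left input and returns the combined value.
    long onLeftWatermark(long wm) {
        left = Math.max(left, wm);
        return combined();
    }

    // Records a watermark from the right input and returns the combined value.
    long onRightWatermark(long wm) {
        right = Math.max(right, wm);
        return combined();
    }

    // The operator's event-time clock is held back by the slower input.
    long combined() {
        return Math.min(left, right);
    }
}
```

After the merge, only the combined value is visible downstream, which is why 
the per-stream watermarks cannot be recovered without re-generating them.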
BTW, I'm quite interested in this issue. Can I take it?

Thanks, Xingcan


