[GitHub] flink issue #4674: [FLINK-7627] [table] SingleElementIterable should impleme...

2017-11-20 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4674
  
I'm +1 for the first option, i.e. declaring the element field as 
`transient`. You are right, `SingleElementIterable` does not need to be 
serialized.
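
A minimal sketch of that first option, assuming a simplified stand-in for `SingleElementIterable` (the class and helper below are hypothetical, not the Flink implementation). Marking the element as `@transient` keeps it out of Java serialization, so a non-serializable element no longer breaks serialization of the enclosing object; the price is that the element is gone after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in, not the Flink class: an Iterable holding at most one element.
class SingleElementIterableSketch[T] extends java.lang.Iterable[T] with Serializable {

  // @transient: Java serialization skips this field entirely.
  @transient private var element: Option[T] = None

  def setElement(e: T): Unit = element = Some(e)

  override def iterator(): java.util.Iterator[T] = {
    // After deserialization a transient field is null, so treat null like "no element".
    val items = new java.util.ArrayList[T]()
    Option(element).flatten.foreach(e => items.add(e))
    items.iterator()
  }
}

object TransientSketch {
  // Serializes and deserializes a value in memory (illustration helper only).
  def roundTrip[A <: java.io.Serializable](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Without `@transient`, serializing an instance that holds a plain `java.lang.Object` would be expected to fail with a `NotSerializableException`.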


---


[GitHub] flink issue #4936: [FLINK-7962] Add built-in support for min/max aggregation...

2017-11-15 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4936
  
I will help merge this.


---


[GitHub] flink issue #4936: [FLINK-7962] Add built-in support for min/max aggregation...

2017-11-13 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4936
  
Thanks for the work!  The code looks good to me. 

+1 


---


[GitHub] flink issue #4710: [FLINK-7446] [table] Change DefinedRowtimeAttribute to wo...

2017-09-27 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4710
  
Looks good to me.

+1


---


[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

2017-09-21 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4536
  
Hi @fhueske, I have fixed the conflicts and updated the docs to describe 
the var-arg support.


---


[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

2017-09-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4536#discussion_r140163581
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
 ---
@@ -40,17 +40,14 @@ class TableSqlFunction(
 name: String,
 udtf: TableFunction[_],
 rowTypeInfo: TypeInformation[_],
-returnTypeInference: SqlReturnTypeInference,
-operandTypeInference: SqlOperandTypeInference,
-operandTypeChecker: SqlOperandTypeChecker,
-paramTypes: util.List[RelDataType],
+typeFactory: FlinkTypeFactory,
 functionImpl: FlinkTableFunctionImpl[_])
   extends SqlUserDefinedTableFunction(
 new SqlIdentifier(name, SqlParserPos.ZERO),
-returnTypeInference,
-operandTypeInference,
-operandTypeChecker,
-paramTypes,
+ReturnTypes.CURSOR,
+createOperandTypeInference(name, udtf, typeFactory),
+createOperandTypeChecker(name, udtf),
+null,
--- End diff --

I think it is fine. `ScalarSqlFunction` and `AggregateSqlFunction` also 
pass `null` and work fine.


---


[GitHub] flink issue #4635: [FLINK-7571] [table] Fix translation of TableSource with ...

2017-09-20 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4635
  
Hi @fhueske , thanks for the work! It is a very important fix! I'm fine 
with the changes.

+1 to merge


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136363886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

I agree with @lincoln-lil: we can define an abstract method in 
`TableEnvironment` and implement it in `StreamTableEnvironment`, so that 
callers do not need to cast `QueryConfig` to `StreamQueryConfig` manually.

```
def queryConfig: StreamQueryConfig = new StreamQueryConfig
```
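
To illustrate why the abstract method removes the cast, here is a minimal sketch using simplified stand-ins for the classes named above (the class bodies, and `BatchQueryConfig` as the batch counterpart, are assumptions made for illustration, not the Flink sources). Scala allows an override to narrow the result type, so code that holds a `StreamTableEnvironment` gets a `StreamQueryConfig` directly.

```scala
// Hypothetical, simplified stand-ins for the Flink classes named in the comment.
class QueryConfig extends Serializable
class StreamQueryConfig extends QueryConfig
class BatchQueryConfig extends QueryConfig

abstract class TableEnvironment {
  // Abstract: each concrete environment supplies its own default config.
  def queryConfig: QueryConfig
}

class StreamTableEnvironment extends TableEnvironment {
  // Covariant result type: no manual cast to StreamQueryConfig is needed.
  override def queryConfig: StreamQueryConfig = new StreamQueryConfig
}

class BatchTableEnvironment extends TableEnvironment {
  override def queryConfig: BatchQueryConfig = new BatchQueryConfig
}
```

Code written against the base `TableEnvironment` still sees a plain `QueryConfig`, which is what a generic `sqlUpdate(sql)` overload would need.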


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r136362552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -106,6 +106,43 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
--- End diff --

I think a pre-registered table sink should contain its own schema 
information, so I prefer the `registerTableSink(String, TableSink)` interface. 
If a table sink is registered without a schema, we can throw an exception.
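
A rough sketch of that registration rule, assuming hypothetical stand-in types (`SinkSketch` and `SinkCatalogSketch` are invented for illustration and are not Flink APIs): the sink carries its own field names and types, and registration fails fast when they are missing or inconsistent.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a table sink that may or may not carry its schema.
final case class SinkSketch(fieldNames: Option[Seq[String]], fieldTypes: Option[Seq[String]])

class SinkCatalogSketch {
  private val sinks = mutable.Map.empty[String, SinkSketch]

  // Same shape as the proposed registerTableSink(String, TableSink):
  // the schema is taken from the sink itself, not passed separately.
  def registerTableSink(name: String, sink: SinkSketch): Unit =
    (sink.fieldNames, sink.fieldTypes) match {
      case (Some(names), Some(types)) if names.nonEmpty && names.length == types.length =>
        sinks(name) = sink
      case _ =>
        throw new IllegalArgumentException(
          s"Table sink '$name' must be configured with field names and types before registration.")
    }

  def isRegistered(name: String): Boolean = sinks.contains(name)
}
```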


---


[GitHub] flink issue #4629: [hotfix][table] Fix bug of testAggregateFunctionOperandTy...

2017-08-31 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4629
  
Thanks for the hotfix, will merge this. 


---


[GitHub] flink issue #4624: [FLINK-7410] [table] Use toString method to display opera...

2017-08-31 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4624
  
The CI failure is not related to this PR.


---


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r136308757
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 ---
@@ -41,7 +41,7 @@ abstract class UserDefinedFunction extends Serializable {
   def close(): Unit = {}
 
   /**
-* @return true iff a call to this function is guaranteed to always 
return
+* @return true if a call to this function is guaranteed to always 
return
--- End diff --

I think `iff` is correct. `iff` means "if and only if".


---


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r136308925
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -268,6 +269,7 @@ object UserDefinedFunctionUtils {
 */
   def createTableSqlFunctions(
   name: String,
+  disPlayName: String,
--- End diff --

disPlayName  -> displayName


---


[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

2017-08-31 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4536
  
Hi @sunjincheng121  I have updated the PR. 


---


[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

2017-08-31 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4536#discussion_r136287683
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
 ---
@@ -74,48 +75,102 @@ class TableSqlFunction(
 
 object TableSqlFunction {
 
-  /**
-* Util function to create a [[TableSqlFunction]].
-*
-* @param name function name (used by SQL parser)
-* @param udtf user-defined table function to be called
-* @param rowTypeInfo the row type information generated by the table 
function
-* @param typeFactory type factory for converting Flink's between 
Calcite's types
-* @param functionImpl Calcite table function schema
-* @return [[TableSqlFunction]]
-*/
-  def apply(
+  private[flink] def createOperandTypeInference(
 name: String,
 udtf: TableFunction[_],
-rowTypeInfo: TypeInformation[_],
-typeFactory: FlinkTypeFactory,
-functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
-
-val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
-val typeFamilies: util.List[SqlTypeFamily] = new 
util.ArrayList[SqlTypeFamily]
-// derives operands' data types and type families
-functionImpl.getParameters.asScala.foreach{ o =>
-  val relType: RelDataType = o.getType(typeFactory)
-  argTypes.add(relType)
-  typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, 
SqlTypeFamily.ANY))
+typeFactory: FlinkTypeFactory)
+  : SqlOperandTypeInference = {
+/**
+  * Operand type inference based on [[TableFunction]] given 
information.
+  */
+new SqlOperandTypeInference {
+  override def inferOperandTypes(
+  callBinding: SqlCallBinding,
+  returnType: RelDataType,
+  operandTypes: Array[RelDataType]): Unit = {
+
+val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
+  .getOrElse(throw new ValidationException(
+s"Given parameters of function '$name' do not match any 
signature. \n" +
+  s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+  s"Expected: ${signaturesToString(udtf, "eval")}"))
+
+val inferredTypes = foundSignature
+  .map(TypeExtractor.getForClass(_))
+  .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
+
+for (i <- operandTypes.indices) {
+  if (i < inferredTypes.length - 1) {
+operandTypes(i) = inferredTypes(i)
+  } else if (null != inferredTypes.last.getComponentType) {
+// last argument is a collection, the array type
+operandTypes(i) = inferredTypes.last.getComponentType
+  } else {
+operandTypes(i) = inferredTypes.last
+  }
+}
+  }
 }
-// derives whether the 'input'th parameter of a method is optional.
-val optional: Predicate[Integer] = new Predicate[Integer]() {
-  def apply(input: Integer): Boolean = {
-functionImpl.getParameters.get(input).isOptional
+  }
+
+  private[flink] def createOperandTypeChecker(
--- End diff --

Yes, I agree with you. I tried to merge this code, but some test cases 
failed and I could not figure out why. I think we can do it in a separate issue.


---


[GitHub] flink issue #4620: [FLINK-7558][table]Improve SQL ValidationException messag...

2017-08-30 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4620
  
Thanks for updating.
+1 to merge



---


[GitHub] flink pull request #4620: [FLINK-7558][table]Improve SQL ValidationException...

2017-08-30 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4620#discussion_r136014510
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func0
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UserDefinedFunctionValidationTest extends TableTestBase {
--- End diff --

We can add the test to  `ScalarFunctionsValidationTest`.


---


[GitHub] flink pull request #4620: [FLINK-7558][table]Improve SQL ValidationException...

2017-08-30 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4620#discussion_r136014662
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func0
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UserDefinedFunctionValidationTest extends TableTestBase {
+
+  @Test
+  def testAppendSinkOnUpdatingTable(): Unit = {
--- End diff --

Could you change the test method name?


---


[GitHub] flink pull request #4620: [FLINK-7558][table]Improve SQL ValidationException...

2017-08-30 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4620#discussion_r136014229
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -106,7 +106,9 @@ object ScalarSqlFunction {
 val operandTypeInfo = getOperandTypeInfo(callBinding)
 
 val foundSignature = getEvalMethodSignature(scalarFunction, 
operandTypeInfo)
-  .getOrElse(throw new ValidationException(s"Operand types of 
could not be inferred."))
+  .getOrElse(throw new ValidationException(
+s"[${scalarFunction.getClass}]'s Operand types of 
[${operandTypeInfo}] " +
--- End diff --

I think the message can be improved like this. I copied the code from 
`createOperandTypeChecker` in this class. 
```
throw new ValidationException(
  s"Given parameters of function '$name' do not match any signature. \n" +
    s"Actual: ${signatureToString(operandTypeInfo)} \n" +
    s"Expected: ${signaturesToString(scalarFunction, "eval")}")
```
And I think we need to update `AggregateSqlFunction` as well, if needed. 



---


[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

2017-08-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4536#discussion_r135430307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -71,6 +69,9 @@ class FlinkTableFunctionImpl[T](
 
   override def getElementType(arguments: util.List[AnyRef]): Type = 
classOf[Array[Object]]
 
+  // we do never use the FunctionParameters, so return an empty list
+  override def getParameters: util.List[FunctionParameter] = 
Collections.emptyList()
--- End diff --

`ReflectiveFunctionBase#getParameters` was used to create the 
`SqlOperandTypeChecker` in the previous implementation. With the new 
implementation, we no longer need `ReflectiveFunctionBase#getParameters` to 
create the `SqlOperandTypeChecker`, so I think we can just return an empty list.


---


[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

2017-08-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4536#discussion_r135430161
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 class FlinkTableFunctionImpl[T](
 val typeInfo: TypeInformation[T],
 val fieldIndexes: Array[Int],
-val fieldNames: Array[String],
-val evalMethod: Method)
-  extends ReflectiveFunctionBase(evalMethod)
--- End diff --

In the first implementation of UDTF, we used `ReflectiveFunctionBase` to 
infer the operand types. With the new approach in this PR, we customize a 
`SqlOperandTypeChecker` to check operands, which means we don't need 
`ReflectiveFunctionBase` anymore. If we extend `ReflectiveFunctionBase`, we 
have to register every `eval` method as a UDTF. With the new approach in 
this PR, we only need to register once.
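
The idea of checking a call against all `eval` signatures at once, including a variable-argument one, can be sketched roughly as follows. This is an illustrative stand-in under stated assumptions: `Signature`, `matches` and `findSignature` are hypothetical names, not the helpers used in the PR, and plain Java classes stand in for Flink type information.

```scala
object VarArgSketch {

  // A signature is the list of parameter classes. When varArgs is set, the last
  // class must be an array type whose component type may repeat zero or more times.
  final case class Signature(params: Seq[Class[_]], varArgs: Boolean) {
    require(!varArgs || (params.nonEmpty && params.last.isArray),
      "a var-arg signature must end with an array parameter")
  }

  // True if the actual operand classes can be passed to the given signature.
  def matches(sig: Signature, actual: Seq[Class[_]]): Boolean =
    if (!sig.varArgs) {
      sig.params.length == actual.length &&
        sig.params.zip(actual).forall { case (p, a) => p.isAssignableFrom(a) }
    } else {
      val fixed = sig.params.init
      val component = sig.params.last.getComponentType
      actual.length >= fixed.length &&
        fixed.zip(actual).forall { case (p, a) => p.isAssignableFrom(a) } &&
        actual.drop(fixed.length).forall(a => component.isAssignableFrom(a))
    }

  // One lookup over all signatures of a function, so the function is registered once.
  def findSignature(signatures: Seq[Signature], actual: Seq[Class[_]]): Option[Signature] =
    signatures.find(matches(_, actual))
}
```

Because the lookup walks every signature of the function, one registration covers all overloads; the first matching signature wins, so non-variadic signatures should be listed before variadic ones.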


---


[GitHub] flink issue #4585: [FLINK-7491] add MultiSetTypeInfo; add built-in Collect A...

2017-08-25 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4585
  
Hi @suez1224, thanks for the PR. I think we can use `Array` instead of 
`AbstractMultiSet`. `AbstractMultiSet` is too obscure for users. In that case, 
we do not need the MultiSetSerializer and MultiSetTypeInfo, and subsequent 
queries can pass the array-typed field to a UDF as an `eval(...)` parameter. 


---


[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...

2017-08-24 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4355
  
The newly created methods `open(ctx)` and `close()` of `GeneratedAggregations` 
are not called by `AggregateAggFunction`, which is used in window aggregates. I 
suggest calling the `open(ctx)` method but passing a `RuntimeContext` that throws 
an exception in every method to tell users `User Defined AggregateFunction is not 
supported to call open() and close() in window`. But this can be addressed in 
another issue. 

BTW, I think the `AggregateCodeGenerator#generateAggregations` is too long 
with 500+ LOC. I would like to refactor it if you have no objection @kaibozhou 
@fhueske .  I have created 
[FLINK-7509](https://issues.apache.org/jira/browse/FLINK-7509) .
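
A minimal sketch of the "context that throws in every method" idea, assuming a tiny hypothetical context trait (the real `RuntimeContext` has many more methods, and none of the names below are Flink APIs):

```scala
// Hypothetical, minimal stand-in for a runtime context.
trait ContextSketch {
  def getState(name: String): AnyRef
  def getMetricGroup: AnyRef
}

// Every accessor fails with the same explanatory message instead of
// returning something that only half works inside a window aggregate.
object UnsupportedInWindowContext extends ContextSketch {
  private def unsupported: Nothing = throw new UnsupportedOperationException(
    "User Defined AggregateFunction is not supported to call open() and close() in window")

  override def getState(name: String): AnyRef = unsupported
  override def getMetricGroup: AnyRef = unsupported
}

// Hypothetical aggregate function base: open() may or may not touch the context.
abstract class AggFunctionSketch {
  def open(ctx: ContextSketch): Unit = ()
}
```

A function whose `open` ignores the context keeps working, while one that touches it fails immediately with a message that names the limitation.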


---


[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

2017-08-21 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4536
  
@sunjincheng121 @fhueske It would be great if you could have a look ;-)


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134158258
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

How about adding an unimplemented `queryConfig` method to `TableEnvironment` to 
avoid this util?

```
def queryConfig: QueryConfig
```


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134158670
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -132,6 +132,44 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
+checkValidTableName(name)
+
+tableSink match {
+  case t @ (_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] |
+_: RetractStreamTableSink[_]) =>
+registerTableInternal(name, new TableSinkTable(t))
+  case _ =>
+throw new TableException("BatchTableSink can not be registered in 
StreamTableEnvironment")
--- End diff --

There may be more kinds of TableSink in the future, so I would suggest 
changing this exception message to `Only AppendStreamTableSink, 
UpsertStreamTableSink and RetractStreamTableSink can be registered in 
StreamTableEnvironment`.
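
As a sketch of why the allow-list wording ages better, the check below uses hypothetical marker traits in place of the Flink sink interfaces (only the trait names and the message text come from the comment; everything else is an assumption). The fallback case covers any sink kind added later, not just batch sinks, so the message stays accurate.

```scala
// Hypothetical marker traits standing in for the Flink sink interfaces.
trait TableSinkSketch
trait AppendStreamTableSink extends TableSinkSketch
trait UpsertStreamTableSink extends TableSinkSketch
trait RetractStreamTableSink extends TableSinkSketch

object StreamSinkCheck {
  val UnsupportedMessage: String =
    "Only AppendStreamTableSink, UpsertStreamTableSink and RetractStreamTableSink " +
      "can be registered in StreamTableEnvironment"

  // Returns the sink if it is one of the supported stream sinks, otherwise fails.
  def validate(sink: TableSinkSketch): TableSinkSketch = sink match {
    case s @ (_: AppendStreamTableSink | _: UpsertStreamTableSink | _: RetractStreamTableSink) => s
    case _ => throw new IllegalArgumentException(UnsupportedMessage)
  }
}
```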


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134162144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

Is this exception used to reject an INSERT INTO statement?


---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134177144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryC

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134173548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableSink Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableSink: String): Unit = {
+insertInto(tableSink, 
QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
+  }
+
+  /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableSink Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @param conf The [[QueryConfig]] to use.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableSink: String, conf: QueryConfig): Unit = {
--- End diff --

Should we rename the parameter `tableSink` to `tableName`?




[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134180276
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.types.Row
+
+class CsvSQLTableSink(
--- End diff --

`CsvSQLTableSink` is never used. Should we remove it?




[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134161822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
--- End diff --

I think the deprecated `sql` method should delegate calls to `sqlQuery`.
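
A minimal sketch of the delegation suggested here, assuming a stand-in class: `SketchTableEnv` and its string result are hypothetical and only illustrate the pattern, they are not the real `TableEnvironment`. The deprecated entry point forwards to the new one, so parsing and validation live in a single code path.

```scala
// Hypothetical stand-in for TableEnvironment; only the delegation pattern matters.
class SketchTableEnv {

  /** New entry point. In this sketch, "a query" simply means "starts with SELECT". */
  def sqlQuery(sql: String): String = {
    require(sql.trim.toUpperCase.startsWith("SELECT"), "sqlQuery only accepts queries")
    s"Table(${sql.trim})"
  }

  /** Old entry point, kept for compatibility. It forwards to sqlQuery. */
  @deprecated("Use sqlQuery(sql) instead.", "sketch")
  def sql(sql: String): String = sqlQuery(sql)
}
```

With this shape, a later change to `sqlQuery` (for example a stricter statement check) automatically applies to callers that still use `sql`.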




[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134173572
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -800,6 +794,66 @@ class Table(
   }
 
   /**
+* Writes the [[Table]] to a [[TableSink]] specified by given name. The 
tableSink name
+* represents a registered [[TableSink]] which defines an external 
storage location.
+*
+* A batch [[Table]] can only be written to a
+* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+* [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+* [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+* [[org.apache.flink.table.sinks.UpsertStreamTableSink]].*
+*
+* @param tableSink Name of the [[TableSink]] to which the [[Table]] is 
written.
+* @tparam T The data type that the [[TableSink]] expects.
+*/
+  def insertInto[T](tableSink: String): Unit = {
--- End diff --

Should we rename the parameter `tableSink` to `tableName`?




[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134160516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
--- End diff --

Please add a `deprecated` note to the Javadoc to tell users which new 
API they should use instead.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134135181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
+}
+setters.mkString("\n")
+  } else {
+""
+  }
+}
+
+def genCleanUpDataView: String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val cleanUpDataViews = new StringBuilder
+for (i <- aggs.indices) yield {
+  val setters = for (spec <- accConfig.get.accSpecs(i)) yield {
+val dataViewTerm = s"acc${i}_${spec.field.getName}_dataview"
+val cleanUp =
+  s"""
+|$dataViewTerm.clear();
+  """.stripMargin
+cleanUpDataViews.append(cleanUp)
+  }
+}
+
+cleanUpDataViews.toString()
+  } else {
+""
+  }
+}
+
+def genInitialize: String = {
+
+j"""
+   |  public final void initialize(
--- End diff --

I would like to rename this method to `open(ctx)`, so that we can use 
the `reusableOpenStatements` and `reuseOpenCode()` of `CodeGenerator` to 
generate the body of `open`.  Currently, `genInitialize` is easy to 
confuse with `reuseInitCode()`.
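
To illustrate the idea, here is a rough sketch of how reusable open statements could be collected and emitted into a generated `open(ctx)` method. `SketchCodeGenerator`, `addReusableOpenStatement` and `genOpen` are hypothetical names for this example; only `reusableOpenStatements` and `reuseOpenCode()` are taken from the comment above, and their real implementation in `CodeGenerator` may differ.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-in for the code generator discussed above.
class SketchCodeGenerator {

  // Insertion-ordered set: a statement registered twice is emitted only once.
  private val reusableOpenStatements = mutable.LinkedHashSet[String]()

  def addReusableOpenStatement(stmt: String): Unit = reusableOpenStatements.add(stmt)

  /** Joins all registered statements; this becomes the body of the generated method. */
  def reuseOpenCode(): String = reusableOpenStatements.mkString("\n")

  /** Emits Java source for open(ctx); the context type is passed in as a plain string. */
  def genOpen(ctxType: String): String =
    s"""public final void open($ctxType ctx) throws Exception {
       |${reuseOpenCode()}
       |}""".stripMargin
}
```

Any part of the generator can then register setup code without knowing where `open` is assembled.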




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/StateViewUtils.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.dataview
+
+import java.util
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.state._
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * State view utils to create [[StateListView]] or [[StateMapView]]..
+  */
+object StateViewUtils {
--- End diff --

We may not need this, as we can generate the creation code directly.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134134801
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +107,11 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Clean up for the accumulators.
+*/
+  def cleanUp()
--- End diff --

`cleanup` is also a single word, so we do not need an uppercase `U` here.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134153349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -80,7 +82,8 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: Option[DataViewConfig])
--- End diff --

It seems that `accConfig` is always `Some(x)`, do we need the `Option`?
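
A small sketch of the difference, assuming a stand-in `SketchDataViewConfig` (hypothetical, reduced to the one flag used in the diff). If every caller passes `Some(x)`, the unwrapping at each use site adds nothing.

```scala
// Hypothetical stand-in for DataViewConfig, reduced to the flag used in the diff.
case class SketchDataViewConfig(isStateBackedDataViews: Boolean)

object AccConfigSketch {

  // With Option: every use site has to check and unwrap first.
  def usesStateWithOption(accConfig: Option[SketchDataViewConfig]): Boolean =
    accConfig.isDefined && accConfig.get.isStateBackedDataViews

  // Without Option: the flag is read directly.
  def usesState(accConfig: SketchDataViewConfig): Boolean =
    accConfig.isStateBackedDataViews
}
```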




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
+   |  $ctxTerm);
+   """.stripMargin
+  } else if (dataViewField.getType == classOf[ListView[_]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createListView($descFieldTerm,
--- End diff --

Same as above: we can generate the creation code.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118741
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView<String, Integer> map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
--- End diff --

I'm not sure whether it is a good idea to add `private[flink]`, because 
these fields are effectively `public` for Java users.

Also, please make them `@transient`.
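
Both points can be checked with reflection. The sketch below is an illustration under assumptions: the object `flink` stands in for the real package so that everything fits in one file, and `SketchTypeInfo` and `SketchMapView` are hypothetical stand-ins for `TypeInformation` and `MapView`. As far as I know the JVM has no equivalent of a qualified `private[...]`, so the Scala compiler emits such members as public.

```scala
import java.lang.reflect.Modifier

// The object `flink` stands in for the package of the same name (assumption).
object flink {

  // Hypothetical stand-in for TypeInformation.
  class SketchTypeInfo(val name: String)

  // Hypothetical stand-in for MapView: the field is qualified-private and transient.
  class SketchMapView(@transient private[flink] val keyTypeInfo: SketchTypeInfo)
    extends Serializable
}

object VisibilitySketch {
  import flink.SketchMapView

  /** True if the accessor generated for keyTypeInfo is public at the JVM level. */
  def accessorIsPublicInBytecode: Boolean =
    classOf[SketchMapView].getDeclaredMethods
      .exists(m => m.getName == "keyTypeInfo" && Modifier.isPublic(m.getModifiers))

  /** True if the backing field carries the JVM transient flag. */
  def fieldIsTransient: Boolean =
    classOf[SketchMapView].getDeclaredFields
      .exists(f => f.getName == "keyTypeInfo" && Modifier.isTransient(f.getModifiers))
}
```

Scala code outside `flink` cannot call `keyTypeInfo`, but Java code sees an ordinary public method, which is the concern raised above.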




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,6 +108,13 @@ class AggregationCodeGenerator(
   inFields => for (f <- inFields) yield javaClasses(f)
 }
 
+// define runtimeContext as member variable
+val ctxTerm = s"runtimeContext"
--- End diff --

I think we do not need to make the `runtimeContext` a member variable. 




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134138469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
--- End diff --

`addReusableDataViewConfig` should be in `AggregationCodeGenerator`. I 
would also like to change this method to 
`addReusableDataView(spec: DataViewSpec): String`, where the returned String 
is the term of the dataview member variable. The dataview creation code can 
then be added to `reusableDataViewStatements`, and the code of 
`reusableDataViewStatements` can finally be added to `initialize(ctx)` (or 
`open(ctx)`, as I suggested).
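
A rough sketch of the proposed shape, under stated assumptions: `SketchDataViewSpec` and `SketchAggregationCodeGenerator` are hypothetical stand-ins, and the creation statement is a placeholder rather than the real state-backed construction. Only the method name, the `reusableDataViewStatements` collection and the `acc<i>_<field>_dataview` naming come from the discussion and the diff.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for DataViewSpec.
case class SketchDataViewSpec(accIndex: Int, fieldName: String, viewClass: String)

class SketchAggregationCodeGenerator {

  // Insertion-ordered sets, so each statement is emitted once and in a stable order.
  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
  private val reusableDataViewStatements = mutable.LinkedHashSet[String]()

  /** Registers the member declaration and creation code; returns the member term. */
  def addReusableDataView(spec: SketchDataViewSpec): String = {
    val term = s"acc${spec.accIndex}_${spec.fieldName}_dataview"
    reusableMemberStatements.add(s"transient ${spec.viewClass} $term = null;")
    // Placeholder creation statement; real code would build a state-backed view from ctx.
    reusableDataViewStatements.add(s"$term = new ${spec.viewClass}(ctx);")
    term
  }

  /** Declarations for the member area of the generated class. */
  def reuseMemberCode(): String = reusableMemberStatements.mkString("\n")

  /** Creation code intended for the body of the generated initialize/open method. */
  def reuseDataViewCode(): String = reusableDataViewStatements.mkString("\n")
}
```

Callers keep only the returned term, for example to generate the field setter on the accumulator, and no longer need to know how the view is created.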




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  /**
+* Copy from a list instance.
+*
+* @param t List instance.
+* @return A copy of this list instance
+*/
+  def copyFrom(t: util.List[T]): ListView[T] = {
--- End diff --

The `copyFrom` method can be accessed by users. We should avoid this.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134139167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Do we need this?  It is only used to add `RuntimeContext` to the member 
area, but `RuntimeContext` is only used in `initialize`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
--- End diff --

Please fix the indentation here.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134152736
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Yes, I think we can use accumulator type info instead of field information.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView<String, Integer> map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def remove(key: K): Unit = map.remove(key)
+
+  /**
+* Returns whether there exists the given mapping.

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137525
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

I think we do not need `StateViewUtils` here. We can create the MapView 
directly in the generated code, because we already have the RuntimeContext and 
StateDescriptor.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
--- End diff --

Please make the list private; otherwise Java users can access it.
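
To illustrate the encapsulation being asked for, here is a minimal Java
sketch. `EncapsulatedListView` is a hypothetical stand-in, not the actual
`ListView` class; it only shows a backing list that callers cannot reach
directly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a list-backed view; not the Flink ListView class.
final class EncapsulatedListView<T> {

    // Private backing list: callers must go through the methods below.
    private final List<T> list = new ArrayList<>();

    void add(T value) {
        list.add(value);
    }

    // Hands out a read-only wrapper so the backing list cannot be mutated from outside.
    Iterable<T> get() {
        return Collections.unmodifiableList(list);
    }

    public static void main(String[] args) {
        EncapsulatedListView<String> view = new EncapsulatedListView<>();
        view.add("a");
        for (String s : view.get()) {
            System.out.println(s);
        }
    }
}
```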


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134119014
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
--- End diff --

The data view term is defined in many places. Can we create a method to 
generate the term name, such as `createDataViewTerm(index: Int, 
fieldName: String)`?
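
A minimal sketch of such a helper, written in Java for illustration. The
class name `DataViewTerms` is hypothetical; the name pattern
`acc<index>_<field>_dataview` is taken from the terms built in the diffs of
this pull request.

```java
// Hypothetical helper: the class name is illustrative and not part of Flink.
final class DataViewTerms {

    private DataViewTerms() {}

    // Builds the generated field name for one data view of the accumulator at `index`.
    static String createDataViewTerm(int index, String fieldName) {
        return "acc" + index + "_" + fieldName + "_dataview";
    }

    public static void main(String[] args) {
        System.out.println(createDataViewTerm(0, "map"));
    }
}
```

With one helper, every call site that needs the term would produce the same
name, so a renaming only has to happen in one place.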


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a 
{@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
--- End diff --

Please mark `elementTypeInfo` as `@transient`. Also, do we want the type 
info to be accessible to users?
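
For reference, a small Java sketch of what a transient field means for Java
serialization. `View` and its fields are hypothetical stand-ins; the point is
only that a transient field is skipped when the object is written and comes
back as `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientFieldDemo {

    // Hypothetical stand-in for a view that carries type metadata next to its data.
    static final class View implements Serializable {
        private static final long serialVersionUID = 1L;

        // Skipped by Java serialization, like a Scala field annotated with @transient.
        transient String elementTypeInfo;
        String payload;

        View(String elementTypeInfo, String payload) {
            this.elementTypeInfo = elementTypeInfo;
            this.payload = payload;
        }
    }

    // Serializes the view to bytes and reads it back.
    static View roundTrip(View view) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(view);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (View) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        View copy = roundTrip(new View("STRING", "data"));
        System.out.println(copy.elementTypeInfo + " / " + copy.payload);
    }
}
```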


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView<String, Integer> map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
--- End diff --

Please make the `map` private.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

+1 to do this. A great improvement! 


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
--- End diff --

Adding `private[flink]` to the key and value type infos will make them public 
in Java. I'm not sure whether that is a good idea. I would prefer not to expose 
them to users (Java users), and the reflection only happens during compilation, 
which I think is fine.
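
A rough Java sketch of the reflective alternative, assuming the type info
stays in a private field. `ExampleView` and `readField` are hypothetical
names used only for this illustration; they are not the helper from the diff.

```java
import java.lang.reflect.Field;

final class PrivateFieldReader {

    // Hypothetical view class whose type info is hidden from users.
    static final class ExampleView {
        private final String keyTypeInfo = "STRING";
    }

    // Reads a declared field by name, even if it is private.
    static Object readField(Object target, String name) {
        try {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            return field.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readField(new ExampleView(), "keyTypeInfo"));
    }
}
```

The field stays invisible in the public API, and the lookup cost is paid once
per read rather than per record if it only runs while the plan is compiled.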


---


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-15 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4530
  
I'm fine with the changes. +1 to merge


---


[GitHub] flink pull request #4544: [FLINK-7451] [table] Support non-ascii character l...

2017-08-15 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/4544

[FLINK-7451] [table] Support non-ascii character literals in Table API and 
SQL

## What is the purpose of the change

Non-ASCII characters in string literals cause the Calcite planner to throw 
the following exception:

```
Failed to encode '%测试%' in character set 'ISO-8859-1'
```
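
The encoding failure above can be reproduced in isolation with plain JDK
classes. This is only a sketch of why ISO-8859-1 rejects the literal; it does
not use Calcite or Flink, and `CharsetCheck` is a hypothetical class name. The
literal is written with Unicode escapes so the example does not depend on the
source file encoding.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class CharsetCheck {

    // Returns true if every character of the literal can be encoded in the charset.
    static boolean canEncode(Charset charset, String literal) {
        return charset.newEncoder().canEncode(literal);
    }

    public static void main(String[] args) {
        // Same literal as in the exception message above.
        String literal = "%\u6d4b\u8bd5%";
        System.out.println("ISO-8859-1: " + canEncode(StandardCharsets.ISO_8859_1, literal));
        System.out.println("UTF-8: " + canEncode(StandardCharsets.UTF_8, literal));
    }
}
```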

## Brief change log

- add default charset for FlinkTypeFactory

## Verifying this change

This change added tests and can be verified as follows:

- Added expression test for non-ascii literals

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? na



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink non-ascii

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4544.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4544


commit 6013d2693a4eb3622dc1cbb7d27829ee5b566713
Author: Jark Wu <j...@apache.org>
Date:   2017-08-15T11:24:57Z

[FLINK-7451] [table] Support non-ascii character literals in Table API and 
SQL




---


[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...

2017-08-15 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4532
  
I'm +1 to merge this. 

I have created two follow-up issues; we can move the discussion to JIRA:
(1) FLINK-7446
Support to define an existing field as the rowtime field for TableSource
(2) FLINK-7448
Keep the data type unchanged when register an existing field as rowtime


---


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-14 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r133104586
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = 
TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
 
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
   case timeSource: DefinedRowtimeAttribute if 
timeSource.getRowtimeAttribute != null =>
--- End diff --

Sure, I have logged it as https://issues.apache.org/jira/browse/FLINK-7446


---


[GitHub] flink issue #4538: [FLINK-7441] [table] Double quote string literals is not ...

2017-08-14 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4538
  
Thanks for the reminder, @zentol. I will pay attention to the commit 
message in the future.


---


[GitHub] flink pull request #4538: [FLINK-7441] [table] Double quote string literals ...

2017-08-14 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/4538

[FLINK-7441] [table] Double quote string literals is not supported in Table 
API and SQL

## What is the purpose of the change

Code generation does not handle string literals that contain double quotes 
or certain control characters, which leads to a compile error.

## Brief change log

- Add string escaping to `CodeGenerator#visitLiteral`
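
As an illustration of the kind of escaping involved, here is a hand-written
Java sketch. It is not the code from this pull request, and `LiteralEscaper`
and `escapeForJavaSource` are hypothetical names; it only shows how a value
can be made safe to embed between double quotes in generated Java source.

```java
final class LiteralEscaper {

    // Escapes a value so it can be placed inside a Java string literal.
    static String escapeForJavaSource(String value) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a four-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Without escaping, the inner quotes would end the generated literal early.
        String generated = "String s = \"" + escapeForJavaSource("say \"hi\"") + "\";";
        System.out.println(generated);
    }
}
```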

## Verifying this change

This change added tests and can be verified as follows:

- Added test `UserDefinedScalarFunctionTest#testDoubleQuoteParameters`, 
which validates that double quotes are handled correctly.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? na



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink double-quote

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4538


commit fdec04d4fe9257b952bf2a150ab1cae9481e5f32
Author: Jark Wu <j...@apache.org>
Date:   2017-08-14T08:44:16Z

[FLINK-7441] [table] Double quote string literals is not supported in Table 
API and SQL




---


[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

2017-08-14 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/4536

[FLINK-7439] [table] Support variable arguments for UDTF in SQL

## What is the purpose of the change

Support variable arguments for UDTF in SQL

## Brief change log

- Update `TableSqlFunction`'s operand type inference and operand type checker
- Fix `ScalarFunctionCallGen` and `TableFunctionCallGen`, which handled 
var args incorrectly.
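
To make the var-arg handling concrete, here is a small Java sketch of the
general technique: when a method is declared with variable arguments, the
trailing call arguments have to be packed into one array before the method is
invoked. This is not the code generator from this pull request; `VarArgsCall`,
`join`, `packVarArgs` and `call` are hypothetical names.

```java
import java.lang.reflect.Array;
import java.lang.reflect.Method;

final class VarArgsCall {

    // Hypothetical varargs method, standing in for a function's eval method.
    static String join(String separator, String... parts) {
        return String.join(separator, parts);
    }

    // Packs the trailing arguments into an array if the method is declared with varargs.
    static Object[] packVarArgs(Method method, Object[] args) {
        if (!method.isVarArgs()) {
            return args;
        }
        Class<?>[] paramTypes = method.getParameterTypes();
        int fixed = paramTypes.length - 1;
        if (args.length < fixed) {
            throw new IllegalArgumentException("Too few arguments for " + method.getName());
        }
        int varCount = args.length - fixed;
        Object varArray = Array.newInstance(paramTypes[fixed].getComponentType(), varCount);
        for (int i = 0; i < varCount; i++) {
            Array.set(varArray, i, args[fixed + i]);
        }
        Object[] packed = new Object[paramTypes.length];
        System.arraycopy(args, 0, packed, 0, fixed);
        packed[fixed] = varArray;
        return packed;
    }

    // Looks up a static method of this class by name and invokes it with flat arguments.
    static Object call(String methodName, Object... args) {
        try {
            for (Method m : VarArgsCall.class.getDeclaredMethods()) {
                if (m.getName().equals(methodName)) {
                    return m.invoke(null, packVarArgs(m, args));
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation failed", e);
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(call("join", ",", "a", "b"));
    }
}
```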

## Verifying this change

This change added tests and can be verified as follows:

- *Added `CorrelateTest.scala` for batch and stream queries to verify 
logical plan*

Integration testing is covered by the existing `CorrelateITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? na



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink udtf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4536


commit b18ce206ad69e0abf72f5388538bb65e4911c65a
Author: Jark Wu <j...@apache.org>
Date:   2017-08-14T06:18:52Z

[FLINK-7439] [table] Support variable arguments for UDTF in SQL




---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132863952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
 ---
@@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 
 /**
-  * Function that materializes a time attribute to the metadata timestamp. 
After materialization
-  * the result can be used in regular arithmetical calculations.
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical 
calculations.
   */
-object TimeMaterializationSqlFunction
+object ProctimeSqlFunction
--- End diff --

Should we move this object to the `org.apache.flink.table.functions.sql` 
package? 


---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132840719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not 
converted to array
--- End diff --

It builds successfully when I remove the `toArray` in my local environment. 


---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132841043
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I'm fine with either the current approach or using the time indicator of the 
left table as the default. However, I think no exception should be thrown when 
writing a Table to a TableSink, and currently both cases share the same 
exception code path.


---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132834552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
+s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+  s"the table that should be converted to a DataStream.\n" +
+  s"Please select the rowtime field that should be used as 
event-time timestamp for the " +
+  s"DataStream by casting all other fields to TIMESTAMP.")
+} else if (rowtimeFields.size == 1) {
+  val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
+  val convFieldTypes = origRowType.getFieldTypes.map { t =>
+if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
--- End diff --

.


---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132839934
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala
 ---
@@ -16,32 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.conversion
 
 import java.lang.{Boolean => JBool}
 
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
-  * Convert [[CRow]] to a [[JTuple2]]
+  * Convert [[CRow]] to a [[JTuple2]].
   */
-class CRowInputJavaTupleOutputMapRunner(
+class CRowToJavaTupleMapRunner(
 name: String,
 code: String,
 @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
   extends RichMapFunction[CRow, Any]
   with ResultTypeQueryable[JTuple2[JBool, Any]]
   with Compiler[MapFunction[Row, Any]] {
--- End diff --

indent


---


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r132819009
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
 ---
@@ -19,7 +19,9 @@
 package org.apache.flink.table.runtime
 
 import java.lang.{Boolean => JBool}
+import java.sql.Timestamp
 
+import org.apache.calcite.runtime.SqlFunctions
--- End diff --

Remove the unused imports.


---


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r132817997
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1247,27 +1258,32 @@ abstract class CodeGenerator(
 }
   }
 
-  private[flink] def generateRecordTimestamp(isEventTime: Boolean): 
GeneratedExpression = {
+  private[flink] def generateRowtimeAccess(): GeneratedExpression = {
 val resultTerm = newName("result")
-val resultTypeTerm = 
primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+val nullTerm = newName("isNull")
 
-val resultCode = if (isEventTime) {
+val accessCode =
   s"""
-|$resultTypeTerm $resultTerm;
-|if ($contextTerm.timestamp() == null) {
+|Long $resultTerm = $contextTerm.timestamp();
+|if ($resultTerm == null) {
 |  throw new RuntimeException("Rowtime timestamp is null. Please 
make sure that a proper " +
 |"TimestampAssigner is defined and the stream environment uses 
the EventTime time " +
 |"characteristic.");
 |}
-|else {
-|  $resultTerm = $contextTerm.timestamp();
-|}
-|""".stripMargin
-} else {
+|boolean $nullTerm = false;
+   """.stripMargin
+
+GeneratedExpression(resultTerm, nullTerm, accessCode, 
TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+  }
+
+  private[flink] def generateProctimeTimestamp(): GeneratedExpression = {
+val resultTerm = newName("result")
+val resultTypeTerm = 
primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+val resultCode =
   s"""
 |$resultTypeTerm $resultTerm = 
$contextTerm.timerService().currentProcessingTime();
--- End diff --

Why not hardcode `$resultTypeTerm` as `long`? `currentProcessingTime()` 
always returns the primitive type `long`.


---


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r132818559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
 val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
 val fieldTypes = 
TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fieldCnt = fieldNames.length
+val fields = fieldNames.zip(fieldTypes)
 
-val rowtime = tableSource match {
+val withRowtime = tableSource match {
   case timeSource: DefinedRowtimeAttribute if 
timeSource.getRowtimeAttribute != null =>
--- End diff --

For `DefinedRowtimeAttribute`, we hope the rowtime field can replace an 
existing field. Just as when registering a DataStream, the rowtime field can 
either be appended or replace an existing field.

This is not related to this PR, but we can discuss it here and handle it in 
a separate issue.


---


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4488#discussion_r132819122
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] 
timestamp.
+  */
+class WrappingTimestampSetterProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
+  extends ProcessFunction[CRow, OUT] {
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+function match {
--- End diff --

We could use `FunctionUtils` instead of the match case, but I'm also fine 
with the match case.

```scala
FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
FunctionUtils.openFunction(function, parameters)
```


---


[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

2017-08-08 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4488
  
@twalthr  I'm working on other issues until Thursday, so I would like to 
have a look at it on Thursday (Beijing time). But if you are in a hurry, I'm 
fine with merging this first.


---


[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....

2017-08-04 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r131512787
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
 ---
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
--- End diff --

Yes, you are right. We should make sure it is a Calcite bug and that it will 
be fixed in the next release. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....

2017-08-03 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r131302891
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 ---
@@ -55,12 +55,13 @@ class JoinTest extends TableTestBase {
   unaryNode(
 "DataStreamCalc",
 streamTableNode(1),
-term("select", "a", "b", "proctime")
+term("select", "a", "b", "-(proctime, 360) AS -",
--- End diff --

I think it's easy to prevent the time indicator push-down on our side. 
Calcite 1.3 changed `ProjectJoinTransposeRule.INSTANCE` to `new 
ProjectJoinTransposeRule(PushProjector.ExprCondition.TRUE, 
RelFactories.LOGICAL_BUILDER);` instead of `new 
ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, 
RelFactories.LOGICAL_BUILDER);`.  

To change the default behavior of the rule, all we need to do is create a 
new `ProjectJoinTransposeRule` with a custom `PushProjector.ExprCondition` 
that prevents time indicator rex nodes from being pushed down.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128798162
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a 
{@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * ... ...
+  * // accumulator.get()
--- End diff --

Do you mean that we can't use `accumulator.list.add(...)` here? 


---


[GitHub] flink issue #4379: [FLINK-7194] [table] Add methods for type hints to UDAGG ...

2017-07-21 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4379
  
Looks good to me. +1 to merge


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128499164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -535,4 +654,21 @@ object UserDefinedFunctionUtils {
   }
 }
   }
+
+  def getFieldTypeInfo(
+clazz: Class[_],
+obj: Object,
+fieldName: String): Option[TypeInformation[_]] = {
--- End diff --

What about making this method more general?

```scala
def getFieldValue[T](
clazz: Class[_],
obj: Object,
fieldName: String): Any = {
```
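
To make the idea a bit more concrete, here is a minimal sketch of such a general helper using plain Java reflection. It is only an assumption about how it could look, not the PR's code: `FieldAccess` and `Holder` are hypothetical names, the lookup walks up the superclasses itself instead of using `TypeExtractor.getDeclaredField`, and it returns an `Option` rather than `Any`.

```scala
object FieldAccess {

  /** Reads the value of `fieldName` from `obj`, searching `clazz` and its
    * superclasses. Returns None if the field is missing or its value is null.
    */
  @scala.annotation.tailrec
  def getFieldValue(clazz: Class[_], obj: AnyRef, fieldName: String): Option[Any] =
    if (clazz == null) {
      None
    } else {
      clazz.getDeclaredFields.find(_.getName == fieldName) match {
        case Some(field) =>
          // private fields need to be made accessible before reading
          field.setAccessible(true)
          // Option(...) maps a null value to None
          Option(field.get(obj))
        case None =>
          getFieldValue(clazz.getSuperclass, obj, fieldName)
      }
    }

  /** Hypothetical class used only to try the helper out. */
  class Holder {
    var name: String = "abc"
    var empty: String = null
  }
}
```

Callers that need a `TypeInformation` could then pattern match on the result instead of casting inside the helper.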


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128499278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -535,4 +654,21 @@ object UserDefinedFunctionUtils {
   }
 }
   }
+
+  def getFieldTypeInfo(
+clazz: Class[_],
+obj: Object,
+fieldName: String): Option[TypeInformation[_]] = {
+
+val field = TypeExtractor.getDeclaredField(clazz, fieldName)
+val typeInfo = if (field != null ) {
+  if (!field.isAccessible) {
+field.setAccessible(true)
+  }
+  field.get(obj).asInstanceOf[TypeInformation[_]]
+} else {
+  null
+}
+Some(typeInfo)
--- End diff --

Note that `Some(null)` is not `None`. 
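
A small self-contained sketch of the difference (the names `OptionNullDemo`, `raw`, `wrappedWithSome` and `wrappedWithOption` are made up for illustration):

```scala
object OptionNullDemo {
  val raw: String = null

  // Some(...) wraps whatever it is given, including null
  val wrappedWithSome: Option[String] = Some(raw)

  // Option(...) turns null into None
  val wrappedWithOption: Option[String] = Option(raw)
}
```

So a caller checking `isDefined` on `wrappedWithSome` would see `true` and then get `null` back from `get`, which is probably not what is intended here. Using `Option(typeInfo)` avoids that.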


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128495510
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for {@link HeapListView}. The serializer relies on an 
element
--- End diff --

`{@link HeapListView}` -> `[[HeapListView]]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128493758
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Heap view factory to create [[HeapListView]] or [[HeapMapView]].
+  *
+  * @param accConfig Accumulator config.
+  */
+class HeapViewFactory(accConfig: Map[String, StateDescriptor[_, _]])
--- End diff --

The `accConfig` parameter is not needed.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128495405
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for {@link HeapListView}. The serializer relies on an 
element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
--- End diff --

Is this recursive? `canEqual` calls itself here.
The implementation should be `obj != null && obj.getClass == getClass`.
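
To illustrate the pattern, here is a minimal, self-contained sketch. `ElementSerializer` and `ViewSerializer` are hypothetical stand-ins (not Flink classes), used only to show a non-recursive `canEqual` together with an `equals` that compares the wrapped serializer:

```scala
object SerializerEqualitySketch {
  // Hypothetical stand-in for the wrapped serializer (not a Flink class).
  class ElementSerializer(val name: String) {
    override def equals(obj: Any): Boolean = obj match {
      case other: ElementSerializer => name == other.name
      case _ => false
    }
    override def hashCode(): Int = name.hashCode
  }

  // Hypothetical stand-in for a serializer that wraps another serializer.
  class ViewSerializer(val inner: ElementSerializer) {
    // Pure type check: it never calls itself, so there is no recursion.
    def canEqual(obj: Any): Boolean = obj != null && obj.getClass == getClass

    // Equal only if the other object is the same kind of serializer
    // and wraps an equal inner serializer.
    override def equals(obj: Any): Boolean = obj match {
      case other: ViewSerializer => other.canEqual(this) && inner == other.inner
      case _ => false
    }

    // Kept consistent with equals by delegating to the wrapped serializer.
    override def hashCode(): Int = inner.hashCode()
  }
}
```

With this shape, two `ViewSerializer` instances wrapping equal inner serializers compare equal, and comparing against `null` or an unrelated type returns `false` instead of overflowing the stack.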


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128495770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for {@link HeapListView}. The serializer relies on an 
element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = obj != null && obj.getClass == 
getClass
--- End diff --

The implementation should be 
```
canEqual(this) && listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
```


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128496007
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for {@link HeapMapView}. The serializer relies on a key 
serializer and a value
--- End diff --

`{@link HeapMapView}` -> `[[HeapMapView]]`


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128500452
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1399,14 +1463,28 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val dataViewTypes = getDataViewTypeInfoFromConstructor(agg)
+val (accType, specs) = extractDataViewTypeInfo(index, agg, 
accTypes(index), dataViewTypes,
--- End diff --

Break the parameters across multiple lines.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128492343
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a 
{@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * ... ...
+  * // accumulator.get()
--- End diff --

What does this mean?


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128493583
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/DataViewFactory.scala
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.api.dataview.{DataView, ListView, MapView}
+
+/**
+  * Factory to creaate [[ListView]] or [[MapView]].
+  *
+  * @param accConfig accumulator config
+  */
+abstract class DataViewFactory(accConfig: Map[String, StateDescriptor[_, 
_]])
--- End diff --

I think the `DataViewFactory` abstract class doesn't need to accept 
parameters.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r128493316
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -357,6 +423,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig.isDefined && accConfig.get.isUseState) {
+  throw new CodeGenException("Not support merge when use state")
--- End diff --

It would be better to enrich this message, e.g. "DataView doesn't support 
merge when the backend uses state."


---


[GitHub] flink issue #4263: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL

2017-07-19 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4263
  
@rtudoran I have exactly the same idea as you. Calcite doesn't push 
partition fields to LogicalSort. Actually, LogicalSort doesn't have a partition 
field. So I wrote a custom rule that transforms a LogicalWindow into a 
LogicalRank (a custom node of mine) when the query pattern matches. 


---


[GitHub] flink issue #4263: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL

2017-07-18 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4263
  
Hi @fhueske, I left this comment on the JIRA issue. To make sure you know 
our situation, I am posting it here as well:

>I think the most useful feature is PR3 "ORDER BY  OFFSET FETCH" 
(i.e. TopN). It is an important requirement. There are also more 
complex cases such as TopN for each group (e.g. 
https://stackoverflow.com/questions/176964/select-top-10-records-for-each-category).
 We have an idea (a prototype) to support it, and we are planning to write a complete design 
for the TopN (any-attribute), which we hope to share soon.


---


[GitHub] flink pull request #4128: [FLINK-6893][table]Add BIN supported in SQL

2017-07-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4128#discussion_r127868400
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
 ---
@@ -33,6 +33,14 @@ object ScalarSqlFunctions {
 OperandTypes.NILADIC,
 SqlFunctionCategory.NUMERIC)
 
+  val BIN = new SqlFunction(
+"BIN",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.explicit(SqlTypeName.VARCHAR),
+null,
+OperandTypes.NUMERIC,
--- End diff --

I don't think BIN accepts all NUMERIC operands. I think it only accepts 
BIGINT, TINYINT, SMALLINT, and INTEGER, and does not support decimal or other 
non-integer numeric types. 


---


[GitHub] flink pull request #4128: [FLINK-6893][table]Add BIN supported in SQL

2017-07-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4128#discussion_r127866461
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -82,4 +82,24 @@ object ScalarFunctions {
 }
 sb.toString
   }
+
+  /**
+* Returns a string representation of the binary value of N, Returns 
NULL if N is NULL.
+*/
+  def bin(n: Long): String = {
+if (null == n) {
+  return null
+}
+val value = new Array[Byte](64)
+var num = n
+// Extract the bits of num into value[] from right to left
+var len: Int = 0
+do {
+  len += 1
+  value(value.length - len) = ('0' + (num & 1)).toByte
+  num >>>= 1
+} while (num != 0)
--- End diff --

Use `Long.toBinaryString(long)` to do the conversion.
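
A minimal sketch of what I mean, assuming the argument arrives as a boxed `java.lang.Long` so that the NULL check is meaningful (the object name `BinSketch` is only for illustration):

```scala
import java.lang.{Long => JLong}

object BinSketch {
  // Returns the binary string of n, or null if n is null.
  // The parameter is a boxed java.lang.Long: a primitive Long can never be null.
  def bin(n: JLong): String =
    if (n == null) null else JLong.toBinaryString(n.longValue())
}
```

For example, `BinSketch.bin(12L)` yields `"1100"` and `BinSketch.bin(0L)` yields `"0"`, matching the expectations in the test of this PR. Negative values are rendered in two's complement with 64 digits, which is the same behaviour as the unsigned shift in the hand-written loop.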


---


[GitHub] flink pull request #4128: [FLINK-6893][table]Add BIN supported in SQL

2017-07-17 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4128#discussion_r127868565
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -352,6 +352,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "Flinkxx")
   }
 
+  @Test
+  def testBin(): Unit = {
+testSqlApi("BIN(12)", "1100")
+testSqlApi("BIN(10)", "1010")
+testSqlApi("BIN(0)", "0")
+
testSqlApi("BIN(f32)","")
+  }
+
--- End diff --

It would be better to add validation tests to `ScalarFunctionsValidationTest` 
to check the expected exception for unsupported operand types.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4171: [FLINK-6887] [table] Split up CodeGenerator into several ...

2017-07-17 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4171
  
@fhueske  thanks for reviewing. I have fixed the code style issue. 

I will merge this once the CI passes.


---


[GitHub] flink issue #4263: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL

2017-07-17 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4263
  
@rtudoran  "SELECT x FROM stream ORDER BY *time FETCH 2" does not need 
retraction, because it orders by ascending time. The query will only emit the 
first 2 rows and drop all rows after that. We need retraction if the query 
orders by time descending. 
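
To make the difference concrete, here is a small self-contained simulation. It is only a sketch, not the planner or runtime code: `Change`, `ascendingFetch` and `descendingFetch` are hypothetical names, and the input is assumed to arrive with strictly increasing timestamps.

```scala
object FetchRetractionSketch {
  // A change message: add = true inserts a row into the result,
  // add = false retracts a previously emitted row.
  final case class Change(add: Boolean, time: Long)

  // ORDER BY time ASC FETCH n on a stream with increasing time:
  // the first n rows are final, everything after them is dropped.
  def ascendingFetch(times: Seq[Long], n: Int): Seq[Change] =
    times.take(n).map(t => Change(add = true, time = t))

  // ORDER BY time DESC FETCH n on the same stream:
  // every new row is newer than the current top n, so once the result is
  // full the oldest emitted row has to be retracted to make room.
  def descendingFetch(times: Seq[Long], n: Int): Seq[Change] = {
    val start = (Vector.empty[Long], Vector.empty[Change])
    val (_, changes) = times.foldLeft(start) { case ((top, acc), t) =>
      if (top.size < n) {
        (top :+ t, acc :+ Change(add = true, time = t))
      } else {
        val evicted = top.head
        (top.tail :+ t,
          acc :+ Change(add = false, time = evicted) :+ Change(add = true, time = t))
      }
    }
    changes
  }
}
```

For the input times 1, 2, 3 with `n = 2`, the ascending variant emits only additions for 1 and 2, while the descending variant emits additions for 1 and 2, then a retraction of 1 and an addition of 3.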


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4324: [FLINK-6232] [table] Add processing time window inner joi...

2017-07-14 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4324
  
The code looks good to me. Good refactoring!

+1 to merge.

BTW, I have a question. This time-windowed join is different from 
`DataStream.join(...).window(...)`.  `DataStream.join.window` joins 
two streams on the same window (such as a 1-hour tumbling window).  Do we have a plan 
to support it? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4274: [FLINK-6975][table]Add CONCAT/CONCAT_WS supported in Tabl...

2017-07-14 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4274
  
+1 to merge. 

I will merge it this weekend. 


---


[GitHub] flink pull request #4274: [FLINK-6975][table]Add CONCAT/CONCAT_WS supported ...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4274#discussion_r127382075
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 ---
@@ -277,3 +278,47 @@ case class Overlay(
   position.toRexNode)
   }
 }
+
+/**
+  * Returns the string that results from concatenating the arguments.
+  * Returns NULL if any argument is NULL.
+  */
+case class Concat(strings: Seq[Expression]) extends Expression with 
InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = strings
+
+  override private[flink] def resultType: TypeInformation[_] = 
BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+children.map(_ => STRING_TYPE_INFO)
+
+  override def toString: String = s"concat($strings)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+relBuilder.call(ScalarSqlFunctions.CONCAT, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns the string that results from concatenating the arguments and 
separator.
+  * Returns NULL If the separator is NULL.
+  *
+  * Note: this user-defined function does not skip empty strings. However, 
it does skip any NULL
+  * values after the separator argument.
+  **/
+case class ConcatWs(separator: Expression, strings: Seq[Expression])
--- End diff --

What about changing this signature to `args: Seq[Expression]`, combining 
`separator` and `strings` before constructing `ConcatWs`? Then we do not need 
to change the FunctionCatalog. I think that's fine, because `ConcatWs` is not 
used directly by users.
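
A rough sketch of the shape I have in mind. `Expr` and `Literal` are hypothetical stand-ins for Flink's `Expression` classes, and `eval` only mirrors the semantics described in the Scaladoc above (NULL separator gives NULL, NULL values are skipped, empty strings are kept); it is not the actual code generation:

```scala
object ConcatWsSketch {
  // Hypothetical stand-ins for Flink's Expression hierarchy.
  sealed trait Expr
  final case class Literal(value: String) extends Expr // value may be null

  // Single argument list: the first element is the separator,
  // the remaining elements are the strings to join.
  final case class ConcatWs(args: Seq[Expr]) {
    require(args.nonEmpty, "CONCAT_WS needs at least a separator")
    def separator: Expr = args.head
    def strings: Seq[Expr] = args.tail
  }

  // Evaluates the call on literals only, following the documented semantics.
  def eval(call: ConcatWs): String = call.separator match {
    case Literal(null) => null
    case Literal(sep) =>
      call.strings.collect { case Literal(s) if s != null => s }.mkString(sep)
  }
}
```

With a single `Seq` parameter the expression has the same constructor shape as `Concat`, so it can presumably be registered the same way, and `separator` / `strings` remain available as accessors.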


---


[GitHub] flink pull request #4266: [FLINK-6232][Table] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127192628
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

Thank you for the explanation, that makes sense to me.  But I see that 
`DataStreamOverAggregate` and `DataStreamGroupWindowAggregate` use 
`DataStreamRetractionRules.isAccRetract`. Is that a misuse?


---


[GitHub] flink pull request #4266: [FLINK-6232][Table] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127173868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

The following SQL `select a, sum(b), a+1 from t1 group by a` will be optimized 
into the following nodes:

```
DataStreamCalc (AccRetract, producesUpdates=false)
  DataStreamGroupAggregate (AccRetract, producesUpdates=true)
    DataStreamScan (Acc, producesUpdates=false)
```
The DataStreamCalc is append-only, but it runs in AccRetract mode, which means 
its output contains retractions.

I think we want to check whether the input contains retraction, right? 
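
To make the distinction concrete, here is a minimal sketch. It is a simplified model, not Flink's classes: `Node`, `UpdateMode`, `isAppendOnly` and `containsRetraction` are hypothetical names, and it assumes an append-only check that looks only at a node's own `producesUpdates` flag.

```scala
// Hypothetical, simplified model of the plan above; not Flink's actual classes.
sealed trait UpdateMode
case object Acc extends UpdateMode
case object AccRetract extends UpdateMode

final case class Node(name: String, mode: UpdateMode, producesUpdates: Boolean)

object RetractionCheckSketch {
  // Assumed behaviour: inspects only whether this operator itself creates updates.
  def isAppendOnly(node: Node): Boolean = !node.producesUpdates

  // Inspects the mode the node runs in: may its output carry retraction messages?
  def containsRetraction(node: Node): Boolean = node.mode == AccRetract

  def main(args: Array[String]): Unit = {
    val calc = Node("DataStreamCalc", AccRetract, producesUpdates = false)
    // The two checks disagree for the Calc node, which is the problem described above.
    println(s"isAppendOnly = ${isAppendOnly(calc)}")
    println(s"containsRetraction = ${containsRetraction(calc)}")
  }
}
```

Under these assumptions the Calc node passes an append-only check even though a downstream join would still receive retraction messages from it.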





---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4266: [FLINK-6232][Table] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127165003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

We should use `DataStreamRetractionRules.isAccRetract(input)` to check 
whether the input produces updates.


---


[GitHub] flink pull request #4266: [FLINK-6232][Table] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126683700
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+(1L, 1, "Hello"),
+(2L, 2, "Hello"),
+(3L, 3, "Hello"),
+(4L, 4, "Hello"),
+(5L, 5, "Hello"),
+(6L, 6, "Hello"),
+(7L, 7, "Hello World"),
+(8L, 8, "Hello World"),
+(20L, 20, "Hello World"))
+
+  /** test process time inner join **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
--- End diff --

You can simply do `StreamITCase.clear` instead of this.
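
As a rough illustration of why a dedicated `clear` is preferable to reassigning the field, here is a self-contained sketch. `ResultHolderSketch` is a hypothetical stand-in for a shared test-result holder; the real `StreamITCase` may hold more state and differ in detail.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a shared test-result holder; not the real StreamITCase.
object ResultHolderSketch {
  val testResults: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  // Single place that knows how to reset all shared state between tests.
  def clear(): Unit = testResults.clear()
}

object ResultHolderDemo {
  def main(args: Array[String]): Unit = {
    ResultHolderSketch.testResults += "1,Hello"
    // Each test resets the holder through one call instead of reassigning fields.
    ResultHolderSketch.clear()
    println(ResultHolderSketch.testResults.isEmpty)
  }
}
```

If the holder later gains more fields, only `clear` has to change and the individual tests stay as they are.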


---


[GitHub] flink pull request #4266: [FLINK-6232][Table] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127142150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just support inner-join
+  *
+  * @param leftLowerBound
+  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
+  * @param leftUpperBound
+  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param genJoinFuncName  the function name of other non-equi condition
+  * @param genJoinFuncCode  the function code of other non-equi condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+ 
