[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...

2018-07-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6282#discussion_r204230530
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -4490,6 +4490,16 @@ dateFormat(TIMESTAMP, STRING)
   
 
 
+
+  
+{% highlight text %}
--- End diff --

highlight text => highlight scala


---


[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...

2018-07-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6282#discussion_r204230536
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -4490,6 +4490,16 @@ dateFormat(TIMESTAMP, STRING)
   
 
 
+
+  
+{% highlight text %}
+timestampDiff(unit, timestamp1, timestamp2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between timestamp1 and timestamp2. The unit for the interval is given by the unit argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. E.g. timestampDiff(TimeIntervalUnit.DAY, '2003-01-02'.toDate, '2003-01-03'.toDate) leads to 1.
--- End diff --

timestamp1 and timestamp2 => datetime1 and datetime2
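
For reference, a minimal usage sketch after the suggested rename (a sketch only, assuming the `timestampDiff` expression object added by this PR and the implicits from `org.apache.flink.table.api.scala._` are in scope):

    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.expressions.TimeIntervalUnit

    // timestampDiff(TimeIntervalUnit, datetime1, datetime2)
    val diff = timestampDiff(TimeIntervalUnit.DAY, "2003-01-02".toDate, "2003-01-03".toDate)
    // evaluates to 1: one full DAY interval lies between the two dates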


---


[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...

2018-07-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6282#discussion_r204230539
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+timeIntervalUnit: Expression,
+timestamp1: Expression,
+timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+s"but timestamp1 is of type ${timestamp1.resultType}")
+}
+
+if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+s"but timestamp2 is of type ${timestamp2.resultType}")
+}
+
+timeIntervalUnit match {
+  case SymbolExpression(TimeIntervalUnit.YEAR)
+   | SymbolExpression(TimeIntervalUnit.MONTH)
+   | SymbolExpression(TimeIntervalUnit.DAY)
+   | SymbolExpression(TimeIntervalUnit.HOUR)
+   | SymbolExpression(TimeIntervalUnit.MINUTE)
+   | SymbolExpression(TimeIntervalUnit.SECOND)
+if timestamp1.resultType == SqlTimeTypeInfo.DATE
+  || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+  || timestamp2.resultType == SqlTimeTypeInfo.DATE
+  || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ValidationSuccess
+
+  case _ =>
+ValidationFailure(s"TimestampDiff operator does not support unit '$timeIntervalUnit'" +
+s" for input of type ('${timestamp1.resultType}', '${timestamp2.resultType}').")
+}
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+val typeFactory = relBuilder
+  .asInstanceOf[FlinkRelBuilder]
+  .getTypeFactory
+
+val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+  .enum.asInstanceOf[TimeUnitRange]
+val intervalType = typeFactory.createSqlIntervalType(
+  new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, SqlParserPos.ZERO))
+
+val rexCall = relBuilder
+  .getRexBuilder
+  .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
+List(timestamp2.toRexNode, timestamp1.toRexNode))
+
+val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
+
+relBuilder.getRexBuilder.makeCast(intType, rexCall)
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
--- End diff --

INT_TYPE_INFO => LONG_TYPE_INFO


---


[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...

2018-07-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6282#discussion_r204230534
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -4490,6 +4490,16 @@ dateFormat(TIMESTAMP, STRING)
   
 
 
+
+  
+{% highlight text %}
+timestampDiff(unit, timestamp1, timestamp2)
--- End diff --

timestampDiff(unit, timestamp1, timestamp2) => timestampDiff(TimeIntervalUnit, datetime1, datetime2)
TIMESTAMP is different from DATETIME. The supported range of DATETIME is '1000-01-01 00:00:00' to '9999-12-31 23:59:59', while TIMESTAMP has a range of '1970-01-01 00:00:01' UTC to '2038-01-19 03:14:07' UTC.


---


[GitHub] flink pull request #6282: [FLINK-6847][FLINK-6813][Table Api & Sql] Timestam...

2018-07-22 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6282#discussion_r204230537
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+timeIntervalUnit: Expression,
+timestamp1: Expression,
+timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+s"but timestamp1 is of type ${timestamp1.resultType}")
+}
+
+if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+s"but timestamp2 is of type ${timestamp2.resultType}")
+}
+
+timeIntervalUnit match {
+  case SymbolExpression(TimeIntervalUnit.YEAR)
+   | SymbolExpression(TimeIntervalUnit.MONTH)
+   | SymbolExpression(TimeIntervalUnit.DAY)
+   | SymbolExpression(TimeIntervalUnit.HOUR)
+   | SymbolExpression(TimeIntervalUnit.MINUTE)
+   | SymbolExpression(TimeIntervalUnit.SECOND)
+if timestamp1.resultType == SqlTimeTypeInfo.DATE
+  || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+  || timestamp2.resultType == SqlTimeTypeInfo.DATE
+  || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ValidationSuccess
+
+  case _ =>
+ValidationFailure(s"TimestampDiff operator does not support unit '$timeIntervalUnit'" +
+s" for input of type ('${timestamp1.resultType}', '${timestamp2.resultType}').")
+}
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+val typeFactory = relBuilder
+  .asInstanceOf[FlinkRelBuilder]
+  .getTypeFactory
+
+val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+  .enum.asInstanceOf[TimeUnitRange]
+val intervalType = typeFactory.createSqlIntervalType(
+  new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, SqlParserPos.ZERO))
+
+val rexCall = relBuilder
+  .getRexBuilder
+  .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
+List(timestamp2.toRexNode, timestamp1.toRexNode))
+
+val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
--- End diff --

SqlTypeName.INTEGER => SqlTypeName.BIGINT
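
Together with the INT_TYPE_INFO => LONG_TYPE_INFO comment above, a rough sketch of the fix inside `toRexNode` (reusing `typeFactory`, `relBuilder`, and `rexCall` from the diff):

    val longType = typeFactory.createSqlType(SqlTypeName.BIGINT)
    relBuilder.getRexBuilder.makeCast(longType, rexCall) // result type then becomes LONG_TYPE_INFO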


---


[GitHub] flink issue #6337: [FLINK-9853][Tabel API & SQL] add HEX support

2018-07-20 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6337
  
Thanks for your PR. +1 to merge.


---


[GitHub] flink issue #6367: [FLINK-9850] Add a string to the print method to identify...

2018-07-19 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6367
  
@yanghua Thanks for your update. +1 to merge 


---


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-19 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6335
  
@zhangminglei  It sounds great :-)


---


[GitHub] flink pull request #6367: [FLINK-9850] Add a string to the print method to i...

2018-07-18 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6367#discussion_r203588835
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
@@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def printToErr() = stream.printToErr()
 
+  /**
+* Writes a DataStream to the standard output stream (stdout). For each
+* element of the DataStream the result of .toString is
--- End diff --

.toString => [[AnyRef.toString()]]
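
i.e., the scaladoc would then read roughly (a sketch, assuming the new print overload this PR adds):

    /**
     * Writes a DataStream to the standard output stream (stdout). For each
     * element of the DataStream the result of [[AnyRef.toString()]] is written.
     */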


---


[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream

2018-07-18 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6335
  
Hi @zhangminglei ,
Good catch! Maybe the Reader also needs to be adapted, changing `new InputStreamReader` to `Channels.newReader`. I found a benchmark about FileInputStream and Reader performance [here](https://arnaudroger.github.io/blog/2017/03/20/faster-reader-inpustream-in-java.html). Hope it helps.
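
A minimal sketch of that change (the file path and charset are placeholders):

    import java.nio.channels.{Channels, FileChannel}
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Paths, StandardOpenOption}

    // before: val reader = new InputStreamReader(new FileInputStream("data.txt"), StandardCharsets.UTF_8)
    // after: read through an NIO channel instead of a FileInputStream
    val channel = FileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)
    val reader = Channels.newReader(channel, StandardCharsets.UTF_8.name())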


---


[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

2018-07-13 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6301
  
@jrthe42 Hi, thanks for your PR. 
From my side, I think using a connection pool to solve the connection problem is a better way. We don't need to keep the connections open the whole time; it wastes connection resources if most threads have been idle for a long period. Also, the connection pool will not add extra cost if threads are busy writing data into the database, since the connections in the pool will be reused. 

I googled just now and found the `MiniConnectionPoolManager` description [here](http://www.source-code.biz/miniconnectionpoolmanager/). Maybe we can use it. 
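
A rough usage sketch (assuming MiniConnectionPoolManager's documented API: the constructor takes a ConnectionPoolDataSource plus a maximum pool size, and closing a borrowed connection returns it to the pool; `dataSource` here is a placeholder):

    // created once, e.g. when the output format is opened
    val poolManager = new MiniConnectionPoolManager(dataSource, 8)

    val connection = poolManager.getConnection() // borrow a pooled connection
    try {
      // ... execute the batched statements on `connection` ...
    } finally {
      connection.close() // hands the connection back to the pool instead of closing it
    }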

Best, Hequn



---


[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

2018-07-07 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6075
  
Hi @zhangminglei , the main reason to implement `CheckpointedFunction` is that you keep a buffer in your Writer class. The buffer is an in-memory buffer, which will suffer data loss during job failover.


---


[GitHub] flink issue #6266: [FLINK-9682] Add setDescription to execution environment ...

2018-07-07 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6266
  
Hi @yanghua , I think if the description is too long, we can simply truncate it. That is safe and easy.


---


[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

2018-07-07 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6075
  
Hi @zhangminglei , thanks very much for your PR. 

As for dependencies, I think it is better to move the ORC-related classes into the `flink-orc` module, so we don't need to add an ORC dependency to the `flink-connector-filesystem` module.

Moreover, `OrcFileWriter` should implement the `CheckpointedFunction` interface to prevent loss of the data in `rowBatch` during job failover. During checkpointing, you can store the data in `rowBatch` into state, or call `flush()`. Similar logic can be found in `BucketingSink`.
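
A minimal sketch of the flush-on-checkpoint option (`BufferingWriter` and its buffer stand in for the writer and `rowBatch` discussed here):

    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

    import scala.collection.mutable.ArrayBuffer

    class BufferingWriter[T] extends CheckpointedFunction {

      @transient private lazy val buffer = ArrayBuffer.empty[T]

      def write(element: T): Unit = buffer += element

      private def flush(): Unit = {
        // write the buffered elements to the target file system here, then clear
        buffer.clear()
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        // called on every checkpoint: persist everything buffered so far,
        // so nothing in the in-memory buffer is lost on job failover
        flush()
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        // nothing to restore when the buffer is flushed on every checkpoint
      }
    }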

Best, Hequn.


---


[GitHub] flink issue #6267: [FLINK-5750] Incorrect parse of brackets inside VALUES su...

2018-07-05 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6267
  
@AlexanderKoltsov Hi, thanks for looking into this problem. The PR looks good. I agree with @fhueske that `DataStreamUnion` should be fixed in this PR. Furthermore, I found that `DataSetMinus` and `DataSetIntersect` have the same problem. It would be great if you could open a new JIRA to track it. 


---


[GitHub] flink issue #6255: [FLINK-9681] [table] Make sure difference between minRete...

2018-07-05 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6255
  
@fhueske Comments have been addressed. Thanks for your review.


---


[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...

2018-07-04 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/6255

[FLINK-9681] [table] Make sure difference between minRetentionTime and maxRetentionTime at least 5 minutes


## What is the purpose of the change

This PR aims to make sure that the difference between minRetentionTime and maxRetentionTime is at least 5 minutes. Currently, for a group-by (or other operators), if minRetentionTime equals maxRetentionTime, the operator will register a timer for each record arriving at a different time, which causes performance problems. The reasoning for having two parameters is that we can avoid registering many timers if we have more freedom about when to discard state. Since min equal to max causes performance problems, it is better to make sure these two parameters are not the same.
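
A rough sketch of the intended check (the exact constant, field names, and message are assumptions):

    def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
      if (maxTime.toMilliseconds != 0 &&
          maxTime.toMilliseconds - minTime.toMilliseconds < 5 * 60 * 1000) {
        throw new IllegalArgumentException(
          "Difference between minTime and maxTime should be at least 5 minutes.")
      }
      minIdleStateRetentionTime = minTime.toMilliseconds
      maxIdleStateRetentionTime = maxTime.toMilliseconds
      this
    }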


## Brief change log

  - Throw an exception when the difference between minRetentionTime and maxRetentionTime is smaller than 5 minutes.
  - Add a `QueryConfigTest` class that extends StreamQueryConfig. `QueryConfigTest` doesn't have the 5-minute limitation, which makes testing more convenient.
  - Adapt tests.


## Verifying this change

This change added tests and can be verified as follows:
  - Added test that validates that min and max retention time are set correctly.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink retentionTime

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6255.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6255


commit b41df60dde422bce9317479c8d77304d6ee4857b
Author: hequn8128 
Date:   2018-07-04T14:11:38Z

[FLINK-9681] [table] Make sure difference between minRetentionTime and maxRetentionTime at least 5 minutes




---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199537675
  
--- Diff: docs/dev/table/sql.md ---
@@ -1510,6 +1510,17 @@ ATAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+ATAN2(numeric)
+{% endhighlight %}
+  
+  
+Calculates the arc tangent of a given coordinates.
--- End diff --

I checked the [wiki](https://en.wikipedia.org/wiki/Atan2) about atan2; it says: the atan2 function calculates one unique arc tangent value from two variables y and x. So, would it be better to say `of two variables`? atan2(y,x) can be the arc tangent of (x,y) or (nx, ny).
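
A quick illustration of why the two-variable form matters (the quadrant information survives):

    // atan2 distinguishes quadrants, unlike atan(y / x):
    math.atan2(1.0, 1.0)   // ≈  0.785, i.e.  pi/4 (point in the first quadrant)
    math.atan2(-1.0, -1.0) // ≈ -2.356, i.e. -3*pi/4; atan(-1.0 / -1.0) would give pi/4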


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199537034
  
--- Diff: docs/dev/table/sql.md ---
@@ -1510,6 +1510,17 @@ ATAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+ATAN2(numeric, numeric)
+{% endhighlight %}
+  
+  
+Calculates the arc tangent of a given coordinate.
--- End diff --

I checked the [wiki](https://en.wikipedia.org/wiki/Atan2) about atan2; it says: the atan2 function calculates one unique arc tangent value from two variables y and x. So, would it be better to say `of two variables`? atan2(y,x) can be the arc tangent of (x,y) or (nx, ny).


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199537075
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---
@@ -300,6 +300,24 @@ object FunctionGenerator {
 DOUBLE_TYPE_INFO,
 BuiltInMethods.ATAN_DEC)
 
+  addSqlFunctionMethod(
+ATAN2,
+Seq(DOUBLE_TYPE_INFO, DOUBLE_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.ATAN2)
+
+  addSqlFunctionMethod(
+ATAN2,
+Seq(DOUBLE_TYPE_INFO, BIG_DEC_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.ATAN2_DEC)
+
+  addSqlFunctionMethod(
+ATAN2,
+Seq(BIG_DEC_TYPE_INFO, BIG_DEC_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.ATAN2_DEC)
--- End diff --

BuiltInMethods.ATAN2_DEC_DEC 
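
i.e., the two-BigDecimal overload should dispatch to its own method; a sketch of the corrected registration (assuming a `BuiltInMethods.ATAN2_DEC_DEC` entry exists or is added alongside `ATAN2_DEC`):

    addSqlFunctionMethod(
      ATAN2,
      Seq(BIG_DEC_TYPE_INFO, BIG_DEC_TYPE_INFO),
      DOUBLE_TYPE_INFO,
      BuiltInMethods.ATAN2_DEC_DEC)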


---


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-02 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199506183
  
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
+   assertEquals(Arrays.toString(CliUtils.rowToString(result)),
--- End diff --

The first argument of `assertEquals` is the expected value and the second is the actual value; please swap the order of these two parameters. (Also for the other tests.)
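
For illustration — JUnit's assertEquals signature is (expected, actual), and a swapped order makes failure messages report the values the wrong way around:

    import org.junit.Assert.assertEquals

    val actual = Seq(1, 2).mkString(", ")
    assertEquals("1, 2", actual) // expected literal first, computed value second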


---


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-02 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199504811
  
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
+   assertEquals(Arrays.toString(CliUtils.rowToString(result)),
+   "[(NULL), String, c, false, 12345.67, 12345.67, 12345, 2018-11-12, " +
+   "[1, 2], (1,123,null)]");
--- End diff --

I think it is better to keep one. How about changing `NULL_COLUMN = "(NULL)"` to `NULL_COLUMN = "null"`?  @twalthr @snuyanzin 


---


[GitHub] flink pull request #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-02 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6233#discussion_r199504779
  
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliUtilsTest.java ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CliUtils}.
+ */
+public class CliUtilsTest {
+
+   @Test
+   public void testRowToString() throws IOException {
+   Row result = new Row(10);
+   result.setField(0, null);
+   result.setField(1, "String");
+   result.setField(2, 'c');
+   result.setField(3, false);
+   result.setField(4, 12345.67f);
+   result.setField(5, 12345.67d);
+   result.setField(6, 12345L);
+   result.setField(7, java.sql.Date.valueOf("2018-11-12"));
+   result.setField(8, new int[]{1, 2});
+   result.setField(9, new Tuple3<>(1, "123", null));
--- End diff --

I'm not quite sure whether the output type can be a tuple, but a row type certainly can. For example, `result.setField(8, new Row(2));` will also output null.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-01 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199349619
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the interval is given
+  * by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the interval is given
+* by the unit argument, which should be one of the following values: "SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for better consistency. It is good to follow the Table API style.


---


[GitHub] flink issue #6233: [FLINK-9696] Deep toString for array/map sql types

2018-07-01 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6233
  
Hi @snuyanzin , thanks for your PR. The code looks good and the `deepToString()` function returns results correctly. I could not spot any issues with the implementation. To make the PR better, I think we can add a test in `CliUtilsTest` for the `rowToString` function, since the code in that function has also been changed. BTW, the PR template could be filled out better; see PR https://github.com/apache/flink/pull/5811 as an example.
Best, Hequn


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-24 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197662959
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -349,6 +349,17 @@ case class TimestampAdd(
 if (!TypeCheckUtils.isString(unit.resultType)) {
   return ValidationFailure(s"TimestampAdd operator requires unit to be of type " +
 s"String Literal, but get ${unit.resultType}.")
+} else {
+  val unitStr = unit.toString()
+  if (!sqlTsiArray.contains(unitStr) &&
+!sqlTsiArray.map(item => item.split("_").last).contains(unitStr)) {
+  return ValidationFailure(s"TimestampAdd operator requires unit to be one of (YEAR, " +
--- End diff --

Remove the "("  from "(YEAR", or add a ")" behind 


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-24 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197640442
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,71 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH",
+"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND")
+
+  override private[flink] def children: Seq[Expression] = unit :: count :: timestamp :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isString(unit.resultType)) {
--- End diff --

1. The unit should not only be a string, but also a valid unit, i.e., one of YEAR, MONTH, ...
2. Check that count is of type Integer or Long.
3. Add a validation test; you can refer to the tests in `ScalarFunctionsValidationTest`.
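
A rough sketch of the strengthened validateInput(), following the names in the diff above (the exact messages are placeholders):

    override private[flink] def validateInput(): ValidationResult = {
      val validUnits = sqlTsiArray ++ sqlTsiArray.map(_.split("_").last)
      unit match {
        case Literal(value: String, STRING_TYPE_INFO) if validUnits.contains(value) => // valid unit
        case _ => return ValidationFailure(
          s"TimestampAdd operator requires unit to be one of YEAR, QUARTER, MONTH, " +
          s"WEEK, DAY, HOUR, MINUTE, SECOND, but got $unit.")
      }
      if (count.resultType != INT_TYPE_INFO && count.resultType != LONG_TYPE_INFO) {
        return ValidationFailure(s"TimestampAdd operator requires count to be of " +
          s"type Integer or Long, but got ${count.resultType}.")
      }
      if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) {
        return ValidationFailure(s"TimestampAdd operator requires timestamp to be of " +
          s"type SqlTimeTypeInfo, but got ${timestamp.resultType}.")
      }
      ValidationSuccess
    }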


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-24 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197640444
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,71 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH",
+"SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND")
+
+  override private[flink] def children: Seq[Expression] = unit :: count :: timestamp :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isString(unit.resultType)) {
+  return ValidationFailure(s"TimestampAdd operator requires unit to be of type " +
+s"String Literal, but get ${unit.resultType}.")
+}
+if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) {
+  return ValidationFailure(s"TimestampAdd operator requires timestamp to be of type " +
+s"SqlTimeTypeInfo, but get ${timestamp.resultType}.")
+}
+ValidationSuccess
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+val timeUnit = unit match {
+  case Literal(value: String, STRING_TYPE_INFO) =>
+if (sqlTsiArray.contains(value)) {
+  Some(TimeUnit.valueOf(value.split("_").last))
+} else {
+  Some(TimeUnit.valueOf(value))
+}
+  case _ => None
+}
+
+val interval = count match {
+  case Literal(value: Int, INT_TYPE_INFO) =>
+makeInterval(value.toLong, timeUnit)
+  case Literal(value: Long, LONG_TYPE_INFO) =>
+makeInterval(value, timeUnit)
+  case _ =>
+relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+  relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
+new SqlIntervalQualifier(timeUnit.get, null, SqlParserPos.ZERO)),
+  count.toRexNode)
+}
+
+relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, interval)
+  }
+
+  override def toString: String = s"timestampAdd(${children.mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit])
+(implicit relBuilder: RelBuilder) = {
+val countWithUnit = timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value))
+  relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit,
--- End diff --

indent with two spaces


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197614200
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
--- End diff --

def children => def children: Seq[Expression]


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197614217
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
+var unitValue= unit.asInstanceOf[Literal].value.toString()
+val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK",
+"SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND")
+if (sqlTsiArray.contains(unitValue)) {
+  unitValue = unitValue.split("_").last
+}
+timeUnit = Some(TimeUnit.valueOf(unitValue))
+}
+
+relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode,
+relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+  relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
+  new SqlIntervalQualifier(timeUnit.get,null,SqlParserPos.ZERO)),
--- End diff --

(timeUnit.get,null,SqlParserPos.ZERO) => (timeUnit.get, null, SqlParserPos.ZERO)
add a space after the comma


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197614206
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
--- End diff --

toRexNode(implicit relBuilder: RelBuilder) => toRexNode(implicit relBuilder: RelBuilder): RexNode


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197614210
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
+var unitValue= unit.asInstanceOf[Literal].value.toString()
--- End diff --

1. "var unitValue="  => "var unitValue ="
2. indent with two spaces instead of four and same for other lines


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197614223
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
+var unitValue= unit.asInstanceOf[Literal].value.toString()
+val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK",
+"SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND")
+if (sqlTsiArray.contains(unitValue)) {
+  unitValue = unitValue.split("_").last
+}
+timeUnit = Some(TimeUnit.valueOf(unitValue))
+}
+
+relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode,
+relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+  relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
+  new SqlIntervalQualifier(timeUnit.get,null,SqlParserPos.ZERO)),
+  count.toRexNode))
+  }
+
+  override def toString: String = s"timestampAdd(${children.mkString(", ")})"
+
+  override private[flink] def resultType = STRING_TYPE_INFO
--- End diff --

def resultType => def resultType: TypeInformation[_]


---


[GitHub] flink issue #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-22 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6188
  
@xueyumusic Great that you stepped up to contribute to Flink. I will take a look this weekend :-)


---


[GitHub] flink issue #6079: [FLINK-8430] [table] Implement stream-stream non-window f...

2018-06-06 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6079
  
@twalthr Thanks a lot for the review and merging. I will take a look at FLINK-9440 and see if we can do any improvement.


---


[GitHub] flink pull request #6079: [FLINK-8430] [table] Implement stream-stream non-w...

2018-05-25 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/6079

[FLINK-8430] [table] Implement stream-stream non-window full outer join


## What is the purpose of the change

Support stream-stream non-window full outer join.


## Brief change log

  - Add full join process functions, including `NonWindowFullJoin` and `NonWindowFullJoinWithNonEquiPredicates`.
  - Add IT/UT/Harness tests for full outer join.
  - Update documentation of stream-stream join.

## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for full join with or without non-equal predicates.
  - Added harness tests for full join with or without non-equal predicates.
  - Added tests for the AccMode generated by full join.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8430

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6079.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6079


commit 8b9e8c09aae7d72fae3a6c6bed3327376029a5a2
Author: hequn8128 <chenghequn@...>
Date:   2018-05-26T02:29:51Z

[FLINK-8430] [table] Implement stream-stream non-window full outer join




---


[GitHub] flink pull request #6046: [FLINK-8429] Implement stream-stream non-window ri...

2018-05-19 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/6046

[FLINK-8429] Implement stream-stream non-window right outer join


## What is the purpose of the change

Implement stream-stream non-window right outer join. Most of the work has been done by [FLINK-8428](https://issues.apache.org/jira/browse/FLINK-8428); this PR mainly adds tests for right join.


## Brief change log

  - Add right join support during `translateToPlan`.
  - Add right join tests.
  - Update docs.


## Verifying this change

This change added tests and can be verified as follows:

  - Added IT/UT/harness tests for right join

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink rightjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6046


commit 53306531fc148e4f38fdba62413053c74b12160a
Author: hequn8128 <chenghequn@...>
Date:   2018-05-19T04:05:44Z

[FLINK-8429] Implement stream-stream non-window right outer join




---


[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
@twalthr Hi, thanks for your review. I have updated the PR according to your suggestions. Changes mainly include:
- Remove changes about UpsertSink
- Refactor test case names and add more tests to cover the code paths
- Add more method comments
- Add another base class `NonWindowOuterJoinWithNonEquiPredicates` and move the corresponding variables and functions into it.
- Split `CRowWrappingMultiOutputCollector` into `CRowWrappingMultiOutputCollector` and `LazyOutputCollector`. 

Best, Hequn.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995939
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 
AND h < b"
+
+val ds

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995707
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

I planned to add right join in FLINK-8429. It's OK to add right join in this PR if you prefer.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995596
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995668
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala ---
@@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase {
   )
 util.verifyTableTrait(resultTable, expected)
   }
-}
 
+  @Test
+  def testInnerJoinWithoutAgg(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, Int)]('bb, 'c)
+
+val resultTable = lTable
+  .join(rTable)
+  .where('b === 'bb)
+  .select('a, 'b, 'c)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+binaryNode(
+  "DataStreamJoin",
+  "DataStreamScan(true, Acc)",
+  "DataStreamScan(true, Acc)",
+  "false, Acc"
+),
+"false, Acc"
+  )
+util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+val resultTable = lTable
+  .leftOuterJoin(rTable, 'b === 'bb)
+  .select('a, 'b, 'c)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+binaryNode(
+  "DataStreamJoin",
+  "DataStreamScan(true, Acc)",
+  "DataStreamScan(true, Acc)",
+  "false, AccRetract"
+),
+"false, AccRetract"
+  )
+util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithLeftJoin(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+val countDistinct = new CountDistinct
+val resultTable = lTable
+  .leftOuterJoin(rTable, 'b === 'bb)
+  .select('a, 'b, 'c)
+  .groupBy('a)
+  .select('a, countDistinct('c))
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  binaryNode(
+"DataStreamJoin",
+"DataStreamScan(true, Acc)",
--- End diff --

`testJoin()` has covered this case.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995557
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -230,8 +230,12 @@ abstract class StreamTableEnvironment(
 tableKeys match {
   case Some(keys) => upsertSink.setKeyFields(keys)
   case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
-  case None if !isAppendOnlyTable => throw new TableException(
-"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+  case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null =>
--- End diff --

OK.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995438
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the name of the generated function for the non-equi join condition
+  * @param genJoinFuncCode the code of the generated function for the non-equi join condition
+  * @param isLeftJoin      whether the join is a left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row in which all fields from the right are null; used for output when there are no matched rows
+  protected var leftResultRow: Row = _
+  // result row in which all fields from the left are null; used for output when there are no matched rows
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184994194
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the name of the generated function for the non-equi join condition
+  * @param genJoinFuncCode the code of the generated function for the non-equi join condition
+  * @param isLeftJoin      whether the join is a left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row in which all fields from the right are null; used for output when there are no matched rows
+  protected var leftResultRow: Row = _
+  // result row in which all fields from the left are null; used for output when there are no matched rows
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
--- End diff --

OK. I created a base class for outer joins with non-equi predicates (`NonWindowOuterJoinWithNonEquiPredicates`).
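
For reference, a sketch of the resulting hierarchy (the class names are the ones quoted in this thread; the responsibility notes are my own reading of the diffs):

    CoProcessFunction[CRow, CRow, CRow]
      └─ NonWindowJoin                                   // state layout, timers, join-function compilation
           └─ NonWindowOuterJoin                         // null-padded output for preserved rows
                └─ NonWindowOuterJoinWithNonEquiPredicates   // adds joinCntState for non-equi predicates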


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184993658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
--- End diff --

Inner join doesn't produce retractions, but left/right/full joins do. For example, a left join retracts the previously emitted non-matched output when a new matching row arrives from the right side.
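
To illustrate with a hypothetical message flow (assuming a query like `l LEFT JOIN r ON b = bb` and empty initial state):

    // left row arrives first: no match, emit the null-padded result
    left : +(a=1, b=7)   =>  emit +(1, 7, null)
    // a matching right row arrives: retract the padded row, then emit the match
    right: +(bb=7, c=9)  =>  emit -(1, 7, null), +(1, 7, 9)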


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184993457
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
 */
   def consumesRetractions: Boolean = false
 
+  /**
+* Whether the [[DataStreamRel]] produces retraction messages.
+*/
+  def producesRetractions: Boolean = false
--- End diff --

A join generates retractions if its type is left/right/full. This is different from an aggregation, which generates retractions only if `sendsUpdatesAsRetraction(node) && node.producesUpdates` is true.
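
Side by side (the join condition is taken from the diff above; the agg-side expression is pseudo-logic condensed from the retraction rules, not a single method):

    // join: producing retractions depends only on the join type
    override def producesRetractions: Boolean = joinType != JoinRelType.INNER

    // agg (pseudo-logic): retractions are produced only when the downstream
    // needs updates as retractions and the node itself produces updates
    val aggProducesRetractions = sendsUpdatesAsRetraction(node) && node.producesUpdates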


---


[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-04-20 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
@twalthr Hi, great to see your review and valuable suggestions. I will update my PR late next week (maybe next weekend). Thanks very much.


---


[GitHub] flink pull request #4674: [FLINK-7627] [table] SingleElementIterable should ...

2018-04-17 Thread hequn8128
Github user hequn8128 closed the pull request at:

https://github.com/apache/flink/pull/4674


---


[GitHub] flink issue #4674: [FLINK-7627] [table] SingleElementIterable should impleme...

2018-04-17 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4674
  
@fhueske Hi, I will close it. Thanks for checking.


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-03-14 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/
  
Hi, @walterddr 
I was thinking we could add some `MapView`s as new member variables of the generated class, so we don't need to extend the arity of `accumulatorState`. What do you think?
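
A minimal sketch of what I mean (a hypothetical shape for the generated helper, not actual codegen output):

    import org.apache.flink.table.api.dataview.MapView

    class GeneratedAggregateHelper {
      // the distinct filter state lives in a MapView member, so the arity of
      // accumulatorState stays unchanged
      val distinctFilter0: MapView[String, java.lang.Long] =
        new MapView[String, java.lang.Long]()

      // returns true if the value is seen for the first time
      def distinctAccumulate(value: String): Boolean = {
        val cnt = distinctFilter0.get(value)
        if (cnt == null) { distinctFilter0.put(value, 1L); true }
        else { distinctFilter0.put(value, cnt + 1); false }
      }
    }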


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-03-10 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r173644059
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for an accumulator wrapper when applying distinct aggregation.
+  * @param realAcc the actual accumulator, which gets invoked after the distinct filter.
+  * @param mapView the [[MapView]] used to store the distinct filter hash map.
+  * @tparam E the element type of the distinct filter hash map.
+  * @tparam ACC the accumulator type of the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

Use `mapView.get()` directly and reuse the result. This avoids reading the state twice.
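
Roughly like this, against the `add` method quoted above (assuming `MapView#get` returns null for absent keys):

    def add(element: E): Boolean = {
      if (element == null) {
        false
      } else {
        val cnt = mapView.get(element)  // single state read instead of contains() + get()
        if (cnt == null) {
          mapView.put(element, 1)
          true
        } else {
          mapView.put(element, cnt + 1)
          false
        }
      }
    }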


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-03-10 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r173644062
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

Would LONG_TYPE_INFO be safer? Int overflow can easily be reached if, say, 10,000 records are processed per second.
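
Back-of-the-envelope, with my own illustrative rate:

    Int.MaxValue = 2,147,483,647
    at 10,000 records/s on one key: 2,147,483,647 / 10,000 ≈ 214,748 s ≈ 2.5 days to overflow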


---


[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-03-08 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
Hi @twalthr @walterddr 
The latest update refactors interfaces and functions to make the code more friendly to right/full joins. The code for right/full joins is also ready and is available at https://github.com/hequn8128/flink/tree/outerjoin (branch: outerjoin).
@fhueske It would be great if you could also take a look.
Thanks all. Hequn 


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-03-07 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/
  
@walterddr Cool, thanks for your update. I will take a look this weekend or early next week :-)


---


[GitHub] flink issue #5613: [FLINK-8274] [table] Split generated methods for preventi...

2018-03-02 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5613
  
Hi @twalthr 
The current implementation will still hit the limits if a single field becomes too large. This can be reproduced by creating a field produced by concat_ws(); see
https://github.com/hequn8128/flink/commit/871fb2c7723c11dd0b75176a3be3be70e2740b2b
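
For anyone trying to reproduce it without the linked commit, a hypothetical query along these lines should inflate the code generated for one field (`MyTable` and column `a` are made-up names):

    // repeat CONCAT_WS arguments until the generated code for this single
    // field becomes very large
    val args = Seq.fill(300)("CAST(a AS VARCHAR)").mkString(", ")
    val sql  = s"SELECT CONCAT_WS('-', $args) FROM MyTable"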
Best, Hequn



---


[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-02-28 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
Updated the PR according to @walterddr's suggestions.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936623
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, 
ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for 
stream-stream non-window Join.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the name of the generated function for the non-equi join condition
+  * @param genJoinFuncCode   the code of the generated function for the non-equi join condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state; the first element of the tuple counts how many
+// times this row occurs, while the second element stores the row's expiration time
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JT

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936833
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only used for left joins with
+  * non-equi predicates. A MapState of type [Row, Long] is added to record how many
+  * rows from the right table can be matched for each left row. A left join without
+  * non-equi predicates doesn't need it, because left rows can always join right rows
+  * as long as the join keys are the same.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the name of the generated function for the non-equi join condition
+  * @param genJoinFuncCode the code of the generated function for the non-equi join condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row in which all fields from the right are null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor)
+resultRow = new Row(resultType.getArity)
+
+LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.")
+  }
+
+  /**
+* Puts or retracts an element from the input stream into state, and searches the
+* other state to output records that meet the join condition. The result is padded
+* with NULLs from the right side if there is no match. Records in state expire if a
+* state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpire

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936767
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only used for left joins without
+  * non-equi predicates.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the name of the generated function for the non-equi join condition
+  * @param genJoinFuncCode the code of the generated function for the non-equi join condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null
+  private var resultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+resultRow = new Row(resultType.getArity)
+LOG.debug("Instantiating NonWindowLeftJoin.")
+  }
+
+  /**
+* Puts or retracts an element from the input stream into state, and searches the
+* other state to output records that meet the join condition. The result is padded
+* with NULLs from the right side if there is no match. Records in state expire if a
+* state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, 
cntAndExpiredTime.f1)
+if (stateCleaningEnabled && timerState.value() == 0) {
+  timerState.update(cntAndExpiredTime.f1)
+  ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+}
+
+// update current side stream state
+if (!value.change) {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+  if (cntAndExpiredTime.f0 <= 0) {
+currentSideState.remove(inputRow)
+  } else {
+currentSideState.put(inputRow, cntAndExpiredTime)
+  }
+} else {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
+  currentSideState.put(inputRow, cntAndExpiredTime)
   

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936524
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -149,45 +148,39 @@ object UpdatingPlanChecker {
   }
 
 case j: DataStreamJoin =>
-  val joinType = j.getJoinType
-  joinType match {
-case JoinRelType.INNER =>
-  // get key(s) for inner join
-  val lInKeys = visit(j.getLeft)
-  val rInKeys = visit(j.getRight)
-  if (lInKeys.isEmpty || rInKeys.isEmpty) {
-None
-  } else {
-// Output of inner join must have keys if left and right 
both contain key(s).
-// Key groups from both side will be merged by join 
equi-predicates
-val lInNames: Seq[String] = 
j.getLeft.getRowType.getFieldNames
-val rInNames: Seq[String] = 
j.getRight.getRowType.getFieldNames
-val joinNames = j.getRowType.getFieldNames
-
-// if right field names equal to left field names, calcite 
will rename right
-// field names. For example, T1(pk, a) join T2(pk, b), 
calcite will rename T2(pk, b)
-// to T2(pk0, b).
-val rInNamesToJoinNamesMap = rInNames
-  .zip(joinNames.subList(lInNames.size, joinNames.length))
-  .toMap
+  // get key(s) for inner join
+  val lInKeys = visit(j.getLeft)
+  val rInKeys = visit(j.getRight)
+  if (lInKeys.isEmpty || rInKeys.isEmpty) {
+None
+  } else {
+// Output of inner join must have keys if left and right both 
contain key(s).
--- End diff --

Yes, thank you


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only used for left joins with
+  * non-equi predicates. A MapState of type [Row, Long] is added to record how many
+  * rows from the right table can be matched for each left row. A left join without
+  * non-equi predicates doesn't need it, because left rows can always join right rows
+  * as long as the join keys are the same.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the name of the generated function for the non-equi join condition
+  * @param genJoinFuncCode the code of the generated function for the non-equi join condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row in which all fields from the right are null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

Long is safer. I will change all count types to Long. What do you think?


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/
  
Cool! It avoids the problems with nested map views and is more flexible.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170828036
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1393,6 +1393,21 @@ object AggregateUtil {
 throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
 }
   }
+
+  // create distinct accumulator delegate
+  if (aggregateCall.isDistinct) {
--- End diff --

The SQL will be verified by Calcite, which excludes the single-DISTINCT case during the parse phase, so maybe we don't have to consider single DISTINCT here.
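
In other words, as I understand the two cases (hypothetical queries):

    // rewritten by Calcite during parsing, so it never reaches this code
    val singleDistinct = "SELECT DISTINCT a FROM t"
    // the distinct aggregate case that this branch has to handle
    val distinctAgg    = "SELECT COUNT(DISTINCT a) FROM t"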


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170828027
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var 
realAcc: ACC) {
+  def this() {
+this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: 
TypeInformation[_],
+  var realAgg: 
AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, 
ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+new DistinctAccumulator[E, ACC](
+  new MapView[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]],
+BasicTypeInfo.INT_TYPE_INFO),
+  realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + 1)
+false
+  } else {
+acc.mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: 
Int): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + count)
+false
+  } else {
+acc.mapView.put(element, count)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  val count = acc.mapView.get(element)
+  if (count == 1) {
+acc.mapView.remove(element)
+true
+  } else {
+acc.mapView.put(element, count - 1)
+false
+  }
+} else {
+  false
+}
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, 
Integer] = {
+acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, 
ACC]] = {
+val clazz = classOf[DistinctAccumulator[E, ACC]]
+val pojoFields = new util.ArrayList[PojoField]
+pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+  new MapViewTypeInfo[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO)))
+pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+  realAgg.getAccumulatorType))
+new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields)
--- End diff --

Makes sense.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577231
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,11 +106,32 @@ class AggregationCodeGenerator(
 
 // get unique function name
 val funcName = newName(name)
+
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val distinctAggType = s"${classOf[DistinctAggDelegateFunction[_, _]].getName}"
+val isDistinctAcc = aggregates.map(_.isInstanceOf[DistinctAggDelegateFunction[_, _]])
+
 // register UDAGGs
 val aggs = aggregates.map(a => addReusableFunction(a, contextTerm))
 
+// register real aggregate functions without distinct delegate
+val realAggregates: Array[AggregateFunction[_ <: Any, _ <: Any]] = aggregates.map {
+  case distinctAggDelegate: DistinctAggDelegateFunction[_, _] =>
+distinctAggDelegate.realAgg
+  case agg: AggregateFunction[_, _] =>
+agg
+}
+
+val realAggTypes = aggregates.map {
--- End diff --

Can be replaced by `realAggregates.map(_.getClass.getName)`
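For illustration, a minimal sketch of the suggested simplification (assuming 
`realAggregates` has already been computed as in the diff above):

```scala
// Sketch only: take the class names directly from the unwrapped functions,
// so no second pattern match over `aggregates` is needed.
val realAggTypes = realAggregates.map(_.getClass.getName)
```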


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5555#discussion_r170577575
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+    var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+new DistinctAccumulator[E, ACC](
+  new MapView[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]],
+BasicTypeInfo.INT_TYPE_INFO),
+  realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + 1)
+false
+  } else {
+acc.mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + count)
+false
+  } else {
+acc.mapView.put(element, count)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  val count = acc.mapView.get(element)
+  if (count == 1) {
+acc.mapView.remove(element)
+true
+  } else {
+acc.mapView.put(element, count - 1)
+false
+  }
+} else {
+  false
+}
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = {
+acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = {
+val clazz = classOf[DistinctAccumulator[E, ACC]]
+val pojoFields = new util.ArrayList[PojoField]
+pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+  new MapViewTypeInfo[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO)))
+pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+  realAgg.getAccumulatorType))
+new PojoTypeInfo[DistinctAccumulator[E, ACC]](clazz, pojoFields)
--- End diff --

Curre

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5555#discussion_r170577434
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+    var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+new DistinctAccumulator[E, ACC](
+  new MapView[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]],
+BasicTypeInfo.INT_TYPE_INFO),
+  realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + 1)
+false
+  } else {
+acc.mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
--- End diff --

Add a corresponding retract function.
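For illustration, a minimal sketch of what such a retract could look like, 
mirroring the `accumulate(acc, element, count)` overload quoted above (names 
assumed from the diff):

```scala
// Sketch only: retract `count` occurrences of `element`; returns true when
// the element disappears from the distinct map entirely.
def retract(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
  if (element != null) {
    val current = acc.mapView.get(element)
    if (current <= count) {
      acc.mapView.remove(element)
      true
    } else {
      acc.mapView.put(element, current - count)
      false
    }
  } else {
    false
  }
}
```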


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5555#discussion_r170577325
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
--- End diff --

Add comments for this class and its functions.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5555#discussion_r170577531
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
+  def this() {
+this(null, null.asInstanceOf[ACC])
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+}
+
+class DistinctAggDelegateFunction[E, ACC](elementTypeInfo: TypeInformation[_],
+    var realAgg: AggregateFunction[_, ACC])
+  extends AggregateFunction[util.Map[E, Integer], DistinctAccumulator[E, ACC]] {
+
+  def getRealAgg: AggregateFunction[_, ACC] = realAgg
+
+  override def createAccumulator(): DistinctAccumulator[E, ACC] = {
+new DistinctAccumulator[E, ACC](
+  new MapView[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]],
+BasicTypeInfo.INT_TYPE_INFO),
+  realAgg.createAccumulator())
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + 1)
+false
+  } else {
+acc.mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def accumulate(acc: DistinctAccumulator[E, ACC], element: E, count: Int): Boolean = {
+if (element != null) {
+  if (acc.mapView.contains(element)) {
+acc.mapView.put(element, acc.mapView.get(element) + count)
+false
+  } else {
+acc.mapView.put(element, count)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def retract(acc: DistinctAccumulator[E, ACC], element: E): Boolean = {
+if (element != null) {
+  val count = acc.mapView.get(element)
+  if (count == 1) {
+acc.mapView.remove(element)
+true
+  } else {
+acc.mapView.put(element, count - 1)
+false
+  }
+} else {
+  false
+}
+  }
+
+  def resetAccumulator(acc: DistinctAccumulator[E, ACC]): Unit = {
+acc.mapView.clear()
+  }
+
+  override def getValue(acc: DistinctAccumulator[E, ACC]): util.Map[E, Integer] = {
+acc.mapView.map
+  }
+
+  override def getAccumulatorType: TypeInformation[DistinctAccumulator[E, ACC]] = {
+val clazz = classOf[DistinctAccumulator[E, ACC]]
+val pojoFields = new util.ArrayList[PojoField]
+pojoFields.add(new PojoField(clazz.getDeclaredField("mapView"),
+  new MapViewTypeInfo[E, Integer](
+elementTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO)))
+pojoFields.add(new PojoField(clazz.getDeclaredField("realAcc"),
+  realAgg.getAccumulatorType))
--- End diff --

`getAccumulatorType` may return null if it has not been overridden. I

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5555#discussion_r170577418
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAggDelegateFunction.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+class DistinctAccumulator[E, ACC] (var mapView: MapView[E, Integer], var realAcc: ACC) {
--- End diff --

Remove the blank between `[E, ACC]` and `(var mapView`


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577818
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,80 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testUnboundedDistinctGroupWindow(): Unit = {
--- End diff --

Remove this test case. The distinct in this case is solved by 
`AggregateExpandDistinctAggregatesRule`


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-02-26 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r170577731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1393,6 +1393,21 @@ object AggregateUtil {
 throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
 }
   }
+
+  // create distinct accumulator delegate
+  if (aggregateCall.isDistinct) {
--- End diff --

It is better to move this logic into the upper match. Distinct is also a 
kind of aggregate whose fields should not be empty. Also, we can reuse some 
variables. What do you think? 
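For illustration, a rough sketch of the suggested restructuring; 
`resolveAggFunction` and `inputFieldTypeInfo` are hypothetical names standing 
in for what the upper match already computes:

```scala
// Sketch only: resolve the aggregate once in the existing match arm, then
// wrap it in the distinct delegate right there, reusing the resolved
// function and its input type instead of matching a second time.
val resolved = resolveAggFunction(aggregateCall) // hypothetical helper
val finalAgg =
  if (aggregateCall.isDistinct) {
    new DistinctAggDelegateFunction(inputFieldTypeInfo, resolved) // assumed type info
  } else {
    resolved
  }
```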


---


[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-02-24 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
hi @twalthr, looking forward to your review, thanks :-)


---


[GitHub] flink pull request #5404: [FLINK-8550][table] Iterate over entryset instead ...

2018-02-02 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5404

[FLINK-8550][table] Iterate over entryset instead of keys


## What is the purpose of the change

Iterate over entrySets instead of keySets when we want to get both the key and 
the value. I went over the code in Flink; luckily, there are not many places we 
need to optimize.
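
For illustration, a minimal standalone sketch of the pattern being replaced 
(not code from this PR):

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

val counts = new JHashMap[String, Int]()
counts.put("a", 1)
counts.put("b", 2)

// Before: iterating over keys costs one extra map lookup per entry.
for (key <- counts.keySet().asScala) {
  println(s"$key -> ${counts.get(key)}")
}

// After: key and value are read from the same entry.
for (entry <- counts.entrySet().asScala) {
  println(s"${entry.getKey} -> ${entry.getValue}")
}
```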


## Brief change log

  - Iterate over entrySets instead of keySets when we want to get both the key and the value


## Verifying this change

This change is already covered by existing tests, such as:

  - `OverWindowHarnessTest` for changes in `ProcTimeBoundedRangeOver`
  - `OverWindowITCase` for changes in `RowTimeBoundedRangeOver`
  - `GroupWindowITCase` for changes in `JavaUserDefinedAggFunctions`
  - `CorrelateITCase` for changes in `UserDefinedTableFunctions`


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5404


commit 24a44c4c160e11f85d422b1acdfba20ea10a1a45
Author: hequn8128 <chenghequn@...>
Date:   2018-02-02T09:33:50Z

[FLINK-8550][table] Iterate over entryset instead of keys




---


[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-01-29 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
Hi @twalthr, it would be great if you could take a look at the PR. I'm 
looking forward to finishing the outer joins (left/right/full) before the end of 
March. Besides, a few PRs are planned to optimize inner/outer joins. 
Thanks :)


---


[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631861
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631864
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-29 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164631884
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the right element
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound

[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266892
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest  extends TableTestBase {
--- End diff --

Remove redundant blank between `CalcTest` and `extends`


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266896
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

Arguments of the method should be indented. 


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266907
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testArrayElementAtFromTableForTuple(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val data = List(
+  (1, Array((12, 45), (2, 5))),
--- End diff --

1. Add a null tuple input test
2. Add a nested tuple input test
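
For illustration, sketches of the two suggested inputs (element types assumed 
from the quoted test):

```scala
// 1. Null input: an element whose tuple-array field is null.
val dataWithNull = List(
  (1, Array((12, 45), (2, 5))),
  (2, null.asInstanceOf[Array[(Int, Int)]])
)

// 2. Nested tuple input: tuples nested inside the array elements.
val nestedData = List(
  (1, Array(((12, 45), 2), ((3, 4), 5)))
)
```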


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266897
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
--- End diff --

NPE will be thrown if ${record.resultTerm} is null 
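
For illustration, a sketch of a null-safe version of the generated snippet, in 
the same interpolation style as the diff (variable names taken from the quoted 
code, exact shape assumed):

```scala
// Sketch only: test the record for null before dereferencing it, so the
// generated Java never calls productElement(...) on a null reference.
return GeneratedExpression(resultTerm, nullTerm,
  s"""
     |${record.code}
     |${subField.code}
     |$resultTypeTerm $resultTerm = null;
     |boolean $nullTerm = ${record.resultTerm} == null;
     |if (!$nullTerm) {
     |  $resultTerm = ($resultTypeTerm) ${record.resultTerm}.productElement(
     |    ${fieldName.substring(1).toInt} - 1);
     |  $nullTerm = $resultTerm == null;
     |}
     |""".stripMargin, resultType)
```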


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266903
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
+   |${fieldName.substring(1).toInt} - 1);
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[RowTypeInfo]) {
+  val fieldIndex = dot.operands.get(0).getType.asInstanceOf[CompositeRelDataType]
+.compositeType.getFieldIndex(fieldName)
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.getField(${fieldIndex});
--- End diff --

NPE will be thrown


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266900
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
+   |${fieldName.substring(1).toInt} - 1);
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
--- End diff --

NPE will be thrown 


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266898
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
+   |${fieldName.substring(1).toInt} - 1);
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
--- End diff --

NPE will be thrown 


---


[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-26 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5347
  
hi @fhueske, thanks for your suggestions. The PR has been updated according 
to your comments.


---


[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-25 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5347
  
hi @fhueske @twalthr, the PR has been updated. It would be great if you 
could take a look at it.

Changes mainly include: 
1. Adapt estimateRowCount to be more accurate. The original implementation 
used a constant factor of 0.75 to reduce the result, which made the row count of 
a merged calc bigger than the row count of the un-merged calcs (see the sketch 
below). The current implementation uses a more accurate selectivity to reduce 
the result row count.
2. Merge calcs in the convert rule of correlate. Double check to make sure 
the unsupported exception won't be thrown.
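
For intuition, a tiny sketch of why a constant damping factor biases the cost 
model (numbers illustrative only):

```scala
// Two stacked calcs each damp the row count by 0.75, so the un-merged plan
// is estimated to produce fewer rows than the merged one, even though it
// does strictly more work.
val inputRows = 1000.0
val factor = 0.75
val unmergedRows = inputRows * factor * factor // 562.5
val mergedRows   = inputRows * factor          // 750.0
assert(unmergedRows < mergedRows) // so the planner keeps the calcs separate
// A per-predicate selectivity estimate removes this artificial gap.
```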


---


[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-24 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5347
  
Hi @fhueske @twalthr,

To solve the problem, we need to make the `estimateRowCount` in 
`CommonCalc` more accurate. I will update the PR tomorrow. Anyway, the cost 
model can't solve the problem deterministically. The cost is just an estimate, 
so multiple calcs will still exist under some circumstances. 

As for correlate, to make sure the unsupported exception won't be thrown, I 
will double check whether multiple calcs exist, and merge them if needed. 

What do you think? Thanks, Hequn.


---


[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-24 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5347
  
@twalthr thanks for your review. I will take a look and update the PR.


---


[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-23 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5347

[FLINK-8492][table] Fix calc cost bug


## What is the purpose of the change

Fix the calc cost bug. Currently, an unsupported exception is thrown when 
multiple calcs exist between a correlate and a TableFunctionScan.


## Brief change log

  - Add a constant 1 to compCnt in `CommonCalc` (see the sketch below)
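
For illustration, the shape of the change (a sketch; `exprCnt` is an assumed 
name for the calc's expression count):

```scala
// Count at least one computation per calc, so stacking calcs is never
// estimated as free and the planner prefers the merged plan.
val compCnt = exprCnt + 1
```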


## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for UDTFs with multiple calcs


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8492

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5347.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5347


commit bf1b4d4773c3690dfe73a0e71b22730d8b5eb99c
Author: hequn8128 <chenghequn@...>
Date:   2018-01-24T04:25:13Z

[FLINK-8492][table] Fix calc cost bug




---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-01-20 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5327

[FLINK-8428] [table] Implement stream-stream non-window left outer join


## What is the purpose of the change

Implement stream-stream non-window left outer join for SQL / Table API. A 
simple design doc can be found 
[here](https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing)


## Brief change log

  - Add left join (see the sketch below)
    - with non-equal predicates
    - without non-equal predicates
  - Adapt retraction rules to left join. Outer joins will generate retractions
  - Adapt `UpsertTableSink`. The table mode of a dynamic table produced by a left 
join is Update Mode, even if the table does not include a key definition
  - Add inner join test cases consistent with the test cases in batch.
  - Add left join test cases consistent with the test cases in batch.
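
For reference, a minimal sketch of the kind of query this enables (Table API; 
table and field names illustrative):

```scala
// Stream-stream non-window left outer join with an extra non-equal predicate.
val result = leftTable
  .leftOuterJoin(rightTable, 'a === 'd && 'b < 'e)
  .select('a, 'b, 'f)
```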


## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for left join with or without non-equal predicates.
  - Added harness tests for left join with or without non-equal predicates.
  - Added tests for the AccMode generated by left join.
  - Added tests for UpsertSink followed by left join.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs already exist)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink leftjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5327


commit 2766de56d47e1a82f6605eb1dd80d8ea5e697a29
Author: hequn8128 <chenghequn@...>
Date:   2018-01-21T04:54:08Z

[FLINK-8428] [table] Implement stream-stream non-window left outer join




---


[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...

2018-01-10 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
hi @twalthr, the PR has been submitted: 
https://issues.apache.org/jira/browse/FLINK-8400
Thanks, Hequn.


---


[GitHub] flink pull request #5273: [FLINK-8400] [table] Use the lexicographic smalles...

2018-01-10 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5273

[FLINK-8400] [table] Use the lexicographic smallest attribute as the common 
group id when extract unique key


## What is the purpose of the change

`UniqueKeyExtractor` returns a tuple: the first element is the name of 
a key field, and the second is a group name that is shared by all equivalent key 
fields. The group names are used to identify identical keys. For example, given 
select('pk as pk1, 'pk as pk2), both pk1 and pk2 belong to the same group, i.e., 
pk1 (see the example below). Here we use the lexicographically smallest 
attribute as the common group id.

Currently, when extracting unique keys from `DataStreamCalc`, the generated 
group id is not the lexicographically smallest attribute. This PR fixes that bug.
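
For illustration, the expected grouping for the example above, written as 
(keyField, groupId) pairs (a sketch, not the extractor's actual API):

```scala
// select('pk as pk1, 'pk as pk2): both aliases trace back to the same key,
// so they share one group id -- the lexicographically smallest name, "pk1".
val extractedKeys = Seq(
  ("pk1", "pk1"),
  ("pk2", "pk1")
)
```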


## Brief change log

  - Use the lexicographically smallest attribute as the group id when extracting 
unique keys for `DataStreamCalc`
  - Add a test to check whether the group id is the lexicographically smallest 
attribute.


## Verifying this change

This change added tests and can be verified as follows:

  - Added a test to check whether the group id is the lexicographically smallest 
attribute


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 6904_groupid

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5273.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5273


commit 53716b34ab7a9eab1362b07783a8a64fcc732757
Author: 军长 <hequn.chq@...>
Date:   2018-01-10T07:48:19Z

[FLINK-6094] [table] use the lexicographic smallest attribute as the common 
group id




---


[GitHub] flink issue #4471: [FLINK-6094] [table] Implement stream-stream proctime non...

2018-01-10 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/4471
  
hi @twalthr ~ Thanks for your review. I will open another PR to fix a small 
bug for FLINK-6094. 

PS: Outer joins are coming soon. Also, some PRs to optimize the 
stream-stream (inner/outer) joins have been planned. :)


---


[GitHub] flink pull request #5244: [FLINK-8366] [table] Use Row instead of String as ...

2018-01-05 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5244

[FLINK-8366] [table] Use Row instead of String as key when process upsert 
results


## What is the purpose of the change

This PR fixes the bug in `TableSinkITCase.upsertResults()`. In the 
`upsertResults()` function, we use String as the key to upsert results. This 
makes (1,11) and (11,1) produce the same key (i.e., "111"), as shown below.
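
For illustration, a minimal standalone sketch of the collision and the fix:

```scala
import org.apache.flink.types.Row

// String concatenation collapses different rows onto the same key:
val k1 = "" + 1 + 11 // "111"
val k2 = "" + 11 + 1 // "111" as well
assert(k1 == k2)

// A Row key keeps the fields separate:
val r1 = new Row(2); r1.setField(0, 1); r1.setField(1, 11)
val r2 = new Row(2); r2.setField(0, 11); r2.setField(1, 1)
assert(r1 != r2)
```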


## Brief change log

  - Use Row instead of String as the key to avoid such collisions.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5244


commit 259751257a5b4154d4b16b46c7fa38e25638a5ef
Author: 军长 <hequn.chq@...>
Date:   2018-01-05T08:53:31Z

[FLINK-8366] [table] Use Row instead of String as key when process upsert 
results




---

