[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200523824
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+
+class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: 
Option[TableSourceTable[T1]],
--- End diff --

Huge +1. My understanding is that this will be the overall class holding a table 
source, a sink, or both. The name `TableSourceSinkTable` seems redundant. 
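
For illustration, a minimal sketch of such a wrapper (assuming the companion 
`TableSinkTable` class and the Calcite imports from the quoted diff; the name and 
delegation logic are illustrative, not the PR's final code):

```
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.flink.table.api.TableException

class SourceSinkTable[T1, T2](
    val tableSourceTableOpt: Option[TableSourceTable[T1]],
    val tableSinkTableOpt: Option[TableSinkTable[T2]])
  extends AbstractTable {

  // Delegate the row type to whichever side is present; a table registered
  // through this wrapper must have at least a source or a sink.
  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType =
    tableSourceTableOpt.map(_.getRowType(typeFactory))
      .orElse(tableSinkTableOpt.map(_.getRowType(typeFactory)))
      .getOrElse(throw new TableException("Table must have a source or a sink."))
}
```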


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200522491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -160,10 +173,34 @@ abstract class BatchTableEnvironment(
   throw new TableException("Same number of field names and types 
required.")
 }
 
-tableSink match {
+val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+registerTableSinkInternal(name, configuredSink)
+  }
+
+  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit 
= {
--- End diff --

Could this probably be moved to the base class `TableEnvironment`?
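
For illustration, a rough sketch of that suggestion (assuming the 
`registerTableSinkInternal` and name-check helpers exist in the environments; this 
only shows the method placement, not the PR's final code):

```
import org.apache.flink.table.sinks.TableSink

abstract class TableEnvironment {

  // Shared by BatchTableEnvironment and StreamTableEnvironment, so the
  // public sink-registration entry point lives in one place.
  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
    checkValidTableName(name)
    registerTableSinkInternal(name, configuredSink)
  }

  // Subclasses keep their environment-specific registration logic.
  protected def registerTableSinkInternal(name: String, sink: TableSink[_]): Unit

  protected def checkValidTableName(name: String): Unit
}
```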


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524149
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

Also, how about just `TableFactoryService`?


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200521017
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
 
+   private static TableDescriptor create(String name, Map 
config) {
+   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
+   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
+   }
+   final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
+   if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+   return new Source(name, 
ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+   return new Sink(name, ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
+   }
+   return null;
+   }
+
public void setTables(List> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
+   if (!config.containsKey(NAME)) {
+   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
}
-   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
-   
config.remove(TableDescriptorValidator.TABLE_TYPE());
-   final Source s = Source.create(config);
-   if (this.tables.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
-   }
-   this.tables.put(s.getName(), s);
-   } else {
+   final Object name = config.get(NAME);
+   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
+   throw new SqlClientException("Invalid table 
name '" + name + "'.");
+   }
+   final String tableName = (String) name;
+   final Map properties = new 
HashMap<>(config);
+   properties.remove(NAME);
+
+   TableDescriptor tableDescriptor = create(tableName, 
properties);
+   if (null == tableDescriptor) {
throw new SqlClientException(
-   "Invalid table 'type' attribute 
value, only 'source' is supported");
+   "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
--- End diff --

Missing `both`? The message only mentions `source` and `sink`, but a source-sink 
type is handled above.


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524249
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Common class for all descriptors describing a table sink.
+  */
+abstract class TableSinkDescriptor extends TableDescriptor {
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
--- End diff --

+1. We should be able to unify these.
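
For illustration, a hedged sketch of the unification: pull the shared property 
collection up into `TableDescriptor` so the source and sink descriptors do not 
duplicate it (the field names here are assumptions, not the PR's final layout):

```
package org.apache.flink.table.descriptors

abstract class TableDescriptor extends Descriptor {

  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
  protected var formatDescriptor: Option[FormatDescriptor] = None

  // Shared by TableSourceDescriptor and TableSinkDescriptor.
  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    connectorDescriptor.foreach(_.addProperties(properties))
    formatDescriptor.foreach(_.addProperties(properties))
  }
}
```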


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200524110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactoryService.scala
 ---
@@ -16,57 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
-import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
-import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
-import org.apache.flink.table.descriptors._
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, 
TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.Logging
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
-  * Service provider interface for finding suitable table source factories 
for the given properties.
+  * Unified interface to create TableConnectors, e.g. 
[[org.apache.flink.table.sources.TableSource]]
+  * and [[org.apache.flink.table.sinks.TableSink]].
   */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
--- End diff --

+1 


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200523261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connector/TableConnectorFactory.scala
 ---
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
 
 import java.util
 
-/**
-  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
-  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
-  * describe the desired table source. The factory allows for matching to 
the given set of
-  * properties and creating a configured [[TableSource]] accordingly.
-  *
-  * Classes that implement this interface need to be added to the
-  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
-  * the current classpath to be found.
-  */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
--- End diff --

+1. I think the most baffling point for me while reading up to here was the 
`Table*Connector*Factory` naming :-)


---


[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...

2018-07-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6201#discussion_r200521222
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
 
+   private static TableDescriptor create(String name, Map 
config) {
+   if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) 
{
+   throw new SqlClientException("The 'type' attribute of a 
table is missing.");
+   }
+   final String tableType = (String) 
config.get(TableDescriptorValidator.TABLE_TYPE());
+   if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+   return new Source(name, 
ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+   return new Sink(name, ConfigUtil.normalizeYaml(config));
+   } else if 
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+   return new SourceSink(name, 
ConfigUtil.normalizeYaml(config));
+   }
+   return null;
+   }
+
public void setTables(List> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
-   if 
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
+   if (!config.containsKey(NAME)) {
+   throw new SqlClientException("The 'name' 
attribute of a table is missing.");
}
-   if 
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
 {
-   
config.remove(TableDescriptorValidator.TABLE_TYPE());
-   final Source s = Source.create(config);
-   if (this.tables.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate 
source name '" + s + "'.");
-   }
-   this.tables.put(s.getName(), s);
-   } else {
+   final Object name = config.get(NAME);
+   if (name == null || !(name instanceof String) || 
((String) name).length() <= 0) {
+   throw new SqlClientException("Invalid table 
name '" + name + "'.");
+   }
+   final String tableName = (String) name;
+   final Map properties = new 
HashMap<>(config);
+   properties.remove(NAME);
+
+   TableDescriptor tableDescriptor = create(tableName, 
properties);
+   if (null == tableDescriptor) {
throw new SqlClientException(
-   "Invalid table 'type' attribute 
value, only 'source' is supported");
+   "Invalid table 'type' attribute 
value, only 'source' or 'sink' is supported");
+   }
+   if (this.tables.containsKey(tableName)) {
+   throw new SqlClientException("Duplicate table 
name '" + tableName + "'.");
--- End diff --

If only `"source"` and `"sink"` are allowed, should we let the same name with 
different types co-exist, e.g. `{"name": "t1", "type": "source"}` and 
`{"name": "t1", "type": "sink"}`? This is actually a follow-up to the previous 
comment. I think we just need one of the two; either should work.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-01 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199352237
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

+1 for this approach that directly specifies the interval literals. 

Regarding Quarter: it seems like a very old implementation, and we should 
probably use `"1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)` to make it 
consistent with all the other time unit extractions. What do you guys think?

I just tried it out by modifying the `Extract` method and it seems to work 
perfectly.
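
For illustration, a hedged usage sketch of that consistency argument (the QUARTER 
extraction assumes the `Extract` change mentioned above; the rest follows the 
expression DSL quoted in this diff):

```
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.TimeIntervalUnit

// Quarter extraction through the generic extract(...) call, consistent with
// the other time units (requires the Extract modification described above).
val quarterOfDate = "1970-01-01".toDate.extract(TimeIntervalUnit.QUARTER)

// The interval-literal style timestampAdd from this PR's docs.
val shiftedDate = timestampAdd("WEEK", 1, "2003-01-02".toDate)  // leads to 2003-01-09
```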


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197622239
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = 
{
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
+var unitValue= unit.asInstanceOf[Literal].value.toString()
+val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", 
"SQL_TSI_MONTH", "SQL_TSI_WEEK",
+"SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", 
"SQL_TSI_SECOND")
+if (sqlTsiArray.contains(unitValue)) {
+  unitValue = unitValue.split("_").last
+}
+timeUnit = Some(TimeUnit.valueOf(unitValue))
+}
+
+relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode,
+relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+  
relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
--- End diff --

No, what I mean is something like:
```
count match {
  case literal: Literal =>
    // ...generate a new IntervalSqlType based on unit and count
  case _ =>
    // ...use your current approach with SqlStdOperatorTable.MULTIPLY
}
```
You can move the logic that validates `unit` as a `Literal` into the 
`validateInput` function, as @hequn8128 mentioned.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197613995
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = 
{
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
+var unitValue= unit.asInstanceOf[Literal].value.toString()
+val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", 
"SQL_TSI_MONTH", "SQL_TSI_WEEK",
--- End diff --

I think defining these in a static private field would be more appropriate. 
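
For illustration, a small sketch of that suggestion, using a companion object so 
the constant is built only once (the object name just mirrors the quoted case 
class):

```
object TimestampAdd {
  // SQL_TSI_* aliases accepted for the unit argument, defined once instead of
  // re-allocating the array on every toRexNode call.
  private val SqlTsiUnits = Set(
    "SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", "SQL_TSI_WEEK",
    "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND")
}
```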


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197613915
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = 
{
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
--- End diff --

I think if we use pattern matching here it will be much cleaner, something like:
```
unit match {
  case literal: Literal => //...
  case _ => //...
}
```
That also reminds me that we should probably override the `validateInput` function 
as well.
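
For illustration, a hedged sketch combining both points, i.e. pattern matching 
plus an overridden `validateInput` inside the quoted `TimestampAdd` case class 
(the exact message and accepted literal type are assumptions):

```
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}

override private[flink] def validateInput(): ValidationResult = unit match {
  case Literal(_: String, _) => ValidationSuccess
  case _ => ValidationFailure(
    "TimestampAdd: the unit argument must be a string literal, e.g. \"WEEK\".")
}
```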


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197613157
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -2804,6 +2804,17 @@ dateFormat(TIMESTAMP, STRING)
   
 
 
+
+  
+{% highlight java %}
+timestampAdd(unit, interval, timestamp)
+{% endhighlight %}
+  
+  
+Adds a (signed) integer interval to a timestamp. The unit for 
the interval is given by the unit argument, which should be one of the 
following values: "SECOND", "MINUTE", 
"HOUR", "DAY", "WEEK", 
"MONTH", "QUARTER", or "YEAR". E.g. 
timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
2003-01-09.
--- End diff --

Let's make it consistent with `sql.md`; I believe there are no `"` quotes around 
the argument values there.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r197614566
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ---
@@ -328,6 +330,42 @@ case class TemporalOverlaps(
   }
 }
 
+  /**
+* Standard conversion of the TIMESTAMPADD operator.
+* Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]]
+*/
+case class TimestampAdd(
+unit: Expression,
+count: Expression,
+timestamp: Expression)
+  extends Expression {
+
+  override private[flink] def children = unit :: count :: timestamp :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = 
{
+var timeUnit : Option[TimeUnit] = None
+if (unit.isInstanceOf[Literal]) {
+var unitValue= unit.asInstanceOf[Literal].value.toString()
+val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", 
"SQL_TSI_MONTH", "SQL_TSI_WEEK",
+"SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", 
"SQL_TSI_SECOND")
+if (sqlTsiArray.contains(unitValue)) {
+  unitValue = unitValue.split("_").last
+}
+timeUnit = Some(TimeUnit.valueOf(unitValue))
+}
+
+relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode,
+relBuilder.call(SqlStdOperatorTable.MULTIPLY,
+  
relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier,
--- End diff --

I think this part can probably be further optimized if `count` is also a 
`Literal`: in that case the MULTIPLY operator is not necessary and it can be 
converted directly to an `IntervalSqlType`. What do you think?
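
For illustration, a hedged sketch of that idea, building on the quoted `toRexNode` 
code (the Calcite calls reflect my understanding of `RexBuilder` and 
`SqlIntervalQualifier`, not necessarily the final PR shape):

```
import java.math.{BigDecimal => JBigDecimal}

import org.apache.calcite.sql.SqlIntervalQualifier
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos

val qualifier = new SqlIntervalQualifier(timeUnit.get, null, SqlParserPos.ZERO)

val intervalRex = count match {
  // count is a literal: fold it into the interval value, no MULTIPLY needed
  case Literal(c: Int, _) =>
    relBuilder.getRexBuilder.makeIntervalLiteral(
      timeUnit.get.multiplier.multiply(JBigDecimal.valueOf(c.toLong)), qualifier)
  // otherwise keep the MULTIPLY-based approach from the quoted code
  case _ =>
    relBuilder.call(SqlStdOperatorTable.MULTIPLY,
      relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier, qualifier),
      count.toRexNode)
}

relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, intervalRex)
```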


---


[GitHub] flink issue #6049: [FLINK-9398][Client] Fix CLI list running job returns all...

2018-06-05 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/6049
  
Thanks for the comment @zentol. I actually thought about it and think we should 
group listings of jobs with the same status together; there are also some other 
non-terminating states. Regarding the user-facing choice of whether to use `-t` 
(terminated) or `-a` (all): I lean towards `-a`, but internally we name them 
terminated. Later on we can easily add support for `finished`, `cancelled`, etc. 
if necessary. What do you think?


---


[GitHub] flink pull request #6049: [FLINK-9398][Client] Fix CLI list running job retu...

2018-06-04 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6049#discussion_r192778560
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -474,6 +482,19 @@ protected void list(String[] args) throws Exception {

System.out.println("--");
}
}
+   if (showRemaining) {
+   if (remainingJobs.size() != 0) {
+   remainingJobs.sort(startTimeComparator);
--- End diff --

Thanks for the reply @zentol, I'm not sure I fully understand this comment. Are 
you suggesting that we should always group jobs with the same status together 
when printing, for all 3 cases?


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-24 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r190788353
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,196 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
+JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
+JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function extract_valid_pact_from_job_info_return() {
+PACT_MATCH=0
+if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
+then
+PACT_MATCH=$PACT_MATCH
+else
+PACT_MATCH=-1
+fi
+if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]];
+then
+PACT_MATCH=$PACT_MATCH
+else
+PACT_MATCH=-1
+fi
+echo ${PACT_MATCH}
+}
+
+function extract_valid_job_list_by_type_from_job_list_return() {
+JOB_LIST_MATCH=0
+JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3"
+if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]];
+then
+JOB_LIST_MATCH=$JOB_LIST_MATCH
+else
+JOB_LIST_MATCH=-1
+fi
+echo ${JOB_LIST_MATCH}
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
--- End diff --

Checking the cleanup code, it seems like it doesn't explicitly stop all TMs 
(similar to `start-cluster.sh`, I need to explicitly call `taskmanager start`). 
Actually, I should remove only the `stop-cluster` call.


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r190433925
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

good point. I will add the check 👍 


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190124815
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Logging {
+
+  validateEqualsHashCode("intersect", resultType)
+
+  // state to hold left stream element
+  private var leftState: ValueState[JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  private var rightState: ValueState[JTuple2[Int, Long]] = _
+
+  private val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  private var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  private var rightTimer: ValueState[Long] = _
+
+  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug("Instantiating StreamIntersectCoProcessFunction.")
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]](
+  "left", tupleTypeInfo)
+val rightStateDescriptor = new ValueStateDescriptor[JTuple2[Int, 
Long]](
+  "right", tupleTypeInfo)
+leftState = getRuntimeContext.getState(leftStateDescriptor)
+rightState = getRuntimeContext.getState(rightStateDescriptor)
+
+// initialize timer state
+val valueStateDescriptor1 = new 
ValueStateDescriptor[Long]("leftTimer", classOf[Long])
+leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
+val valueStateDescriptor2 = new 
ValueStateDescriptor[Long]("rightTimer", classOf[Long])
+rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
+
+cRowWrapper = new CRowWrappingMultiOutputCollector()
+//we emit one record per process at most
+cRowWrapper.setTimes(1)
+  }
+
+  override def processElement1(
+value: CRow,
+ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+out: Collector[CRow]): Unit = {
+
+processElement(value, ctx, out, leftState, leftTimer, rightState)
+  }
+
+  override def processElement2(
+value: CRow,
+ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+out: Collector[CRow]): Uni

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190124755
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Logging {
+
+  validateEqualsHashCode("intersect", resultType)
+
+  // state to hold left stream element
--- End diff --

The description is misleading: you are not actually holding the "row" of the 
stream element, if I understand correctly. 
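
For example, a hedged rewording of those state comments, given that the tuple 
holds a count plus an expiration timestamp (per the `open()` comment further down 
in this diff):

```
// state holding, per left-side row, (number of occurrences, expiration time)
private var leftState: ValueState[JTuple2[Int, Long]] = _
// state holding, per right-side row, (number of occurrences, expiration time)
private var rightState: ValueState[JTuple2[Int, Long]] = _
```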


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190120842
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
--- End diff --

Missing JavaDoc


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190119234
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamIntersect.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment}
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import 
org.apache.flink.table.runtime.setop.StreamIntersectCoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConverters._
+
+class DataStreamIntersect(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  rowRelDataType: RelDataType,
+  all: Boolean)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with DataStreamRel {
+
+  private lazy val intersectType = if (all) {
+"All"
+  } else {
+""
+  }
+
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamIntersect(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  getRowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"Intersect$intersectType(intersect$intersectType: 
($intersectSelectionToString))"
--- End diff --

`s"Intersect$intersectType(intersect: ($intersectSelectionToString))"`
I dont think you need to duplicate the type twice


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190123516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Logging {
+
+  validateEqualsHashCode("intersect", resultType)
+
+  // state to hold left stream element
+  private var leftState: ValueState[JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  private var rightState: ValueState[JTuple2[Int, Long]] = _
+
+  private val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  private var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  private var rightTimer: ValueState[Long] = _
+
+  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug("Instantiating StreamIntersectCoProcessFunction.")
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new ValueStateDescriptor[JTuple2[Int, Long]](
+  "left", tupleTypeInfo)
+val rightStateDescriptor = new ValueStateDescriptor[JTuple2[Int, 
Long]](
+  "right", tupleTypeInfo)
+leftState = getRuntimeContext.getState(leftStateDescriptor)
+rightState = getRuntimeContext.getState(rightStateDescriptor)
+
+// initialize timer state
+val valueStateDescriptor1 = new 
ValueStateDescriptor[Long]("leftTimer", classOf[Long])
+leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
+val valueStateDescriptor2 = new 
ValueStateDescriptor[Long]("rightTimer", classOf[Long])
+rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
+
+cRowWrapper = new CRowWrappingMultiOutputCollector()
+//we emit one record per process at most
+cRowWrapper.setTimes(1)
+  }
+
+  override def processElement1(
+value: CRow,
+ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+out: Collector[CRow]): Unit = {
+
+processElement(value, ctx, out, leftState, leftTimer, rightState)
+  }
+
+  override def processElement2(
+value: CRow,
+ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+out: Collector[CRow]): Uni

[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190122671
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
--- End diff --

I guess I am confused here:

There's `CoGroupedStream` with a customized `CoGroupFunction`, which is already 
supported in the DataStream API. It seems like if we operate on a windowed stream, 
we can apply the `intersect` as a `CoGroupFunction`. Is this function solely 
targeting the non-windowed intersect case? If so, can we rename the function 
(which also adds to my point: please add a Java doc).


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-22 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190119548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamIntersect.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment}
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import 
org.apache.flink.table.runtime.setop.StreamIntersectCoProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConverters._
+
+class DataStreamIntersect(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  rowRelDataType: RelDataType,
+  all: Boolean)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with DataStreamRel {
+
+  private lazy val intersectType = if (all) {
+"All"
--- End diff --

`" All"` might be better formatting since you only attached this to the 
`explainTerm` and `toString` method


---


[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

2018-05-22 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5849
  
Thanks @tzulitai for the suggestion. I will close this and continue with 
the new PR in #6054 .


---


[GitHub] flink pull request #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test ...

2018-05-22 Thread walterddr
Github user walterddr closed the pull request at:

https://github.com/apache/flink/pull/5849


---


[GitHub] flink pull request #6054: [FLINK-8986][e2e-test] Flink end to end test REST ...

2018-05-22 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/6054

[FLINK-8986][e2e-test] Flink end to end test REST API 

## What is the purpose of the change

Adds end-to-end tests for the REST APIs of the FLIP-6 generated endpoints.

*This is a follow-up PR based on #5849; the changes & improvements are based on 
@zentol's comments and suggestions.*
*This PR depends on one commit in #5863 (ea12737).*

## Brief change log

  - Adding in `flink-rest-api-test` module and `test_rest_api.sh` for 
end-to-end testing
  - Adding in payload YAML file for endpoints used by REST APIs if any 
POST/PATCH endpoint requires a payload.
  - Adding a validation sequence before testing REST API modules in order 
to extract all path / query parameters for REST API endpoints.

## Verifying this change

This test runs successfully against all REST APIs in `DispatcherRestEndpoint`.

This test will automatically pick up new endpoints for testing as long as 
support is added to `DispatcherRestEndpoint`. 

If a new endpoint requires an additional path / payload / query parameter, the 
test will be skipped for that endpoint instead of failing.

## Does this pull request potentially affect one of the following parts:

No

## Documentation

No documentation for this test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8986-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6054


commit ea127374af896d9dc82c1a9911f65b2360705397
Author: Rong Rong <rongr@...>
Date:   2018-04-17T20:45:23Z

initial commit to support CLI test, excluding YARN test

commit 119895ffdfb783985892fafdde5fc4e0646a260a
Author: Rong Rong <walter_ddr@...>
Date:   2018-04-25T18:51:12Z

adding in rest api tests, it went through 34 and skipped 6 which requires 
more data. Following completion of this test should add payload to 
requestBodies.yaml for testing

commit 7dc064a23854afe95737afb8602bc25a2f3aca3d
Author: Rong Rong <rongr@...>
Date:   2018-04-27T00:27:50Z

adding in post test with payload

commit 977abed8b63ab91e7b8a51df62df28d1ea0ec284
Author: Rong Rong <walter_ddr@...>
Date:   2018-04-27T05:05:11Z

adding in all testing request payloads and all 43 tests are passing




---


[GitHub] flink issue #6049: [FLINK-9398][Client] Fix CLI list running job returns all...

2018-05-22 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/6049
  
@zentol yeah, I totally agree with the naming issue and just updated with a 
clearer `ListOptions` definition. Please take another look. 

Thanks


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-21 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189673647
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code, not the content correctness, of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

Changed to use `-p 1`. I think the part that "parallelism should be taken 
by the CLI command" is verified by the exit code from the `run` execution. Is 
that correct?


---


[GitHub] flink pull request #6049: [FLINK-9398][Client] Fix CLI list running job retu...

2018-05-20 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/6049

[FLINK-9398][Client] Fix CLI list running job returns all except scheduled 
jobs



## What is the purpose of the change

This PR fixes the CLI command `bin/flink list -r`, which currently returns all 
jobs except scheduled ones.


## Brief change log

Change the behavior of `bin/flink list` to:
  - Adding a `-a` option to list all jobs, including `CREATED`, 
`RUNNING`, and `RESTARTING` (see the sketch after this list).
  - Fixing the `-r` option to list only `RUNNING` and `RESTARTING` jobs.
  - Internally mapping `-a` to the `other` job type so it won't affect the 
behavior of the explicit listing options when more options are added to the 
`list` command.
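
As a rough sketch of the intended listing semantics (the status type and helper 
below are illustrative stand-ins, not the actual `CliFrontend` code):

```scala
sealed trait JobState
case object Created extends JobState
case object Running extends JobState
case object Restarting extends JobState
case object Finished extends JobState

// -a lists every job; -r keeps only RUNNING and RESTARTING.
def shouldList(state: JobState, listAll: Boolean, runningOnly: Boolean): Boolean =
  listAll || (runningOnly && (state == Running || state == Restarting))
```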


## Verifying this change

This change added tests and can be verified as follows:

  - Added a unit test to verify the `-a` option.
  - Added a `ListOptions` unit test suite, which did not exist before.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? documented in `cli.md`


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-9398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6049.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6049


commit d742f3ec5ffd4b411162db234e108a496a4f5aaa
Author: Rong Rong <walter_ddr@...>
Date:   2018-05-20T14:29:56Z

fix CLI list options showing all jobs instead of just running / restarting 
jobs

commit e6d827700427a3f31a699ff02cb5b9906715c629
Author: Rong Rong <walter_ddr@...>
Date:   2018-05-20T23:45:18Z

adding in documentation for list all jobs option




---


[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

2018-05-18 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5849
  
Hi @tzulitai. I've actually created a new version of the test based on 
@zentol's comments on this PR: 
https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test

But it actually depends on https://github.com/apache/flink/pull/5863, as I 
reused the periodic streaming job for testing. Is it possible to create a PR on 
top of another currently pending PR?


---


[GitHub] flink issue #5863: [FLINK-8985][e2etest] initial support for End-to-end CLI ...

2018-05-18 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5863
  
Thanks @tzulitai for the review. I will update asap. 
I am not 100% sure whether I should verify the CLI return values, but I will 
definitely add those checks.


---


[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW, EPOCH, DECADE ...

2018-05-14 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/6007
  
I wasn't sure whether this is the full PR or just a partial implementation, but 
I don't see `EPOCH` or `DECADE` in the main or test code. 


---


[GitHub] flink pull request #5985: [FLINK-8135][docs] Add description to MessageParam...

2018-05-10 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5985#discussion_r187459062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
 ---
@@ -38,4 +38,10 @@ public String convertValueToString(Boolean value) {
public Boolean convertStringToValue(String value) {
return Boolean.valueOf(value);
}
+
+   @Override
+   public String getDescription() {
+   return "Boolean value. If true then serialized user task 
accumulators will be included into a response. " +
+   "False by default.";
--- End diff --

"False by default." is confusing to me, is "false": the default value of 
the parameter is `false`, or "false": whether the task accumulator will be 
included or not depends on a "default" configuration from the task accumulator?


---


[GitHub] flink pull request #5985: [FLINK-8135][docs] Add description to MessageParam...

2018-05-10 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5985#discussion_r187460147
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
 ---
@@ -44,4 +44,9 @@ protected String convertToString(final Integer value) {
return value.toString();
}
 
+   @Override
+   public String getDescription() {
+   return "The index of a subtask. Integer value starts from 0, 
should be positive.";
--- End diff --

`must be non-negative`, if "0" is valid


---


[GitHub] flink pull request #5985: [FLINK-8135][docs] Add description to MessageParam...

2018-05-10 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5985#discussion_r187458764
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SavepointPathQueryParameter.java
 ---
@@ -28,4 +28,9 @@
public SavepointPathQueryParameter() {
super(KEY, MessageParameterRequisiteness.OPTIONAL);
}
+
+   @Override
+   public String getDescription() {
+   return "Defines a path to savepoint to restore from.";
--- End diff --

" path to savepoint" -> "path of the savepoint" ?


---


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-04 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5940
  
LOL. I think I found a way: 
1. Rebase #3764 onto the current master;
2. Rebase this branch onto the rebased #3764;
3. Make changes on top.

:-) 


---


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-04 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5940
  
Thanks @suez1224 @fhueske for the comments. I will change them accordingly. 

Yes, I copied a lot of test cases from @haohui's PR for my own testing. I 
can definitely put it on top, given that the runtime support is already merged 
in #. Procedure-wise question: should I rebase his commit, add my changes on 
top, and then attach that to this PR? I am not sure if there's a clever way to 
both preserve the discussion in this thread and rebase on top of his change.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184861354
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,539 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early developement phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to setup and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
which
+  

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184861107
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,539 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early developement phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to setup and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
--- End diff --

It seems like `execution.result-mode` can only be set through the environment 
config file; executing the `SET` command in the CLI doesn't take effect for me.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184860963
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,539 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early developement phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to setup and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
--- End diff --

can we replace "following sections" with the actual link, docs might be 
updated later and the "following" statement might no longer be true


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184860924
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early developement phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to setup and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
--- End diff --

Also, `environment-files` is technically 4 parts away instead of the next 
part :-P


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184860868
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early developement phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to setup and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
--- End diff --

Correct me if I'm wrong, but it seems like @twalthr is referring to the 
embedded mode of the SQL Client. It might be useful to have another section 
"SQL Client Mode" -> "Embedded" / "Gateway" preceding this section, and link the 
"Gateway" mode to the "Limitations & Future" section.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184860810
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,539 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early developement phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to setup and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
--- End diff --

`see the [Cluster & Deployment](...) section.`


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support DistinctAgg on DataStre...

2018-04-28 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5940

[FLINK-8690][table]Support DistinctAgg on DataStream

## What is the purpose of the change

* Allow FlinkLogicalAggregate to support distinct aggregations on 
DataStream, while keeping DataSet decomposing distinct aggs into a GROUP BY 
followed by normal aggregates (see the sketch below).
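
For illustration, a minimal self-contained sketch (not taken from this PR; the 
table and field layout mirror the ITCases quoted elsewhere in this thread) of 
the kind of query this change enables on a DataStream:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object DistinctGroupWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input data; field names are made up.
    val t = env.fromElements((1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 2, "World"))
      .toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    tEnv.registerTable("MyTable", t)

    // A DISTINCT aggregate inside a group window: decomposed on DataSet,
    // supported directly on DataStream group windows after this change.
    val result = tEnv.sqlQuery(
      "SELECT c, COUNT(DISTINCT b) FROM MyTable " +
      "GROUP BY c, TUMBLE(proctime, INTERVAL '1' MINUTE)")

    result.toAppendStream[Row].print()
    env.execute()
  }
}
```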

## Brief change log

  - Moved `AggregateExpandDistinctAggregatesRule.JOIN` to 
`DATASET_NORM_RULES`.
  - Enabled `DataStreamGroupWindowAggregate` to support distinct aggs while 
keeping them unsupported for `[DataStream/DataSet]GroupAggregate`.
  - Fixed a typo in the codegen for distinct aggregates when merging.
  - Fixed a possible codegen test error for `UNION ALL`.

## Verifying this change

  - Unit tests are added in `DistinctAggregateTest`.
  - Added an ITCase for distinct group window aggregation.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes (codegen)
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? Not yet. Should we put it in the 
`Aggregate` section or the `Group Window` section? Input is highly appreciated. 
Also, distinct over aggregate was bug-fixed in FLINK-8689 but is not documented.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8690

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5940.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5940


commit c517821d13341ae10b5d47acdbd0cc7d5bbe38b7
Author: Rong Rong <rongr@...>
Date:   2018-04-28T15:59:12Z

moving AggregateExpandDistinctAggregatesRule.JOIN to DATASET_NORM_RULES, 
and enabled distinct aggregate support for window aggregate over datastream




---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184441869
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var 
mapView: MapView[E, JLon
   override def equals(that: Any): Boolean =
 that match {
   case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
-this.mapView == that.mapView
+this.distinctValueMap == that.distinctValueMap
   case _ => false
 }
 
   def add(element: E): Boolean = {
-if (element != null) {
-  val currentVal = mapView.get(element)
-  if (currentVal != null) {
-mapView.put(element, currentVal + 1L)
-false
-  } else {
-mapView.put(element, 1L)
-true
-  }
-} else {
+val wrappedElement = Row.of(element)
--- End diff --

Thanks @fhueske for the insight. Yeah, I thought about that before the last 
commit but didn't go through with it, since we still need to construct the 
single-element row before passing it to the distinct accumulator. But you are 
right, it will make future optimizations easier (roughly as sketched below).
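
A rough sketch of the wrapping idea (assumption: the distinct key is always a 
`Row`, so multi-argument DISTINCT later only means building a wider `Row` here):

```scala
import org.apache.flink.types.Row

// Builds the Row used as the distinct key; with a single distinct argument this
// is a one-field Row, but the same helper covers the multi-argument case.
def distinctKey(values: AnyRef*): Row = {
  val key = new Row(values.length)
  values.zipWithIndex.foreach { case (v, i) => key.setField(i, v) }
  key
}
```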


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184429742
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+// use out-of-order data to test distinct accumulator remove
+val data = Seq(
+  Left((2L, (2L, 2, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((20L, (20L, 20, "Hello World"))), // early row
+  Right(3L),
+  Left((2L, (2L, 2, "Hello"))), // late row
+  Left((3L, (3L, 3, "Hello"))),
+  Left((4L, (4L, 4, "Hello"))),
+  Left((5L, (5L, 5, "Hello"))),
+  Left((6L, (6L, 6, "Hello"))),
+  Left((7L,

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184428065
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+// use out-of-order data to test distinct accumulator remove
+val data = Seq(
+  Left((2L, (2L, 2, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((20L, (20L, 20, "Hello World"))), // early row
+  Right(3L),
+  Left((2L, (2L, 2, "Hello"))), // late row
+  Left((3L, (3L, 3, "Hello"))),
+  Left((4L, (4L, 4, "Hello"))),
+  Left((5L, (5L, 5, "Hello"))),
+  Left((6L, (6L, 6, "Hello"))),
+  Left((7L, (7L,

[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184115873
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+
+val distinctAggs: Array[Seq[DataViewSpec[_]]] = 
isDistinctAggs.zipWithIndex.map {
+  case (isDistinctAgg, idx) => if (isDistinctAgg) {
+val fieldIndex: Int = aggFields(idx)(0)
+val mapViewTypeInfo = new MapViewTypeInfo(
+  physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO)
--- End diff --

At this moment it disregards any null values. But as you mentioned, this is 
not correct. Will address. 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184125030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Long => JLong}
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for accumulator wrapper when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoke after distinct 
filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, JLong]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, JLong]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, JLong]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
--- End diff --

Great catch @fhueske. Sorry, I misunderstood what you meant previously 
regarding `null` values. This line was meant to disregard all `null` values. But 
as you mentioned, some operators might treat `null` values differently, e.g. 
`COUNT` aggregates. I added the handling and the tests as well (a sketch of the 
idea follows). 
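
A rough sketch of the idea (not the PR's actual `DistinctAccumulator`; a plain 
`HashMap` stands in for the `MapView`):

```scala
import java.lang.{Long => JLong}
import java.util.{HashMap => JHashMap}

// Instead of dropping null inputs, track them with their own counter so that
// aggregates which treat nulls differently still see the first and last
// occurrence correctly when adding and retracting.
class NullAwareDistinctFilter[E] {
  private val counts = new JHashMap[E, JLong]()
  private var nullCount: Long = 0L

  /** Returns true if `element` (possibly null) is seen for the first time. */
  def add(element: E): Boolean = {
    if (element == null) {
      nullCount += 1
      nullCount == 1L
    } else {
      val prev = counts.get(element)
      if (prev == null) { counts.put(element, 1L); true }
      else { counts.put(element, prev + 1L); false }
    }
  }

  /** Returns true if this was the last occurrence of `element`. */
  def remove(element: E): Boolean = {
    if (element == null) {
      nullCount -= 1
      nullCount == 0L
    } else {
      val prev = counts.get(element)
      if (prev == null) false
      else if (prev == 1L) { counts.remove(element); true }
      else { counts.put(element, prev - 1L); false }
    }
  }
}
```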


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-25 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184115947
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,36 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+
+val distinctAggs: Array[Seq[DataViewSpec[_]]] = 
isDistinctAggs.zipWithIndex.map {
+  case (isDistinctAgg, idx) => if (isDistinctAgg) {
+val fieldIndex: Int = aggFields(idx)(0)
+val mapViewTypeInfo = new MapViewTypeInfo(
+  physicalInputTypes(fieldIndex), BasicTypeInfo.LONG_TYPE_INFO)
+Seq(
+  MapViewSpec(
+"distinctAgg" + idx + "_field" + fieldIndex,
--- End diff --

+1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-24 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183880939
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Yes. You are right. Sorry I missed that. Just updated :-)


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222089
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
+
+  distinctAggs(index) = Seq(
--- End diff --

`mapViewTypeInfo` doesn't seem to be available in codegen.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -327,19 +392,41 @@ class AggregationCodeGenerator(
 for (i <- aggs.indices) yield
 
   if (partialResults) {
-j"""
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+if (isDistinctAggs(i)) {
+
+  j"""
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) 
distinctAcc$i.getRealAcc());""".stripMargin
+} else {
+  j"""
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  (${accTypes(i)}) accs.getField($i));""".stripMargin
+}
   } else {
-j"""
-   |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
-   |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
-   |${genDataViewFieldSetter(s"acc$i", i)}
-   |output.setField(
-   |  ${aggMapping(i)},
-   |  baseClass$i.getValue(acc$i));""".stripMargin
+if (isDistinctAggs(i)) {
+  j"""
+ |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
+ |  
(org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |$distinctAccType distinctAcc$i = ($distinctAccType) 
accs.getField($i);
+ |${genDistinctDataViewFieldSetter(s"distinctAcc$i", 
i)}
+ |${accTypes(i)} acc$i = (${accTypes(i)}) 
distinctAcc$i.getRealAcc();
+ |${genAccDataViewFieldSetter(s"acc$i", i)}
+ |output.setField(
+ |  ${aggMapping(i)},
+ |  baseClass$i.getValue(acc$i));""".stripMargin
+} else {
--- End diff --

Yeah, that's true. It will avoid a lot of potential typos as well. +1


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222457
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -70,7 +70,12 @@ trait OverAggregate {
 
 val aggStrings = namedAggregates.map(_.getKey).map(
   a => s"${a.getAggregation}(${
-if (a.getArgList.size() > 0) {
+val prefix = if (a.isDistinct) {
--- End diff --

will address in FLINK-8690


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222451
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util.{Map => JMap}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * The base class for accumulator wrapper when applying distinct 
aggregation.
+  * @param realAcc the actual accumulator which gets invoke after distinct 
filter.
+  * @param mapView the [[MapView]] element used to store the distinct 
filter hash map.
+  * @tparam E the element type for the distinct filter hash map.
+  * @tparam ACC the accumulator type for the realAcc.
+  */
+class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: 
MapView[E, Integer]) {
+  def this() {
+this(null.asInstanceOf[ACC], new MapView[E, Integer]())
+  }
+
+  def this(realAcc: ACC) {
+this(realAcc, new MapView[E, Integer]())
+  }
+
+  def getRealAcc: ACC = realAcc
+
+  def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, 
ACC]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
+this.mapView == that.mapView
+  case _ => false
+}
+
+  def add(element: E): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
+mapView.put(element, mapView.get(element) + 1)
+false
+  } else {
+mapView.put(element, 1)
+true
+  }
+} else {
+  false
+}
+  }
+
+  def add(element: E, count: Int): Boolean = {
+if (element != null) {
+  if (mapView.contains(element)) {
--- End diff --

+1 here as well, just in case I forgot


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222077
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1439,7 +1467,47 @@ object AggregateUtil {
   }
 }
 
-(aggFieldIndexes, aggregates, accTypes, accSpecs)
+// create distinct accumulator filter argument
+val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
+aggregateCalls.zipWithIndex.foreach {
+  case (aggCall, index) =>
+if (aggCall.isDistinct) {
+  val argList: util.List[Integer] = aggCall.getArgList
+  // Only support single argument for distinct operation
+  if (argList.size() > 1) {
+throw TableException(
+  "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
+  }
+  val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
+  val fieldIndex = aggFieldIndexes(index)(0)
+  val mapViewTypeInfo = new MapViewTypeInfo(
+FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
--- End diff --

That's a good point. I will add a test for this use case.


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r18367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -151,8 +157,15 @@ class AggregationCodeGenerator(
   }
 }
 
-// initialize and create data views
-addReusableDataViews()
+// get distinct filter of acc fields for each aggregate functions
+val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
+val isDistinctAggs = distinctAggs.map(_.nonEmpty)
--- End diff --

Hmm, I tried to generate the MapViewSpec in codegen, but the 
`AggregationCodeGenerator.generateAggregations` call signature seems to be 
missing the type info for the accumulator (`accTypes` only shows the accumulator 
type, not the `PojoField` info). 

I could certainly change to a simpler `DistinctAccumulator[Object, Long]`, but 
there are pros and cons, and making a `MapTypeInfo` for a plain `Object` seems 
funny to me. How about we leave it to a future optimization?


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183598006
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
--- End diff --

This test case was added to cover all over-aggregate usages of the distinct 
accumulator. 


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-23 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r183222573
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +51,96 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
--- End diff --

Both cases went through `AggregateExpandDistinctAggregatesRule`, which 
produces plans that break the distinct aggregate into a `GROUP BY` over the 
distinct key plus an aggregate without distinct. In the 2nd case, 
`LogicalWindowAggregateRule` was not applied due to the `distinct` keyword. 

They should both be fixed in FLINK-8690.


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-04-20 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
Thanks @fhueske, I had the exact same feeling. Just attaching the `MapState` 
to the back of the `Row` may work for now, but it will probably be nasty to 
maintain in the future. 

I am planning to go ahead and create an optimization ticket further down the 
road once I have FLINK-8690 completed. What do you think?


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-04-19 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
Hi @fhueske. Thanks for the review, all very good points. I will follow up 
with the next steps. Actually, @hequn8128 and I already had some discussions 
regarding the follow-up in FLINK-8690, and I created 2 different approaches. 
Please kindly take a look when you have time :-)

Best,
Rong


---


[GitHub] flink issue #5867: [FLINK-8686] [sql-client] Improve basic embedded SQL clie...

2018-04-18 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5867
  
Hi @twalthr, having the JVM heap size configurable is definitely a great 
benefit. Just to clarify, this only changes the client JVM heap size, correct? 
I am assuming this is mainly for handling large amounts of data when 
retrieving results from a query. 

I think adding it to `flink-conf.yaml` should be fine. There are already 
specific configurations commented out for components like Kerberos, ZK, etc., 
which makes adding a specific conf for the SQL client less disruptive. We can 
always break it out into a separate file later if the SQL client needs extra 
configuration files in the future. 

What do you guys think?


---


[GitHub] flink issue #5865: [FLINK-9199][REST][hot-fix] handler and paramter typos

2018-04-18 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5865
  
Thanks @zentol, sorry I had to skip checkstyle and tests to speed up the 
build. Just fixed.


---


[GitHub] flink pull request #5865: [FLINK-9199][REST][hot-fix] handler and paramter t...

2018-04-18 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5865

[FLINK-9199][REST][hot-fix] handler and paramter typos

[hot-fix] fix message header / handler and parameter typos.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-9199

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5865.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5865


commit 9a655045154ba123aad7d071951b1a252e7a5304
Author: Rong Rong <rongr@...>
Date:   2018-04-18T06:44:02Z

fix handler and paramter typos




---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-04-17 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5863

[FLINK-8985][e2etest] initial support for End-to-end CLI test, excluding 
YARN test

## What is the purpose of the change

Adding end to end test for CLI APIs.

## Brief change log

Added a test_cli_api.sh script to test combinations of the CLI commands 
listed in the Flink documentation, including:
  - Start up command sets (run)
  - Operational command sets (list/info/cancel)
  - Savepoint command sets (savepoint)

## Verifying this change

This is a test

## Does this pull request potentially affect one of the following parts:

No

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8985

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5863.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5863


commit 5f36ee5d4dcbb60a29a413fd29cfaaa69f7e8a47
Author: Rong Rong <rongr@...>
Date:   2018-04-17T20:45:23Z

initial commit to support CLI test, excluding YARN test




---


[GitHub] flink issue #5849: [FLINK-8986][e2e-test] Flink end to end test REST API

2018-04-15 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5849
  
At this moment it skips 8 of the 39 tests (PATCH & POST methods). There are 
also 2 test failures which I think should be fixed in another JIRA. Namely,

`/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/accumulators`
 returns a 500
`/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/` 
returns a 500


---


[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

2018-04-15 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5849

[FLINK-8986][e2e-test] Flink end to end test REST API 

## What is the purpose of the change

Adding end to end test for REST APIs for FLIP-6 generated endpoints

## Brief change log

  - Added `flink-rest-api-test` module in `flink-end-to-end-test` with a 
periodic stream job and a test suite that runs the REST API tests.
  - Added test script to run the REST API test.

## Verifying this change

N/A, this is a test

## Does this pull request potentially affect one of the following parts:

No

## Documentation


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-8986

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5849.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5849


commit 526e5ac1f9758451417972143d7117095b18f2ab
Author: Rong Rong <walter_ddr@...>
Date:   2018-04-15T15:05:08Z

Flink end to end test REST API with generated endpoints only




---


[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-04-04 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r179178156
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
--- End diff --

I was wondering whether a sink could have its own "schema configuration" to 
align with the output table schema. 
For example, a CassandraTableSink / JDBCTableSink would definitely throw 
exceptions when trying to execute an insert with mismatched schemas.
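
Roughly the kind of check I have in mind (a minimal sketch with hypothetical 
helper names, not code from this PR):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical helper (illustration only): a sink could run this check before
// emitting an insert, comparing its declared schema with the query schema.
object SinkSchemaValidation {
  def validate(
      sinkFieldNames: Array[String],
      sinkFieldTypes: Array[TypeInformation[_]],
      queryFieldNames: Array[String],
      queryFieldTypes: Array[TypeInformation[_]]): Unit = {
    if (!sinkFieldNames.sameElements(queryFieldNames) ||
        !sinkFieldTypes.sameElements(queryFieldTypes)) {
      throw new IllegalArgumentException(
        s"Query schema [${queryFieldNames.mkString(", ")}] does not match the " +
          s"schema declared by the sink [${sinkFieldNames.mkString(", ")}].")
    }
  }
}
```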


---


[GitHub] flink pull request #5797: [FLINK-9104][doc]Re-generate REST API documentatio...

2018-04-03 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5797#discussion_r178913403
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -258,6 +265,37 @@ private static String createMessageHtmlEntry(Class 
messageClass, Class emp
return json;
}
 
+   /**
+* Create character escapes for HTML when generating JSON 
request/response string.
+*/
+   private static class HTMLCharacterEscapes extends CharacterEscapes {
--- End diff --

Good point. Added comments to illustrate the necessity. 


---


[GitHub] flink pull request #5797: [Flink 9104][doc]Re-generate REST API documentatio...

2018-03-31 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5797

[Flink 9104][doc]Re-generate REST API documentation for FLIP-6

## What is the purpose of the change

Fix REST-API doc generator and regenerate rest_dispatcher.html

## Brief change log

- Changes according to FLINK-8843
- Escape HTML characters

## Verifying this change

N/A

## Does this pull request potentially affect one of the following parts:

no

## Documentation

docs updated


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-9104

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5797.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5797


commit a5ad88f7fa8200cea5084b6d630592bc473a9b6a
Author: Rong Rong <walter_ddr@...>
Date:   2018-03-31T19:29:59Z

fix REST API doc generator bug and regenerate rest_dispatcher




---


[GitHub] flink issue #5796: [Flink 9104][doc]Re-generate REST API documentation for F...

2018-03-31 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5796
  
Wrong base


---


[GitHub] flink pull request #5796: [Flink 9104][doc]Re-generate REST API documentatio...

2018-03-31 Thread walterddr
Github user walterddr closed the pull request at:

https://github.com/apache/flink/pull/5796


---


[GitHub] flink pull request #5796: [Flink 9104][doc]Re-generate REST API documentatio...

2018-03-31 Thread walterddr
GitHub user walterddr opened a pull request:

https://github.com/apache/flink/pull/5796

[Flink 9104][doc]Re-generate REST API documentation for FLIP-6


## What is the purpose of the change

Fix REST-API doc generator and regenerate rest_dispatcher.html

## Brief change log

- Changes according to FLINK-8843
- Escape HTML characters

## Verifying this change

N/A

## Does this pull request potentially affect one of the following parts:

no

## Documentation

docs updated


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/walterddr/flink FLINK-9104

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5796.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5796


commit 8eb6a30798c09d171e3eb8019b53e677252bd5ba
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-23T10:28:20Z

[FLINK-8694][runtime] Fix notifyDataAvailable race condition

Before there was a race condition that might resulted in igonoring some 
notifyDataAvailable calls.
This fixes the problem by moving buffersAvailable handling to Supartitions 
and adds stress test
for flushAlways (without this fix this test is dead locking).

(cherry picked from commit ebd39f3)

commit 61a34a691e7d5233f18ac72a1ab8fb09b53c4753
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-26T15:13:06Z

[FLINK-8805][runtime] Optimize EvenSerializer.isEvent method

For example, previously if the method was used to check for 
EndOfPartitionEvent
and the Buffer contained huge custom event, the even had to be deserialized 
before
performing the actual check. Now we are quickly entering the correct 
if/else branch
and doing full costly deserialization only if we have to.

Other calls to isEvent() then checking against EndOfPartitionEvent were not 
used.

(cherry picked from commit 767027f)

commit d5338c4154e5de029b3b30e3ef0a0732bf7f68e7
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-02-27T09:39:00Z

[FLINK-8750][runtime] Improve detection of no remaining data after 
EndOfPartitionEvent

Because of race condition between:
  1. releasing inputChannelsWithData lock in this method and reaching this 
place
  2. empty data notification that re-enqueues a channel
we can end up with moreAvailable flag set to true, while we expect no more 
data.

This commit detects such situation, makes a correct assertion and turn off 
moreAvailable flag.

(cherry picked from commit b9b7416)

commit 32384ed9b00cf0e1961d355dc4080f25a2156e58
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-22T14:41:38Z

[FLINK-8747][bugfix] The tag of waiting for floating buffers in 
RemoteInputChannel should be updated properly

(cherry picked from commit 6e9e0dd)

commit f1453276095c55264f7b4097d16e2987a44b3f33
Author: Zhijiang <wangzhijiang999@...>
Date:   2018-02-23T01:55:57Z

[hotfix] Fix package private and comments

(cherry picked from commit 6165b3d)

commit 18ff2ce15bdb1e7bd246e438e47527a24559c86d
Author: Nico Kruber <nico@...>
Date:   2018-02-26T16:50:10Z

[hotfix][network] minor improvements in UnionInputGate

(cherry picked from commit 4203557)

commit 9265666517830350a4a7037029e347f33df1bea2
Author: Nico Kruber <nico@...>
Date:   2018-02-26T16:52:37Z

[FLINK-8737][network] disallow creating a union of UnionInputGate instances

Recently, the pollNextBufferOrEvent() was added but not implemented but 
this is
used in getNextBufferOrEvent() and thus any UnionInputGate containing a 
UnionInputGate
would have failed already. There should be no use case for wiring up inputs
this way. Therefore, fail early when trying to construct this.

(cherry picked from commit e8de538)

commit 26c8f6c2a3ff75ffb954c816a57908318a2d8099
Author: Stephan Ewen <sewen@...>
Date:   2018-02-28T11:15:30Z

[hotfix] [tests] Fix SelfConnectionITCase

The test previously did not fail on failed execution, and thus evaluated 
incomplete results
from a failed execution with th expected results.

This cleans up serialization warnings and uses lambdas where possible, to 
make the code
more readable.

commit f60e46dafa8950d5e40cd8a3286c172ecaea6b73
Author: gyao <gary@...>
Date:   2018-02-28T12:04:19Z

[hotfix] Add missing space to log message in ZooKeeperLeaderElectionService

commit b7247929d0745b3b83306d0c93d97faf4ece4107
Author: gyao <gary@...>
Date:   2018-02-28T12:06:00Z

[hotfix][Javadoc] Fix typo in YARN Utils: teh -> the

commit adb3750226971f7c67a0d3069103b56e4fee1c27
Author: gyao <gary@...>
Date:   2018-02-28T12:07:04

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-03-30 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178334028
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment 
file. Environment files
- * define sources, execution, and deployment behavior. An environment 
might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might 
be defined by default or
  * as part of a session. Environments can be merged or enriched with 
properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of 
deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-   private Map<String, Source> sources;
+   private Map<String, TableDescriptor> tables;
 
private Execution execution;
 
private Deployment deployment;
 
public Environment() {
-   this.sources = Collections.emptyMap();
+   this.tables = Collections.emptyMap();
this.execution = new Execution();
this.deployment = new Deployment();
}
 
-   public Map<String, Source> getSources() {
-   return sources;
+   public Map<String, TableDescriptor> getTables() {
+   return tables;
}
 
-   public void setSources(List<Map<String, Object>> sources) {
-   this.sources = new HashMap<>(sources.size());
-   sources.forEach(config -> {
-   final Source s = Source.create(config);
-   if (this.sources.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate source 
name '" + s + "'.");
+   public void setTables(List<Map<String, Object>> tables) {
+   this.tables = new HashMap<>(tables.size());
+   tables.forEach(config -> {
+   if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
+   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
--- End diff --

Got it, so `both` should probably be added here once `sink` type is 
supported in FLINK-8866. Thanks for the clarification.


---


[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-03-30 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r178334057
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.foreach(_.addProperties(properties))
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+* Reads table statistics from the descriptors properties.
+*/
+  protected def getTableStats: Option[TableStats] = {
+val normalizedProps = new DescriptorProperties()
+addProperties(normalizedProps)
+val rowCount = 
toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+rowCount match {
+  case Some(cnt) =>
+val columnStats = readColumnStats(normalizedProps, 
STATISTICS_COLUMNS)
+Some(TableStats(cnt, columnStats.asJava))
+  case None =>
+None
+}
+  }
+}
+
+object TableDescriptor {
+  /**
+* Key for describing the type of this table, valid values are 
('source').
+*/
+  val TABLE_TYPE = "type"
+
+  /**
+* Valid TABLE_TYPE value.
+*/
+  val TABLE_TYPE_SOURCE = "source"
--- End diff --

Makes sense 👍 


---


[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-03-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r177830071
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -102,10 +112,10 @@ public static Environment parse(String content) 
throws IOException {
public static Environment merge(Environment env1, Environment env2) {
final Environment mergedEnv = new Environment();
 
-   // merge sources
-   final Map<String, Source> sources = new 
HashMap<>(env1.getSources());
-   mergedEnv.getSources().putAll(env2.getSources());
-   mergedEnv.sources = sources;
+   // merge tables
+   final Map<String, TableDescriptor> sources = new 
HashMap<>(env1.getTables());
--- End diff --

`final Map<String, TableDescriptor> tables`


---


[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-03-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r177828794
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -29,38 +30,47 @@
 
 /**
  * Environment configuration that represents the content of an environment 
file. Environment files
- * define sources, execution, and deployment behavior. An environment 
might be defined by default or
+ * define tables, execution, and deployment behavior. An environment might 
be defined by default or
  * as part of a session. Environments can be merged or enriched with 
properties (e.g. from CLI command).
  *
  * In future versions, we might restrict the merging or enrichment of 
deployment properties to not
  * allow overwriting of a deployment by a session.
  */
 public class Environment {
 
-   private Map<String, Source> sources;
+   private Map<String, TableDescriptor> tables;
 
private Execution execution;
 
private Deployment deployment;
 
public Environment() {
-   this.sources = Collections.emptyMap();
+   this.tables = Collections.emptyMap();
this.execution = new Execution();
this.deployment = new Deployment();
}
 
-   public Map<String, Source> getSources() {
-   return sources;
+   public Map<String, TableDescriptor> getTables() {
+   return tables;
}
 
-   public void setSources(List<Map<String, Object>> sources) {
-   this.sources = new HashMap<>(sources.size());
-   sources.forEach(config -> {
-   final Source s = Source.create(config);
-   if (this.sources.containsKey(s.getName())) {
-   throw new SqlClientException("Duplicate source 
name '" + s + "'.");
+   public void setTables(List<Map<String, Object>> tables) {
+   this.tables = new HashMap<>(tables.size());
+   tables.forEach(config -> {
+   if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
+   throw new SqlClientException("The 'type' 
attribute of a table is missing.");
--- End diff --

Just curious, is there any chance we need a table to be both `source` and 
`sink` type? (e.g., similar to queryable state, if a user wants to explicitly 
maintain a table for that purpose)


---


[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-03-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5758#discussion_r177836030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing table sources and sinks.
+  */
+abstract class TableDescriptor extends Descriptor {
+
+  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+* Internal method for properties conversion.
+*/
+  override private[flink] def addProperties(properties: 
DescriptorProperties): Unit = {
+connectorDescriptor.foreach(_.addProperties(properties))
+formatDescriptor.foreach(_.addProperties(properties))
+schemaDescriptor.foreach(_.addProperties(properties))
+metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+* Reads table statistics from the descriptors properties.
+*/
+  protected def getTableStats: Option[TableStats] = {
+val normalizedProps = new DescriptorProperties()
+addProperties(normalizedProps)
+val rowCount = 
toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
+rowCount match {
+  case Some(cnt) =>
+val columnStats = readColumnStats(normalizedProps, 
STATISTICS_COLUMNS)
+Some(TableStats(cnt, columnStats.asJava))
+  case None =>
+None
+}
+  }
+}
+
+object TableDescriptor {
+  /**
+* Key for describing the type of this table, valid values are 
('source').
+*/
+  val TABLE_TYPE = "type"
+
+  /**
+* Valid TABLE_TYPE value.
+*/
+  val TABLE_TYPE_SOURCE = "source"
--- End diff --

Maybe we can use either an enum or a class hierarchy to represent the "valid" 
types? Does something like `TABLE_TYPE.SOURCE_TYPE` seem like a good idea?
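
Purely to illustrate what I mean (names are hypothetical, not a concrete 
proposal):

```scala
// Hypothetical sealed hierarchy for the valid table types instead of raw strings.
sealed trait TableType { def key: String }

object TableType {
  case object Source extends TableType { val key = "source" }
  case object Sink   extends TableType { val key = "sink" }
  case object Both   extends TableType { val key = "both" }

  /** Parses the `type` attribute of a table definition, if valid. */
  def fromKey(key: String): Option[TableType] =
    Seq(Source, Sink, Both).find(_.key == key)
}
```

That way an invalid `type` value in the environment file can fail fast with a 
clear error instead of being compared against string constants all over the 
place.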


---


[GitHub] flink issue #5750: [FLINK-8970] [E2E] HA end-to-end test with StateMachineEx...

2018-03-22 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5750
  
Based on the umbrella task FLINK-8970, it seems like this e2e test should be 
attached to FLINK-8973 instead?


---


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-20 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5638
  
Thanks for the explanation @buptljy. Yeah, if that's the case the only way to 
validate is via an ITCase; it might be overkill in this situation though. 


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-20 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175825522
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+// we do not support these functions natively
+// they have to be converted using the 
WindowAggregateReduceFunctionsRule
+val supported = 
agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+  // we support AVG
+  case SqlKind.AVG => true
+  // but none of the other AVG agg functions
+  case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+  case _ => true
+}
+
+!agg.containsDistinctCall() && supported
--- End diff --

Yes. This was kind of confusing to me; we should clean this up when adding 
DISTINCT support. Thanks for the update @fhueske 


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175317673
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+class RowKeySelector(
+  private val keyIndices: Array[Int],
+  @transient private val returnType: TypeInformation[Row])
+  extends KeySelector[JTuple2[JBool, Row], Row]
+with ResultTypeQueryable[Row] {
+
+  override def getKey(value: JTuple2[JBool, Row]): Row = {
+val keys = keyIndices
+
+val srcRow = value.f1
+
+val destRow = new Row(keys.length)
+var i = 0
+while (i < keys.length) {
+  destRow.setField(i, srcRow.getField(keys(i)))
+  i += 1
+}
+
+destRow
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+class QueryableStateProcessFunction(
+  private val namePrefix: String,
+  private val queryConfig: StreamQueryConfig,
+  private val keyNames: Array[String],
+  private val fieldNames: Array[String],
+  private val fieldTypes: Array[TypeInformation[_]])
+  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], 
Void](queryConfig) {
+
+  @transient private var states = Array[ValueState[AnyRef]]()
+  @transient private var nonKeyIndices = Array[Int]()
+
+  override def open(paramete

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175315979
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
--- End diff --

Besides @xccui's suggestion of adding formal docs, could you also add a 
Javadoc-style comment here explaining what this table sink does and how it is 
intended to be used? Something along the lines of the rough draft below would 
already help.
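
Just a rough draft, assuming the queryable state names are derived from 
`namePrefix` and the field names; please adjust the wording to the actual 
behavior:

```scala
/**
  * A [[TableSink]] that exposes the contents of an upsert stream as Flink
  * queryable state.
  *
  * The stream is keyed by the table's key fields, and each non-key field is
  * stored in its own queryable state, registered under a name derived from
  * `namePrefix` and the field name. External applications can then query the
  * latest value for a given key via the queryable state client.
  *
  * Note: append-only tables are rejected, since their state would grow
  * indefinitely; an upsert stream with a defined key is required.
  */
// (signature as in the PR, shown here only to indicate where the doc goes)
class QueryableTableSink(
    private val namePrefix: String,
    private val queryConfig: StreamQueryConfig)
  extends UpsertStreamTableSink[Row]
    with TableSinkBase[JTuple2[JBool, Row]] {
  // ...
}
```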


---


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r175296949
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+new Log(antilogarithm)
--- End diff --

yup. I thought so too. Thanks for exploring all the possibilities 👍 


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175296744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+// we do not support these functions natively
+// they have to be converted using the 
WindowAggregateReduceFunctionsRule
+val supported = 
agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+  // we support AVG
+  case SqlKind.AVG => true
+  // but none of the other AVG agg functions
+  case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+  case _ => true
+}
+
+!agg.containsDistinctCall() && supported
--- End diff --

Shouldn't the logical rule support distinct calls here? It seems like 
previously the errors were thrown in `DataSetWindowAggregateRule` and 
`DataStreamWindowAggregateRule` respectively. Any chance we can add a unit 
test to further clarify this change?



---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-03-11 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
Thanks @hequn8128 for the prompt review. 
Are you suggesting we create the mapView in parallel with the accumulator? 
The reason why I kept `DistinctAccumulator` is to act as a delegate enclosing 
the actual accumulator, so that it can be passed around in the 
`accumulatorState` field without extending the arity.
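
Roughly, the shape I have in mind is something like this (a simplified sketch 
of the delegate idea, not the exact code in the PR):

```scala
import org.apache.flink.table.api.dataview.MapView

// Simplified sketch: the real accumulator stays nested inside the delegate and
// the MapView keeps a reference count per distinct value, so the accumulator
// state Row keeps the same arity.
class DistinctAccumulator[ACC](
    var realAcc: ACC,
    val distinctValueMap: MapView[AnyRef, java.lang.Long]) {

  /** Returns true if this is the first occurrence of the value, i.e. the
    * wrapped accumulator should accumulate it. */
  def add(value: AnyRef): Boolean = {
    val cnt = distinctValueMap.get(value)
    if (cnt == null) {
      distinctValueMap.put(value, 1L)
      true
    } else {
      distinctValueMap.put(value, cnt.longValue() + 1L)
      false
    }
  }

  /** Returns true if the last occurrence of the value was retracted, i.e. the
    * wrapped accumulator should retract it as well. */
  def remove(value: AnyRef): Boolean = {
    val cnt = distinctValueMap.get(value)
    if (cnt == null) {
      false
    } else if (cnt.longValue() == 1L) {
      distinctValueMap.remove(value)
      true
    } else {
      distinctValueMap.put(value, cnt.longValue() - 1L)
      false
    }
  }
}
```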

If we separate the mapView from the accumulator, I can create another field 
in the `accumulatorState` `Row` to store the `mapView`(s)... This way it might 
be easier to handle the "reuse the same mapView for multiple different 
distinct agg functions" case we discussed in the doc.

Another point: I was trying to reuse as many of the dataview codegen 
utilities as possible, since most of them are tightly coupled with the 
accumulators. I guess I can refactor further (I already did quite a bit).

Please let me know if that's what you had in mind @hequn8128 

--
Rong


---


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-09 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173607312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+new Log(antilogarithm)
--- End diff --

Never mind, I think it might be too weird to write something like 
`base.log(antilogarithm)` in the Table API.


---


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-09 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173606447
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+new Log(antilogarithm)
--- End diff --

Isn't this just `new Ln(antilogarithm)`? We might be able to restructure them 
together.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-03-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408177
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, 
ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for 
stream-stream non-window Join.
+  *
+  * @param leftType  the input type of left stream
+  * @param rightType the input type of right stream
+  * @param resultTypethe output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig   the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = 
queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = 
queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 
indicates how many rows of
+// this row, while the second element represents the expired time of 
this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, 
Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JT

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-03-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408402
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin 
with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from 
the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't 
need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

I think either is fine as long as they are consistent.


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-03-05 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
@hequn8128 @fhueske Thanks for the feedback. I have updated the diff to 
directly use DistinctAccumulator for filtering and modified the 
`generateAggregation` API. Please kindly take another look when you have time. 
I have resolved the issue of multi-layer dataview codegen. 

In terms of reusing the same `DataView` for multiple distinct aggregations on 
the same field, I tried to incorporate it, but there are many assumptions of a 
one-to-one mapping between `AggregateFunction`s and `Accumulator`s that are 
hard to deal with. I am planning to continue and improve on it in a separate 
JIRA, what do you think?


---


[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...

2018-02-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5174#discussion_r171346253
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcWithSplitCodeGenITCase.scala
 ---
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
--- End diff --

Yes, it does, but not in the flink-table module.


---


[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...

2018-02-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5174#discussion_r170804955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
 ---
@@ -41,7 +41,7 @@ import org.apache.flink.table.codegen.Indenter.toISC
 class FunctionCodeGenerator(
 config: TableConfig,
 nullableInput: Boolean,
-input1: TypeInformation[_ <: Any],
+val input1: TypeInformation[_ <: Any],
--- End diff --

Any specific reason for this change? Typo?


---


[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...

2018-02-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5174#discussion_r171358583
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 ---
@@ -46,10 +47,22 @@ trait CommonCalc {
   returnSchema.fieldNames,
   calcProjection)
 
+val (defines, bodies, callings) = generateCalcSplitFunctions(
+  generator,
+  projection.codeBuffer,
+  config.getMaxGeneratedCodeLength)
+
+val split = config.getMaxGeneratedCodeLength < projection.code.length
+val projectionCode = if (split && !projection.codeBuffer.isEmpty) {
--- End diff --

We can probably move this into `generateCalcSplitFunctions` and let it always 
return the `samHeader` function (split or not), with optional `defines` and 
`bodies`. This encapsulates all the splitting functionality in one place, what 
do you think?


---


[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...

2018-02-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5174#discussion_r171360773
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcWithSplitCodeGenITCase.scala
 ---
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableConfig, TableEnvironment}
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, 
RichFunc2}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
UserDefinedFunctionTestUtils}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * set TableConfig's MaxGeneratedCodeLength to 1
+  * make sure every expression is in an independent function call
+  */
+class CalcWithSplitCodeGenITCase extends StreamingMultipleProgramsTestBase 
{
--- End diff --

I see you probably copied all the tests in `CalcITCase` only to set the max 
codegen length configuration to 1. Why not try something similar to 
`TableProgramsTestBase` to add two distinct test configs? 


---


[GitHub] flink pull request #5174: [FLINK-8274][TableAPI & SQL] Fix Java 64K method c...

2018-02-28 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5174#discussion_r171357748
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 ---
@@ -171,4 +186,47 @@ trait CommonCalc {
   rowCnt
 }
   }
+
+  /**
+* split origin generated code to split function calls, only used for 
calc.
+* @param generator
+* @param codeBuffer
+* @param maxLength
+* @return (method definitions, method bodies, method callings) of 
split function calls
+*/
+  private def generateCalcSplitFunctions(
--- End diff --

Seems like in order to avoid hitting the compilation size limit, you need to 
put this in all the Common nodes. 
Any chance we can put it in `plan/util` and make it more generic? I imagine 
the same thing could happen to any plan node that needs to generate a row of 
outputs.
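
To sketch what I mean by a more generic utility (purely illustrative, 
hypothetical names; the real version would of course have to thread the 
input/output terms through as member fields or parameters):

```scala
// Hypothetical generic splitter: given the generated code snippets that compute
// the fields of one output row, either keep them inline or wrap each snippet in
// its own method so no single generated method risks hitting the 64KB limit.
object GeneratedCodeSplitUtil {

  /** Returns (extra method definitions, code to place into the calling method). */
  def splitFieldCode(
      fieldCodeSnippets: Seq[String],
      maxMethodCodeLength: Int): (Seq[String], String) = {

    if (fieldCodeSnippets.map(_.length).sum <= maxMethodCodeLength) {
      // Small enough: keep everything inline, no extra methods needed.
      (Seq.empty, fieldCodeSnippets.mkString("\n"))
    } else {
      val definitions = fieldCodeSnippets.zipWithIndex.map { case (code, i) =>
        s"""
           |private void splitField$i() throws Exception {
           |  $code
           |}
           |""".stripMargin
      }
      val callings = fieldCodeSnippets.indices.map(i => s"splitField$i();")
      (definitions, callings.mkString("\n"))
    }
  }
}
```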


---


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-02-27 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
I agree this design is much cleaner and easier to maintain later. I was 
hesitant to change the function signature of `generateAggregations()`.

I will try to introduce some common building blocks and handle the 
implementation effort in codegen. I imagine a lot of ITCases will be 
needed :-)

Thanks for the feedback @hequn8128 @fhueske 


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170610689
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 ---
@@ -201,18 +202,294 @@ class JoinITCase extends StreamingWithStateTestBase {
 // Proctime window output uncertain results, so assert has been 
ignored here.
   }
 
+  @Test
+  def testJoin(): Unit = {
--- End diff --

Can this be more specific? E.g., inner equality join.


---

