[GitHub] flink pull request #4940: [FLINK-7959] [table] Split CodeGenerator into Code...

2017-11-03 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/4940

[FLINK-7959] [table] Split CodeGenerator into CodeGeneratorContext and 
ExprCodeGenerator.


## What is the purpose of the change

Split the current `CodeGenerator` into two dedicated classes, `CodeGeneratorContext` and `ExprCodeGenerator`. The `CodeGeneratorContext` is responsible for maintaining the various reusable statements that can be inserted into the final generated class, while the `ExprCodeGenerator` is responsible for generating code for `RexNode`s and for generating the result conversion code.
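
A highly simplified sketch of the intended split (illustrative Java, not the actual Scala classes in this PR): the context collects deduplicated, reusable member statements for the generated class, while the expression generator produces per-expression code terms and registers whatever members it needs with the shared context.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Collects reusable member statements; deduplicates and preserves insertion order.
class CodeGeneratorContextSketch {
    private final Set<String> reusableMemberStatements = new LinkedHashSet<>();

    void addReusableMember(String statement) {
        reusableMemberStatements.add(statement);
    }

    String reuseMemberCode() {
        return String.join("\n", reusableMemberStatements);
    }
}

// Generates code terms for individual expressions; delegates reusable state to the context.
class ExprCodeGeneratorSketch {
    private final CodeGeneratorContextSketch ctx;

    ExprCodeGeneratorSketch(CodeGeneratorContextSketch ctx) {
        this.ctx = ctx;
    }

    String generateLiteral(String fieldName, String javaType, String literalCode) {
        ctx.addReusableMember(
            "private final " + javaType + " " + fieldName + " = " + literalCode + ";");
        return fieldName; // the term used wherever the expression's value is referenced
    }
}
```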

## Brief change log

  - *Remove `CodeGenerator` and introduce `ExprCodeGenerator` and `CodeGeneratorContext`*
  - *Make some code generation methods static*

## Verifying this change


This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-7959

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4940.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4940


commit 77b2d0ac00671347b0952719d70e2eb8230cd8d4
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-11-03T07:37:22Z

[FLINK-7959] [table] Split CodeGenerator into CodeGeneratorContext and 
ExprCodeGenerator.




---


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-09-29 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3511
  
> So I think a simpler and better approach is to just make sure that most types have a good implementation of putNormalizedKey, and then NormalizedKeySorter.compareRecords would be called only rarely, so its performance wouldn't really matter.

You are right when the sort keys are simple numeric types, but not with strings, which may be the most popular choice in some ETL and data warehouse pipelines. I agree that code generation alone can't help with this situation, so we investigated binary data formats for representing our records and modified the TypeSerializer & TypeComparator interfaces used for ser/de. We don't have to consume the input/output view byte by byte, but instead can randomly access the underlying data, i.e. the MemorySegment. It acts like Spark's UnsafeRow (https://reviewable.io/reviews/apache/spark/5725), so we can eliminate most of the deserialization cost, such as reading a `byte[]` and then calling `new String(byte[])`. We combined this approach with some code generation to eliminate the virtual function calls on the TypeComparator and saw a 10x performance improvement when sorting on strings.
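
To make this concrete, here is a minimal, hypothetical sketch in plain Java (not Flink's or Spark's actual binary format): a string field is compared directly on its serialized UTF-8 bytes, addressed by offset and length, so no `byte[]` copy and no `new String(byte[])` is needed during the comparison.

```java
import java.nio.charset.StandardCharsets;

public final class BinaryStringCompareSketch {

    // Unsigned lexicographic comparison of two UTF-8 encoded string fields,
    // addressed by (offset, length) inside their backing byte arrays. For UTF-8
    // this yields the same order as comparing the decoded code points.
    static int compareUtf8(byte[] left, int leftOff, int leftLen,
                           byte[] right, int rightOff, int rightLen) {
        int minLen = Math.min(leftLen, rightLen);
        for (int i = 0; i < minLen; i++) {
            int l = left[leftOff + i] & 0xFF;   // mask to compare bytes as unsigned
            int r = right[rightOff + i] & 0xFF;
            if (l != r) {
                return l - r;
            }
        }
        return leftLen - rightLen;              // shorter prefix sorts first
    }

    public static void main(String[] args) {
        byte[] a = "apple".getBytes(StandardCharsets.UTF_8);
        byte[] b = "apricot".getBytes(StandardCharsets.UTF_8);
        // Negative result: "apple" < "apricot"; no String object was materialized.
        System.out.println(compareUtf8(a, 0, a.length, b, 0, b.length));
    }
}
```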

> I think a large potential in code-generation is to eliminate the overheads of the very many virtual function calls throughout the runtime

Totally agreed. After we finish the code generation work and the ser/de improvements, we will investigate this further. Good to see that you have a list of all the megamorphic calls. BTW, we are actually translating batch jobs onto the streaming runtime, so I think there will be a lot in common.

Having more control over type information and code-generating the whole operator has lots of benefits; it also helps make most of the calls monomorphic (a small sketch follows below), for example:
- full control over object reuse, yes
- comparators
- generating hash codes
- potential improvements for algorithms that only need to deal with fixed-length data
- directly using primitive variables when dealing with simple types
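
To make the monomorphic-call point concrete, here is a purely illustrative Java sketch (the names are made up; this is not Flink's actual TypeComparator API). The generic path pays a virtual, often megamorphic call per comparison, while code generated for a known schema can compare primitive fields inline:

```java
// Generic path: one comparator interface implemented by many key types, so the
// call site inside the sort loop is typically megamorphic and hard to inline.
interface RecordComparator<T> {
    int compare(T first, T second);
}

// A fixed, generated row layout for the schema (INT key, LONG payload): the key
// lives in a primitive field, with no Object[] indirection and no boxing.
final class GeneratedRow {
    int key;
    long payload;
}

// What a code generator could emit for this schema: one concrete comparator per
// operator, monomorphic at its call site and trivially inlinable by the JIT.
final class GeneratedIntKeyComparator implements RecordComparator<GeneratedRow> {
    @Override
    public int compare(GeneratedRow first, GeneratedRow second) {
        return Integer.compare(first.key, second.key);
    }
}

final class Demo {
    public static void main(String[] args) {
        GeneratedRow a = new GeneratedRow();
        a.key = 3;
        GeneratedRow b = new GeneratedRow();
        b.key = 5;
        System.out.println(new GeneratedIntKeyComparator().compare(a, b)); // negative: a sorts first
    }
}
```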

And you are right that this is orthogonal to the runtime improvements; we see the boundary as the Operator. The framework should provide the most efficient environment for operators to run in, and we will code-generate the most efficient operators to live in it.

> Btw. have you seen this PR for code generation for POJO serializers and 
comparators? #2211

I haven't seen it yet; I will find some time to check it out.


---


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-09-29 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3511
  
@heytitle @ggevay Great work! 
We are working on a project to fully code-generate the algorithms the Flink runtime uses, and we borrowed lots of ideas from this PR, thanks! IMHO, in addition to these changes, there are still some potential improvements we can make to the sorter, like avoiding deserialization when comparing the actual records. To achieve this, we need more control over type information and more flexible code generation support, so we chose to do it through the Table API & SQL. What do you think of this approach?


---


[GitHub] flink issue #4667: [FLINK-5859] [table] Add PartitionableTableSource for par...

2017-09-17 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4667
  
LGTM, @fhueske @twalthr can you also take a look?


---


[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049572
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
--- End diff --

We should make this flag clearer. If this flag represents whether partition pruning has been applied, I would say it should always be true, because by the time this method is called the framework has at least tried to apply partition pruning.


---


[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049316
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
+* @param prunedPartitions Remaining partitions after partition pruning 
applied.
--- End diff --

Looks like the definition of "prunedPartitions" is contradictory here. I think we should stick to a single definition: either "prunedPartitions" means all partitions that have been pruned away, or all remaining partitions that survive the pruning.


---


[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048604
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
+* @param prunedPartitions Remaining partitions after partition pruning 
applied.
+* Notes: If partition pruning is not applied, 
prunedPartitions is empty.
+* @param predicates   A list contains conjunctive predicates, you 
should pick and remove all
+* expressions that can be pushed down. The 
remaining elements of this
+* list will further evaluated by framework.
+* @return A new cloned instance of [[TableSource]].
+*/
+  def applyPrunedPartitionsAndPredicate(
+partitionPruned: Boolean,
+prunedPartitions: JList[Partition],
+predicates: JList[Expression]): TableSource[T]
+
+
+  /**

[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048337
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -55,4 +55,9 @@ trait FilterableTableSource[T] {
 */
   def isFilterPushedDown: Boolean
 
+  /**
+* @param relBuilder Builder for relational expressions.
+*/
+  def setRelBuilder(relBuilder: RelBuilder): Unit
--- End diff --

Can you move this method to PartitionableTableSource?


---


[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4668
  
@fhueske sure


---


[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4668
  
Thanks @JingsongLi for your contribution
LGTM, +1 to merge
cc @fhueske @StephanEwen 


---


[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...

2017-09-06 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4529
  
@zentol So the rationale behind this is that once all of Netty's memory comes from Flink's managed memory, this is no longer an issue, right?



---


[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...

2017-09-06 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4529
  
If I remember correctly, the reason we chose to do an extra copy inside `extractFrame` is that we faced some memory problems when using `slice`. Is this no longer an issue, or has it been taken care of by another PR?


---


[GitHub] flink issue #4445: [FLINK-7310][core] always use the HybridMemorySegment

2017-09-05 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4445
  
I would bet on deserialization being the cause. The reason the sorter suffers a larger regression than the hash join is that the sorter triggers more deserializations when comparing records.

Despite the regression, I think it's still worthwhile since we can avoid an extra copy from the network stack to the runtime. It would be better if we could take that extra copy into account in the benchmark, but it's OK that we don't have it.

+1 to merge this.


---


[GitHub] flink pull request #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and D...

2017-07-12 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4305#discussion_r127113118
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
@@ -843,11 +843,14 @@ private float convertToFloat(Object o, float 
defaultValue) {
}
else if (o.getClass() == Double.class) {
double value = ((Double) o);
-   if (value <= Float.MAX_VALUE && value >= 
Float.MIN_VALUE) {
-   return (float) value;
-   } else {
+   if (value < -Float.MAX_VALUE
--- End diff --

I'm OK with both; I will write it the way you suggested.


---


[GitHub] flink pull request #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and D...

2017-07-12 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4305#discussion_r127107209
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
@@ -843,11 +843,14 @@ private float convertToFloat(Object o, float 
defaultValue) {
}
else if (o.getClass() == Double.class) {
double value = ((Double) o);
-   if (value <= Float.MAX_VALUE && value >= 
Float.MIN_VALUE) {
-   return (float) value;
-   } else {
+   if (value < -Float.MAX_VALUE
--- End diff --

I'm afraid that in that case 0 would not be handled correctly, so I added a test case to check.
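
A small standalone illustration of the semantics being discussed (plain Java, not the actual Flink test): `Float.MIN_VALUE` is the smallest positive float, so using it as a lower bound rejects 0 and every negative value.

```java
public class FloatRangeCheckSketch {
    public static void main(String[] args) {
        double zero = 0.0;
        double negative = -1.0;

        // Float.MIN_VALUE is ~1.4e-45, the smallest POSITIVE float.
        // Old (incorrect) lower bound: 0 and negative values fail the check.
        System.out.println(zero >= Float.MIN_VALUE);      // false
        System.out.println(negative >= Float.MIN_VALUE);  // false

        // Corrected bounds: -Float.MAX_VALUE .. Float.MAX_VALUE accepts them.
        System.out.println(zero >= -Float.MAX_VALUE && zero <= Float.MAX_VALUE);         // true
        System.out.println(negative >= -Float.MAX_VALUE && negative <= Float.MAX_VALUE); // true
    }
}
```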


---


[GitHub] flink issue #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and Double.M...

2017-07-12 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4305
  
Hi @greghogan , thanks for the review, your comments have been addressed.


---


[GitHub] flink pull request #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and D...

2017-07-12 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/4305

[FLINK-7161] fix misusage of Float.MIN_VALUE and Double.MIN_VALUE

I have already checked all places using Float.MIN_VALUE and Double.MIN_VALUE and fixed all misuses.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-7161

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4305


commit 5c7a2fd4c97eaedba87b0042e337d94f315b4191
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-07-12T06:02:58Z

[FLINK-7161] fix misusage of Float.MIN_VALUE and Double.MIN_VALUE




---


[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...

2017-05-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3913#discussion_r117385333
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
   * @param lastAccessTime   Timestamp of last access of the table
   */
 case class ExternalCatalogTable(
-identifier: TableIdentifier,
--- End diff --

I agree that not all "tables" have a name; sounds good to me then.


---


[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...

2017-05-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3913#discussion_r117201965
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
   * @param lastAccessTime   Timestamp of last access of the table
   */
 case class ExternalCatalogTable(
-identifier: TableIdentifier,
--- End diff --

We have tested this with our own catalog integration, and keeping the table name should work. The sub-catalog representing the database layer can concatenate the db and table names when creating the Table instance.


---


[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...

2017-05-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3913#discussion_r116919339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -128,7 +128,7 @@ case class DatabaseNotExistException(
   * @param db database name
   * @param cause the cause
   */
-case class DatabaseAlreadyExistException(
+case class CatalogAlreadyExistException(
--- End diff --

Same as above.


---


[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...

2017-05-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3913#discussion_r116919313
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -114,7 +114,7 @@ case class TableAlreadyExistException(
   * @param db database name
   * @param cause the cause
   */
-case class DatabaseNotExistException(
+case class CatalogNotExistException(
--- End diff --

Please update the comment and parameter name.


---


[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...

2017-05-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3913#discussion_r116919169
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats
   * @param lastAccessTime   Timestamp of last access of the table
   */
 case class ExternalCatalogTable(
-identifier: TableIdentifier,
--- End diff --

If we don't have a table identifier or table name, the `TableSourceConverter` will not work when trying to convert an `ExternalCatalogTable` into a real `TableSource`.

Even if we keep the table name, I'm not sure it will work in all situations. For example, suppose we have an outside catalog that has the notion of a database, like MySQL. Tables in that catalog are named like "db1.table1" or "db2.table2". In the new design, we would normally add each database as a sub-catalog of the root catalog, so we can create "table1" from the sub-catalog named "db1". The problem is that the table name is then just "table1", and we lose the information that this table actually comes from "db1". This may cause problems when the TableSource tries to establish a connection or fetch table information from the outside catalog.


---


[GitHub] flink issue #3803: [FLINK-6394] [runtime] Respect object reuse configuration...

2017-05-08 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3803
  
OK, will do it right away.


---


[GitHub] flink issue #3803: [FLINK-6394] [runtime] Respect object reuse configuration...

2017-05-08 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3803
  
Sorry I wasn't available during the last couple of days. It seems the 1.3 branch has been created; should I merge this PR into the 1.3 branch too?


---


[GitHub] flink pull request #3803: [FLINK-6394] [runtime] Respect object reuse config...

2017-04-30 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3803

[FLINK-6394] [runtime] Respect object reuse configuration when execut…

…ing group combining function



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-6394

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3803.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3803


commit 3f81f60603940ec2b26e81814224b69ae40afcc0
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-04-30T08:56:00Z

[FLINK-6394] [runtime] Respect object reuse configuration when executing 
group combining function




---


[GitHub] flink pull request #3394: [FLINK-5810] [flip6] Introduce a hardened slot man...

2017-04-30 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3394#discussion_r114070398
  
--- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=DEBUG, console
--- End diff --

Is this change intended or accidental?


---


[GitHub] flink pull request #3794: [FLINK-6398] RowSerializer's duplicate should alwa...

2017-04-27 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3794

[FLINK-6398] RowSerializer's duplicate should always return a new instance



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-6398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3794.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3794


commit 29f328493dce3adef828f3a93be171a57f1316b0
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-04-27T15:37:21Z

[FLINK-6398] RowSerializer's duplicate should always return a new instance




---


[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Rebased to master and will merge after build check


---


[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
@fhueske I have renamed the package and updated the union description.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111931445
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION 
SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

+1, will change.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111930527
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

OK, will rename the packages.


---


[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3594
  
Hi @fhueske, sorry for taking so long to update this PR.
Looks like it has quite a few conflicts with master now; once you are OK with all the changes, I will rebase it onto master.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861591
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
 ---
@@ -18,58 +18,57 @@
 package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, 
LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, 
DataSetCorrelate}
+import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, 
FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
 /**
   * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
--- End diff --

I will remove this simple comment to be consistent with the other converter rules.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111861037
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
 ---
@@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
 import java.lang.Double
 
+import org.apache.flink.table.rel.logical.FlinkLogicalCalc
+
 object FlinkRelMdRowCount extends RelMdRowCount {
 
-val SOURCE: RelMetadataProvider = 
ReflectiveRelMetadataProvider.reflectiveSource(
-  BuiltInMethod.ROW_COUNT.method,
-  this)
+  val SOURCE: RelMetadataProvider = 
ReflectiveRelMetadataProvider.reflectiveSource(
+BuiltInMethod.ROW_COUNT.method,
+this)
+
+  def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = 
rel.estimateRowCount(mq)
--- End diff --

I'm ok with both, will change to `Calc`.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860657
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 ---
@@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {
 
 val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION 
SELECT a, b, c FROM $table")
+val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
+s"UNION ALL SELECT a, b, c FROM $table")
--- End diff --

Actually the old test was wrong in using `UNION` instead of `UNION ALL`. If I understand correctly, `UNION` performs a global distinct over all fields, while `UNION ALL` just concatenates the two datasets or datastreams (see the small illustration below). I think the behavior of `DataStream.union` corresponds to `UNION ALL` rather than `UNION`.
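
A tiny plain-Java illustration of the difference (streams stand in for datasets here; this is not the Flink API):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionSemanticsSketch {
    public static void main(String[] args) {
        List<Integer> left = List.of(1, 2, 2);
        List<Integer> right = List.of(2, 3);

        // UNION ALL: plain concatenation, duplicates are kept.
        List<Integer> unionAll = Stream.concat(left.stream(), right.stream())
                .collect(Collectors.toList());
        System.out.println(unionAll); // [1, 2, 2, 2, 3]

        // UNION: concatenation followed by a global distinct.
        List<Integer> union = Stream.concat(left.stream(), right.stream())
                .distinct()
                .collect(Collectors.toList());
        System.out.println(union);    // [1, 2, 3]
    }
}
```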


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111860136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.rel.FlinkConventions
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalTableSourceScan(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+table: RelOptTable,
+val tableSource: TableSource[_])
+  extends TableScan(cluster, traitSet, table)
+  with FlinkLogicalRel {
+
+  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): 
FlinkLogicalTableSourceScan = {
+new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, 
tableSource)
+  }
+
+  override def deriveRowType(): RelDataType = {
+val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+flinkTypeFactory.buildRowDataType(
+  TableEnvironment.getFieldNames(tableSource),
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val rowCnt = metadata.getRowCount(this)
+planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(getRowType))
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
--- End diff --

We need to override `explainTerms` and `toString` to make this RelNode's digest distinguishable from other `TableSourceScan`s. The default implementation only compares the table name, and that is not enough once we push filters or projections into the `TableSource`.

Actually there is a bug in `toString` that was fixed in https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a; I will backport it here.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111859756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalSort(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+child: RelNode,
+collation: RelCollation,
+offset: RexNode,
+fetch: RexNode)
+  extends Sort(cluster, traits, child, collation, offset, fetch)
+  with FlinkLogicalRel {
+
+  private val limitStart: Long = if (offset != null) {
+RexLiteral.intValue(offset)
+  } else {
+0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+RexLiteral.intValue(fetch) + limitStart
+  } else {
+Long.MaxValue
+  }
+
+  val getOffset: RexNode = offset
+
+  val getFetch: RexNode = fetch
+
+  override def copy(
+  traitSet: RelTraitSet,
+  newInput: RelNode,
+  newCollation: RelCollation,
+  offset: RexNode,
+  fetch: RexNode): Sort = {
+
+new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, 
offset, fetch)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = (inputRowCnt - limitStart).max(1.0)
+  if (fetch != null) {
+val limit = RexLiteral.intValue(fetch)
+rowCount.min(limit)
+  } else {
+rowCount
+  }
+}
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+// by default, assume cost is proportional to number of rows
+val rowCount: Double = mq.getRowCount(this)
+planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
--- End diff --

yes, these can be removed


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111858923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.rel.FlinkConventions
+
+class FlinkLogicalAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+indicator: Boolean,
+groupSet: ImmutableBitSet,
+groupSets: JList[ImmutableBitSet],
+aggCalls: JList[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, 
groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  input: RelNode,
+  indicator: Boolean,
+  groupSet: ImmutableBitSet,
+  groupSets: JList[ImmutableBitSet],
+  aggCalls: JList[AggregateCall]): Aggregate = {
+new FlinkLogicalAggregate(cluster, traitSet, input, indicator, 
groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val child = this.getInput
+val rowCnt = metadata.getRowCount(child)
+val rowSize = this.estimateRowSize(child.getRowType)
+val aggCnt = this.aggCalls.size
+planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * 
rowSize)
+  }
+}
+
+private class FlinkLogicalAggregateConverter
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalAggregateConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+!agg.containsDistinctCall()
--- End diff --

This is similar to `FlinkLogicalJoin`: we need other logical rules to rewrite distinct aggregates first, and then convert the result into a "clean" `FlinkLogicalAggregate`.


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
--- End diff --

How about renaming `org.apache.flink.table.plan.node` to `org.apache.flink.table.plan.rel` and having 3 sub-packages: `logical`, `dataset` and `datastream`?


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111857715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.rel.logical
+
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.rel.FlinkConventions
+
+import scala.collection.JavaConverters._
+
+class FlinkLogicalJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode,
+joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, 
Set.empty[CorrelationId].asJava, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+  traitSet: RelTraitSet,
+  conditionExpr: RexNode,
+  left: RelNode,
+  right: RelNode,
+  joinType: JoinRelType,
+  semiJoinDone: Boolean): Join = {
+
+new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, 
joinType)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val leftRowCnt = metadata.getRowCount(getLeft)
+val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+val rightRowCnt = metadata.getRowCount(getRight)
+val rightRowSize = estimateRowSize(getRight.getRowType)
+
+val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+val cpuCost = leftRowCnt + rightRowCnt
+val rowCnt = leftRowCnt + rightRowCnt
+
+planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+classOf[LogicalJoin],
+Convention.NONE,
+FlinkConventions.LOGICAL,
+"FlinkLogicalJoinConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+val joinInfo = join.analyzeCondition
+
+hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
--- End diff --

Currently the answer is yes, because the physical join can only support 
equality conditions on simple keys. 
If the join key is an expression like `a + 1 = b - 2` and we don't have this 
restriction in the logical layer, the join condition will stay in that form and we 
can't translate it to a physical join. There are 2 possible solutions:
1. Keep it this way.
2. Re-introduce rules which can extract expressions out of the join condition and 
add an additional "Calc" node so the join key stays a simple field. 
I prefer 1 for now, and will keep 2 in mind. After all, it's not very nice to 
let the logical layer know about all these restrictions. What do you think?
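
To make option 2 a bit more concrete, here is a rough sketch of the idea (this only assumes Calcite's `RelBuilder` API; the table names `L`/`R`, the field names and the generated key names are made up for illustration): project the computed expressions into extra fields below the join so that the join condition degenerates into a plain field equality.

```
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder

import scala.collection.JavaConverters._

// Rewrite "L JOIN R ON a + 1 = b - 2" into
// "Calc(*, a + 1 AS lkey) JOIN Calc(*, b - 2 AS rkey) ON lkey = rkey".
def rewriteNonSimpleKeyJoin(builder: RelBuilder): RelNode = {
  builder.scan("L")
  builder.project(
    (builder.fields().asScala :+ builder.alias(
      builder.call(SqlStdOperatorTable.PLUS, builder.field("a"), builder.literal(1)),
      "lkey")).asJava)

  builder.scan("R")
  builder.project(
    (builder.fields().asScala :+ builder.alias(
      builder.call(SqlStdOperatorTable.MINUS, builder.field("b"), builder.literal(2)),
      "rkey")).asJava)

  // With the computed keys materialized, the condition is a simple field equality.
  builder.join(
    JoinRelType.INNER,
    builder.call(SqlStdOperatorTable.EQUALS,
      builder.field(2, 0, "lkey"), builder.field(2, 1, "rkey")))

  builder.build()
}
```

The downside is the one mentioned above: the logical layer would start encoding a restriction of the physical join.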


---


[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...

2017-04-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3594#discussion_r111856357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
 }
 
 // 3. optimize the logical Flink plan
-val optRuleSet = getOptRuleSet
-val flinkOutputProps = 
relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-val optimizedPlan = if (optRuleSet.iterator().hasNext) {
-  runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
+val logicalOptRuleSet = getLogicalOptRuleSet
+val logicalOutputProps = 
relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
+  runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, 
logicalOutputProps)
--- End diff --

Yes, I will first test how HepPlanner works in some test and production 
environments and then decide whether or how to change it to a rule-based planner. 
So this will be my follow-up task.


---


[GitHub] flink pull request #3695: [FLINK-5545] [table] Remove FlinkAggregateExpandDi...

2017-04-07 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3695

[FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRul…

…e after bumping Calcite to v1.12.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3695.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3695


commit 2d8c740852e56cbc54120acea234975771171a66
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-04-07T09:46:06Z

[FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule 
after bumping Calcite to v1.12.




---


[GitHub] flink pull request #3689: [FLINK-5435] [table] Remove FlinkAggregateJoinTran...

2017-04-06 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3689

[FLINK-5435] [table] Remove FlinkAggregateJoinTransposeRule and Flink…

…RelDecorrelator after calcite updated to 1.12



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5435

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3689.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3689


commit 6c4fb2f5a655129c178fdf543f64b240b3012e8b
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-04-06T14:06:51Z

[FLINK-5435] [table] Remove FlinkAggregateJoinTransposeRule and 
FlinkRelDecorrelator after calcite updated to 1.12




---


[GitHub] flink issue #3623: [FLINK-6196] [table] Support dynamic schema in Table Func...

2017-04-01 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3623
  
Looks good to me, @twalthr @fhueske can you also take a look?


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108628810
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(
+  name: String,
+  tableFunction: TableFunction[_],
+  implicitResultType: TypeInformation[_],
+  params: Expression*): TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val userDefinedResultType = tableFunction.getResultType(arguments)
+val resultType = {
+  if (userDefinedResultType != null) userDefinedResultType
+  else implicitResultType
+}
+TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+* Transform the expressions or rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
+*
+* @param params actual parameters of the function
+* @return A java List of the Objects
+*/
+  private[table] def expressionsToArguments(params: Expression*): 
java.util.List[AnyRef] = {
+params.map {
+  case exp: Literal =>
+exp.value.asInstanceOf[AnyRef]
+  case _ =>
+null
+}
+  }
+
+  /**
+* Transform the rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
+*
+* @param rexNodes actual parameters of the function
+* @return A java List of the Objects
+*/
+  private[table] def rexNodesToArguments(
+  rexNodes: java.util.List[RexNode]): java.util.List[AnyRef] = {
+rexNodes.map {
+  case rexNode: RexLiteral =>
--- End diff --

Can we add a unit test to ensure that all supported literals are successfully 
transformed, and that unsupported types are translated to null?
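
Something along these lines would do (a sketch only; it assumes the test lives under the `org.apache.flink.table` package so it can see the `private[table]` helper, and that a literal's value is passed through unchanged):

```
package org.apache.flink.table.functions.utils

import java.math.BigDecimal

import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`.RelDataTypeSystem
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.junit.Assert.{assertNotNull, assertNull}
import org.junit.Test

class RexNodesToArgumentsTest {

  @Test
  def testOnlyLiteralsAreTransformed(): Unit = {
    val typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
    val rexBuilder = new RexBuilder(typeFactory)

    // one literal and one non-literal (an input reference)
    val literal = rexBuilder.makeExactLiteral(BigDecimal.valueOf(42L))
    val inputRef = rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0)

    val args = UserDefinedFunctionUtils.rexNodesToArguments(
      java.util.Arrays.asList[RexNode](literal, inputRef))

    assertNotNull(args.get(0)) // the literal's value is passed through
    assertNull(args.get(1))    // the non-literal is mapped to null
  }
}
```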


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108628113
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DeferredTypeFlinkTableFunction.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import java.lang.reflect.Method
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
+
+/**
+  * A Deferred Type is a Table Function which the type hasn't been 
determined yet.
+  * If will determine the result type after the arguments are passed.
--- End diff --

If -> It


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108625377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(
+  name: String,
+  tableFunction: TableFunction[_],
+  implicitResultType: TypeInformation[_],
+  params: Expression*): TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val userDefinedResultType = tableFunction.getResultType(arguments)
+val resultType = {
+  if (userDefinedResultType != null) userDefinedResultType
+  else implicitResultType
+}
+TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+* Transform the expressions or rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
+*
+* @param params actual parameters of the function
+* @return A java List of the Objects
+*/
+  private[table] def expressionsToArguments(params: Expression*): 
java.util.List[AnyRef] = {
+params.map {
+  case exp: Literal =>
+exp.value.asInstanceOf[AnyRef]
+  case _ =>
+null
+}
+  }
+
+  /**
+* Transform the rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
+*
+* @param rexNodes actual parameters of the function
+* @return A java List of the Objects
+*/
+  private[table] def rexNodesToArguments(
--- End diff --

transformLiteralRexNodes?


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108625307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(
+  name: String,
+  tableFunction: TableFunction[_],
+  implicitResultType: TypeInformation[_],
+  params: Expression*): TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val userDefinedResultType = tableFunction.getResultType(arguments)
+val resultType = {
+  if (userDefinedResultType != null) userDefinedResultType
+  else implicitResultType
+}
+TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+* Transform the expressions or rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
+*
+* @param params actual parameters of the function
+* @return A java List of the Objects
+*/
+  private[table] def expressionsToArguments(params: Expression*): 
java.util.List[AnyRef] = {
+params.map {
+  case exp: Literal =>
+exp.value.asInstanceOf[AnyRef]
+  case _ =>
+null
+}
+  }
+
+  /**
+* Transform the rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
--- End diff --

Please update the comment


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108624961
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(
+  name: String,
+  tableFunction: TableFunction[_],
+  implicitResultType: TypeInformation[_],
+  params: Expression*): TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val userDefinedResultType = tableFunction.getResultType(arguments)
+val resultType = {
+  if (userDefinedResultType != null) userDefinedResultType
+  else implicitResultType
+}
+TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+* Transform the expressions or rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
--- End diff --

Only literal expressions will be transformed


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108624874
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(
+  name: String,
+  tableFunction: TableFunction[_],
+  implicitResultType: TypeInformation[_],
+  params: Expression*): TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val userDefinedResultType = tableFunction.getResultType(arguments)
+val resultType = {
+  if (userDefinedResultType != null) userDefinedResultType
+  else implicitResultType
+}
+TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+* Transform the expressions or rex nodes to Objects
--- End diff --

rex nodes are not handled in this method


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-29 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108624761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(
+  name: String,
+  tableFunction: TableFunction[_],
+  implicitResultType: TypeInformation[_],
+  params: Expression*): TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val userDefinedResultType = tableFunction.getResultType(arguments)
+val resultType = {
+  if (userDefinedResultType != null) userDefinedResultType
+  else implicitResultType
+}
+TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+* Transform the expressions or rex nodes to Objects
+* Only literal expressions will be passed, nulls for non-literal ones
+*
+* @param params actual parameters of the function
+* @return A java List of the Objects
+*/
+  private[table] def expressionsToArguments(params: Expression*): 
java.util.List[AnyRef] = {
--- End diff --

Rename to `transformLiteralExpressions` to better fit the purpose?


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108320480
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
 ---
@@ -86,12 +90,79 @@ abstract class TableFunction[T] extends 
UserDefinedFunction {
 * @return [[Expression]] in form of a [[TableFunctionCall]]
 */
   final def apply(params: Expression*)(implicit typeInfo: 
TypeInformation[T]): Expression = {
-val resultType = if (getResultType == null) {
-  typeInfo
+buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*)
+  }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(name: String,
+implicitResultType: 
TypeInformation[_],
+params: Expression*): 
TableFunctionCall = {
+val arguments = expressionsToArguments(params: _*)
+val resultType = getResultType(arguments, implicitResultType)
+TableFunctionCall(name, this, params, resultType)
+  }
+
+  /**
+* Internal user of [[getResultType()]]
+*
+* @param arguments arguments of a function call (only literal arguments
+*  are passed, nulls for non-literal ones)
+* @param implicitResultType The implicit result type
+* @return [[TypeInformation]] of result type or null if Flink should 
determine the type
+*/
+  private[table] def getResultType(arguments: java.util.List[AnyRef],
--- End diff --

The format of the parameters should be consistent with others


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108320419
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -27,52 +27,73 @@ import 
org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
 
 /**
   * This is heavily inspired by Calcite's 
[[org.apache.calcite.schema.impl.TableFunctionImpl]].
   * We need it in order to create a 
[[org.apache.flink.table.functions.utils.TableSqlFunction]].
   * The main difference is that we override the [[getRowType()]] and 
[[getElementType()]].
+  *
+  * @param tableFunction The Table Function instance
+  * @param implicitResultType The implicit result type.
+  * @param evalMethod The eval() method of the [[tableFunction]]
   */
-class FlinkTableFunctionImpl[T](
-val typeInfo: TypeInformation[T],
-val fieldIndexes: Array[Int],
-val fieldNames: Array[String],
-val evalMethod: Method)
+class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_],
--- End diff --

Can we just separate this implementation into 2 classes, one for the case where 
we already know the type, and one for the case where the type has not been decided 
yet? The reason I suggest this is that I noticed `tableFunction.getResultType` is 
called multiple times when evaluating the plan, even though we already determined 
the row type long ago, i.e. when the `apply` method of `TableFunction` was called. 
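
A rough sketch of the "known type" half of that split (the class name is made up; it reuses the helper signatures visible in the diff above, and assumes `buildRelDataType` moves into `UserDefinedFunctionUtils` as suggested in another comment on this PR), so the planner never has to ask the UDTF for the result type again once it is fixed:

```
import java.lang.reflect.{Method, Type}
import java.util

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.TableFunction
import org.apache.calcite.schema.impl.ReflectiveFunctionBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils

// Sketch only: a table function whose result type was decided once (e.g. in apply())
// and is simply replayed on every planner call instead of re-asking the UDTF.
class FixedTypeFlinkTableFunction(
    resultType: TypeInformation[_],
    evalMethod: Method)
  extends ReflectiveFunctionBase(evalMethod)
  with TableFunction {

  override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]

  override def getRowType(
      typeFactory: RelDataTypeFactory,
      arguments: util.List[AnyRef]): RelDataType = {
    val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
    // assumes buildRelDataType is the shared helper in UserDefinedFunctionUtils
    UserDefinedFunctionUtils.buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes)
  }
}
```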



---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108319900
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
 ---
@@ -27,52 +27,73 @@ import 
org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
 
 /**
   * This is heavily inspired by Calcite's 
[[org.apache.calcite.schema.impl.TableFunctionImpl]].
   * We need it in order to create a 
[[org.apache.flink.table.functions.utils.TableSqlFunction]].
   * The main difference is that we override the [[getRowType()]] and 
[[getElementType()]].
+  *
+  * @param tableFunction The Table Function instance
+  * @param implicitResultType The implicit result type.
+  * @param evalMethod The eval() method of the [[tableFunction]]
   */
-class FlinkTableFunctionImpl[T](
-val typeInfo: TypeInformation[T],
-val fieldIndexes: Array[Int],
-val fieldNames: Array[String],
-val evalMethod: Method)
+class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_],
+val implicitResultType: TypeInformation[_],
+val evalMethod: Method)
   extends ReflectiveFunctionBase(evalMethod)
   with TableFunction {
 
-  if (fieldIndexes.length != fieldNames.length) {
-throw new TableException(
-  "Number of field indexes and field names must be equal.")
-  }
+  override def getElementType(arguments: util.List[AnyRef]): Type = 
classOf[Array[Object]]
 
-  // check uniqueness of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-throw new TableException(
-  "Table field names must be unique.")
+  override def getRowType(typeFactory: RelDataTypeFactory,
+  arguments: util.List[AnyRef]): RelDataType = {
+
+// Get the result type from table function. If it is not null, the 
implicitResultType may
+// already be generated by Table API's apply() method.
+val resultType = if (tableFunction.getResultType(arguments) != null) {
+  tableFunction.getResultType(arguments)
+} else {
+  implicitResultType
+}
+val (fieldNames, fieldIndexes, _) = 
UserDefinedFunctionUtils.getFieldInfo(resultType)
+buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes)
   }
 
-  val fieldTypes: Array[TypeInformation[_]] =
-typeInfo match {
-  case cType: CompositeType[T] =>
-if (fieldNames.length != cType.getArity) {
-  throw new TableException(
-s"Arity of type (" + cType.getFieldNames.deep + ") " +
-  "not equal to number of field names " + fieldNames.deep + 
".")
-}
-
fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-  case aType: AtomicType[T] =>
-if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-  throw new TableException(
-"Non-composite input type may have only a single field and its 
index must be 0.")
-}
-Array(aType)
+  private [table] def buildRelDataType(typeFactory: RelDataTypeFactory,
--- End diff --

I think this method can go into `UserDefinedFunctionUtils`


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108318532
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
 ---
@@ -131,8 +202,10 @@ abstract class TableFunction[T] extends 
UserDefinedFunction {
 * method. Flink's type extraction facilities can handle basic types or
--- End diff --

I think the comment should also be updated, to make users aware that they can 
determine the result type based on the parameters.


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108318419
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
 ---
@@ -86,12 +90,79 @@ abstract class TableFunction[T] extends 
UserDefinedFunction {
 * @return [[Expression]] in form of a [[TableFunctionCall]]
 */
   final def apply(params: Expression*)(implicit typeInfo: 
TypeInformation[T]): Expression = {
-val resultType = if (getResultType == null) {
-  typeInfo
+buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*)
+  }
+
+  /**
+* Build a TableFunctionCall, a name and a sequence of params will 
determine a unique
+* [[TableFunctionCall]]
+*
+* @param name function name
+* @param implicitResultType If no result type returned, it will use 
this type.
+* @param params The input expressions
+* @return A unique [[TableFunctionCall]]
+*/
+  private[table] def buildTableFunctionCall(name: String,
--- End diff --

Can we move this to a util class? Since this is a user-facing API class, we 
don't want to expose this kind of thing.


---


[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...

2017-03-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3623#discussion_r108318278
  
--- Diff: flink-core/src/test/java/org/apache/flink/types/RowTest.java ---
@@ -21,6 +21,7 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
--- End diff --

this import is unused


---


[GitHub] flink issue #3553: [FLINK-6068] [table] Support If() as a built-in function ...

2017-03-20 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3553
  
Since `IF` is not SQL standard, I think we could let Java users just use 
`CASE WHEN`.


---


[GitHub] flink pull request #3559: [flink-6037] [Table API & SQL]hotfix: metadata pro...

2017-03-18 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3559#discussion_r106796945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
 ---
@@ -100,6 +101,7 @@ class FlinkPlannerImpl(
   assert(validatedSqlNode != null)
   val rexBuilder: RexBuilder = createRexBuilder
   val cluster: RelOptCluster = RelOptCluster.create(planner, 
rexBuilder)
+  cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE)
--- End diff --

I remember that in #3058 you also set the provider to 
`RelMetadataQuery.THREAD_PROVIDERS` by calling:
```

RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(cluster.getMetadataProvider))
```
Why is it not needed here?


---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-18 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3406
  
Seems good to merge now, merging...


---


[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...

2017-03-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3560#discussion_r106772134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
 * @param exprs a list of expressions to extract
 * @return a list of field references extracted from the given 
expressions
 */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] 
= {
-exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): 
List[NamedExpression] = {
+exprs.foldLeft(List[NamedExpression]()) {
   (fieldReferences, expr) => identifyFieldReferences(expr, 
fieldReferences)
-}.toSeq
+}
   }
 
   private def identifyFieldReferences(
   expr: Expression,
-  fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr 
match {
+  fieldReferences: List[NamedExpression]): List[NamedExpression] = 
expr match {
--- End diff --

The order really depends on how we extract fields from all kinds of 
expressions. For a `BinaryExpression`, we first extract the left child and then 
the right child. For function calls, we extract the fields from the parameters in 
left-to-right order. A more complex example is `over`: imagine an aggregate on a 
partitioned window. Should the fields that appear in the aggregate or the field 
that is partitioned on be considered first? 

So I think this kind of order is hard to define and hard to keep consistent; it 
will change easily whenever we modify the code. We should not rely on it for 
anything. 
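
To make the point concrete, here is a tiny self-contained sketch (a hypothetical expression ADT, not the actual Table API classes) showing that the resulting order is purely a by-product of the traversal:

```
// Hypothetical mini expression tree, only to illustrate the traversal-order argument.
sealed trait Expr
case class Field(name: String) extends Expr
case class Plus(left: Expr, right: Expr) extends Expr
case class Call(name: String, args: Seq[Expr]) extends Expr

// Collect unique field references, keeping first-seen order.
def collectFields(expr: Expr, acc: List[Field] = Nil): List[Field] = expr match {
  case f: Field      => if (acc.contains(f)) acc else acc :+ f
  case Plus(l, r)    => collectFields(r, collectFields(l, acc))           // left child, then right
  case Call(_, args) => args.foldLeft(acc)((a, e) => collectFields(e, a)) // params left to right
}

// collectFields(Plus(Field("b"), Call("f", Seq(Field("a"), Field("b")))))
//   == List(Field("b"), Field("a"))
// Swapping the traversal of Plus (right before left) silently changes the order.
```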


---


[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...

2017-03-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3560#discussion_r106669352
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
 * @param exprs a list of expressions to extract
 * @return a list of field references extracted from the given 
expressions
 */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] 
= {
-exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): 
List[NamedExpression] = {
+exprs.foldLeft(List[NamedExpression]()) {
   (fieldReferences, expr) => identifyFieldReferences(expr, 
fieldReferences)
-}.toSeq
+}
   }
 
   private def identifyFieldReferences(
   expr: Expression,
-  fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr 
match {
+  fieldReferences: List[NamedExpression]): List[NamedExpression] = 
expr match {
--- End diff --

I think using a Set here gives the better intuition that we actually want to 
collect unique fields. Any reason why you want to keep the order? Especially since 
the order is not very well defined, and it will change easily if we modify the code.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106606071
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
   filterableSource: FilterableTableSource[_],
   description: String): Unit = {
 
-if (filterableSource.isFilterPushedDown) {
-  // The rule can get triggered again due to the transformed "scan => 
filter"
-  // sequence created by the earlier execution of this rule when we 
could not
-  // push all the conditions into the scan
-  return
-}
+Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
 
 val program = calc.getProgram
+val functionCatalog = FunctionCatalog.withBuiltIns
 val (predicates, unconvertedRexNodes) =
   RexProgramExtractor.extractConjunctiveConditions(
 program,
 call.builder().getRexBuilder,
-tableSourceTable.tableEnv.getFunctionCatalog)
+functionCatalog)
 if (predicates.isEmpty) {
   // no condition can be translated to expression
   return
 }
 
-val (newTableSource, remainingPredicates) = 
filterableSource.applyPredicate(predicates)
-// trying to apply filter push down, set the flag to true no matter 
whether
-// we actually push any filters down.
-newTableSource.setFilterPushedDown(true)
+val remainingPredicates = new util.LinkedList[Expression]()
+predicates.foreach(e => remainingPredicates.add(e))
+
+val newTableSource = 
filterableSource.applyPredicate(remainingPredicates)
--- End diff --

For example, if the condition looks like `a OR b` and the table source knows 
that `a` will always be `false` but is not sure about `b`, it can rewrite the 
condition to just `b` to reduce the framework cost.
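
A small sketch of that kind of rewrite (the predicate ADT and the `alwaysFalse` oracle are hypothetical; the real interface works on Flink `Expression`s via the mutable predicate list, as in the diff above):

```
// Toy predicate tree, just to show that "a OR b" with a statically false `a` collapses to `b`.
sealed trait Pred
case class Or(left: Pred, right: Pred) extends Pred
case class Leaf(repr: String) extends Pred

// Returns None if the predicate is known to be always false, otherwise the simplified predicate.
def simplify(p: Pred, alwaysFalse: Leaf => Boolean): Option[Pred] = p match {
  case Or(l, r) =>
    (simplify(l, alwaysFalse), simplify(r, alwaysFalse)) match {
      case (None, None)         => None            // both branches false => whole OR is false
      case (None, Some(rr))     => Some(rr)        // "a OR b" with a == false  =>  b
      case (Some(ll), None)     => Some(ll)
      case (Some(ll), Some(rr)) => Some(Or(ll, rr))
    }
  case leaf: Leaf => if (alwaysFalse(leaf)) None else Some(leaf)
}

// simplify(Or(Leaf("a"), Leaf("b")), _.repr == "a") == Some(Leaf("b"))
```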


---


[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...

2017-03-17 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
Sure, so the result is that there will be 2 commits to the master repository?


---


[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
Hi @fhueske, thanks for the review. I addressed all your comments and will 
rebase onto master to let Travis check. Will merge this after the build succeeds.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106568641
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
 ---
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{CsvTableSource, TableSource}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase, 
TestFilterableTableSource}
+import org.junit.{Assert, Test}
+
+class TableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  // batch plan
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+val (tableSource, tableName) = csvTable
+val util = batchTestUtil()
+val tEnv = util.tEnv
+
+tEnv.registerTableSource(tableName, tableSource)
+
+val result = tEnv
+  .scan(tableName)
+  .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  batchSourceTableNode(tableName, projectedFields),
+  term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 
2) AS _c2")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+val (tableSource, tableName) = csvTable
+val util = batchTestUtil()
+
+util.tEnv.registerTableSource(tableName, tableSource)
+
+val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+val expected = unaryNode(
+  "DataSetCalc",
+  batchSourceTableNode(tableName, projectedFields),
+  term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS 
EXPR$2")
+)
+
+util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+val (tableSource, tableName) = csvTable
+val util = batchTestUtil()
+val tEnv = util.tEnv
+
+tEnv.registerTableSource(tableName, tableSource)
+
+val result = tEnv
+  .scan(tableName)
+  .select('id, 'score, 'first)
+
+val expected = batchSourceTableNode(tableName, noCalcFields)
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithoutPushDown(): Unit = {
+val (tableSource, tableName) = filterableTableSource
+val util = batchTestUtil()
+val tEnv = util.tEnv
+
+tEnv.registerTableSource(tableName, tableSource)
+
+val result = tEnv
+.scan(tableName)
+.select('price, 'id, 'amount)
+.where("price * 2 < 32")
+
+val expected = unaryNode(
+  "DataSetCalc",
+  batchSourceTableNode(
+tableName,
+Array("name", "id", "amount", "price")),
+  term("select", "price", "id", "amount"),
+  term("where", "<(*(price, 2), 32)")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterablePartialPushDown(): Unit = {
+val (tableSource, tableName) = filterableTableSource
+val util = batchTestUtil()
+val tEnv = util.tEnv
+
+tEnv.registerTableSource(tableName, tableSource)
+
  

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566787
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala
 ---
@@ -16,106 +16,96 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.rules.util
+package org.apache.flink.table.plan.util
 
 import java.math.BigDecimal
+import java.util
 
 import org.apache.calcite.adapter.java.JavaTypeFactory
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
 import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, 
VARCHAR}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, 
VARCHAR, BOOLEAN}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
-import org.junit.Assert.{assertArrayEquals, assertTrue}
-import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-/**
-  * This class is responsible for testing RexProgramProjectExtractor.
-  */
-class RexProgramProjectExtractorTest {
-  private var typeFactory: JavaTypeFactory = _
-  private var rexBuilder: RexBuilder = _
-  private var allFieldTypes: Seq[RelDataType] = _
-  private val allFieldNames = List("name", "id", "amount", "price")
-
-  @Before
-  def setUp(): Unit = {
-typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-rexBuilder = new RexBuilder(typeFactory)
-allFieldTypes = List(VARCHAR, BIGINT, INTEGER, 
DOUBLE).map(typeFactory.createSqlType(_))
-  }
+abstract class RexProgramTestBase {
 
-  @Test
-  def testExtractRefInputFields(): Unit = {
-val usedFields = extractRefInputFields(buildRexProgram())
-assertArrayEquals(usedFields, Array(2, 3, 1))
-  }
+  val typeFactory: JavaTypeFactory = new 
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+
+  val allFieldNames: util.List[String] = List("name", "id", "amount", 
"price", "flag").asJava
+
+  val allFieldTypes: util.List[RelDataType] =
+List(VARCHAR, BIGINT, INTEGER, DOUBLE, 
BOOLEAN).map(typeFactory.createSqlType).asJava
+
+  var rexBuilder: RexBuilder = new RexBuilder(typeFactory)
 
-  @Test
-  def testRewriteRexProgram(): Unit = {
-val originRexProgram = buildRexProgram()
-assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
-  "$0",
-  "$1",
-  "$2",
-  "$3",
-  "*($t2, $t3)",
-  "100",
-  "<($t4, $t5)",
-  "6",
-  ">($t1, $t7)",
-  "AND($t6, $t8)")))
-// use amount, id, price fields to create a new RexProgram
-val usedFields = Array(2, 3, 1)
-val types = usedFields.map(allFieldTypes(_)).toList.asJava
-val names = usedFields.map(allFieldNames(_)).toList.asJava
-val inputRowType = typeFactory.createStructType(types, names)
-val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, 
usedFields, rexBuilder)
-assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
-  "$0",
-  "$1",
-  "$2",
-  "*($t0, $t1)",
-  "100",
-  "<($t3, $t4)",
-  "6",
-  ">($t2, $t6)",
-  "AND($t5, $t7)")))
+  /**
+* extract all expression string list from input RexProgram expression 
lists
+*
+* @param rexProgram input RexProgram instance to analyze
+* @return all expression string list of input RexProgram expression 
lists
+*/
+  protected def extractExprStrList(rexProgram: RexProgram): 
mutable.Buffer[String] = {
+rexProgram.getExprList.asScala.map(_.toString)
   }
 
-  private def buildRexProgram(): RexProgram = {
-val types = allFieldTypes.asJava
-val names = allFieldNames.asJava
-val inputRowType = typeFactory.createStructType(types, names)
+  // select amount, amount * price as total where amount * price < 100 and 
id > 6
+  protected def buildSimpleRexProgram1(): RexProgram = {
+val inputRowType = typeFactory.createStructType(allFieldTypes, 
allFieldNames)
 val builder = new RexProgramBuilder(inputRowType, rexBuilder)
-val t0 = rexBuilder.makeInputRef(types.get(2), 2)
 

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566599
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
 ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
+import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.junit.Assert.{assertArrayEquals, assertEquals}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class RexProgramExtractorTest extends RexProgramTestBase {
+
+  private val functionCatalog: FunctionCatalog = 
FunctionCatalog.withBuiltIns
+
+  @Test
+  def testExtractRefInputFields(): Unit = {
+val usedFields = 
RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1())
+assertArrayEquals(usedFields, Array(2, 3, 1))
+  }
+
+  @Test
+  def testExtractSimpleCondition(): Unit = {
+val builder: RexBuilder = new RexBuilder(typeFactory)
+val program = buildSimpleRexProgram2()
+
+val firstExp = ExpressionParser.parseExpression("id > 6")
+val secondExp = ExpressionParser.parseExpression("amount * price < 
100")
+val expected: Array[Expression] = Array(firstExp, secondExp)
+
+val (convertedExpressions, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+builder,
+functionCatalog)
+
+assertExpressionArrayEquals(expected, convertedExpressions)
+assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractSingleCondition(): Unit = {
+val inputRowType = typeFactory.createStructType(allFieldTypes, 
allFieldNames)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+// amount
+val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+// id
+val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+
+// a = amount >= id
+val a = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
t0, t1))
+builder.addCondition(a)
+
+val program = builder.getProgram
+val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+val (convertedExpressions, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+relBuilder,
+functionCatalog)
+
+val expected: Array[Expression] = 
Array(ExpressionParser.parseExpression("amount >= id"))
+assertExpressionArrayEquals(expected, convertedExpressions)
+assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
+  @Test
+  def testExtractCnfCondition(): Unit = {
+val inputRowType = typeFactory.createStructType(allFieldTypes, 
allFieldNames)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+// amount
+val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+// id
+val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+// price
+val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+// 100
+val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+// a = amount < 100
+val a = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
+// b = id > 100
+val b = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3))
+// c = price == 100
+val 

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566606
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
 ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
+import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.junit.Assert.{assertArrayEquals, assertEquals}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class RexProgramExtractorTest extends RexProgramTestBase {
+
+  private val functionCatalog: FunctionCatalog = 
FunctionCatalog.withBuiltIns
+
+  @Test
+  def testExtractRefInputFields(): Unit = {
+val usedFields = 
RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1())
+assertArrayEquals(usedFields, Array(2, 3, 1))
+  }
+
+  @Test
+  def testExtractSimpleCondition(): Unit = {
+val builder: RexBuilder = new RexBuilder(typeFactory)
+val program = buildSimpleRexProgram2()
+
+val firstExp = ExpressionParser.parseExpression("id > 6")
+val secondExp = ExpressionParser.parseExpression("amount * price < 
100")
+val expected: Array[Expression] = Array(firstExp, secondExp)
+
+val (convertedExpressions, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+builder,
+functionCatalog)
+
+assertExpressionArrayEquals(expected, convertedExpressions)
+assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractSingleCondition(): Unit = {
+val inputRowType = typeFactory.createStructType(allFieldTypes, 
allFieldNames)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+// amount
+val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+// id
+val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+
+// a = amount >= id
+val a = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
t0, t1))
+builder.addCondition(a)
+
+val program = builder.getProgram
+val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+val (convertedExpressions, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+relBuilder,
+functionCatalog)
+
+val expected: Array[Expression] = 
Array(ExpressionParser.parseExpression("amount >= id"))
+assertExpressionArrayEquals(expected, convertedExpressions)
+assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
+  @Test
+  def testExtractCnfCondition(): Unit = {
+val inputRowType = typeFactory.createStructType(allFieldTypes, 
allFieldNames)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+// amount
+val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+// id
+val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+// price
+val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+// 100
+val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+// a = amount < 100
+val a = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
+// b = id > 100
+val b = 
builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3))
+// c = price == 100
+val 

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566042
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.sources.{BatchTableSource, 
FilterableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.tools.nsc.interpreter.JList
+
+/**
+  * This source can only handle simple comparision with field "amount".
+  * Supports ">, <, >=, <=, =, <>" with an integer.
+  */
+class TestFilterableTableSource(
+val recordNum: Int = 33)
+extends BatchTableSource[Row]
+with StreamTableSource[Row]
+with FilterableTableSource[Row] {
+
+  var filterPushedDown: Boolean = false
+
+  val fieldNames: Array[String] = Array("name", "id", "amount", "price")
+
+  val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, 
DOUBLE)
+
+  // all predicates with filed "amount"
+  private var filterPredicates = new mutable.ArrayBuffer[Expression]
+
+  // all comparing values for field "amount"
+  private val filterValues = new mutable.ArrayBuffer[Int]
+
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+execEnv.fromCollection[Row](generateDynamicCollection().asJava, 
getReturnType)
+  }
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
+execEnv.fromCollection[Row](generateDynamicCollection().asJava, 
getReturnType)
+  }
+
+  override def explainSource(): String = {
+if (filterPredicates.nonEmpty) {
+  s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]"
+} else {
+  ""
+}
+  }
+
+  override def getReturnType: TypeInformation[Row] = new 
RowTypeInfo(fieldTypes, fieldNames)
+
+  override def applyPredicate(predicates: JList[Expression]): 
TableSource[Row] = {
+val newSource = new TestFilterableTableSource(recordNum)
+newSource.filterPushedDown = true
+
+val iterator = predicates.iterator()
+while (iterator.hasNext) {
+  iterator.next() match {
+case expr: BinaryComparison =>
+  (expr.left, expr.right) match {
+case (f: ResolvedFieldReference, v: Literal) if 
f.name.equals("amount") =>
+  newSource.filterPredicates += expr
+  newSource.filterValues += 
v.value.asInstanceOf[Number].intValue()
+  iterator.remove()
+case (_, _) =>
+  }
+  }
+}
+
+newSource
+  }
+
+  override def isFilterPushedDown: Boolean = filterPushedDown
+
+  private def generateDynamicCollection(): Seq[Row] = {
+Preconditions.checkArgument(filterPredicates.length == 
filterValues.length)
+
+for {
+  cnt <- 0 until recordNum
+  if shouldCreateRow(cnt)
+} yield {
+  val row = new Row(fieldNames.length)
--- End diff --

Thanks for the tips!


---

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106565668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
   filterableSource: FilterableTableSource[_],
   description: String): Unit = {
 
-if (filterableSource.isFilterPushedDown) {
-  // The rule can get triggered again due to the transformed "scan => 
filter"
-  // sequence created by the earlier execution of this rule when we 
could not
-  // push all the conditions into the scan
-  return
-}
+Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
 
 val program = calc.getProgram
+val functionCatalog = FunctionCatalog.withBuiltIns
 val (predicates, unconvertedRexNodes) =
   RexProgramExtractor.extractConjunctiveConditions(
 program,
 call.builder().getRexBuilder,
-tableSourceTable.tableEnv.getFunctionCatalog)
+functionCatalog)
 if (predicates.isEmpty) {
   // no condition can be translated to expression
   return
 }
 
-val (newTableSource, remainingPredicates) = 
filterableSource.applyPredicate(predicates)
-// trying to apply filter push down, set the flag to true no matter 
whether
-// we actually push any filters down.
-newTableSource.setFilterPushedDown(true)
+val remainingPredicates = new util.LinkedList[Expression]()
+predicates.foreach(e => remainingPredicates.add(e))
+
+val newTableSource = 
filterableSource.applyPredicate(remainingPredicates)
--- End diff --

I don't think we have to restrict this. If the user for some reason wants to change the predicates that are handed back and executed by the framework, we should allow them to do so.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106565688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -20,47 +20,40 @@ package org.apache.flink.table.sources
 
 import org.apache.flink.table.expressions.Expression
 
+import scala.tools.nsc.interpreter.JList
--- End diff --

sure


---


[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...

2017-03-16 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
Hi @fhueske, you just pointed out a question I had when tonycox implemented the first version: why are we preventing `FilterableTableSource` from modifying the expressions? I think it's totally up to the implementation whether it changes the expressions it receives or keeps them as they were. We have no reason to restrict the `FilterableTableSource`'s behavior as long as it does the right thing.

Passing in a Java list and telling users who extend this interface to pick and remove the expressions they support is not super nice. But even if a user only picks expressions and does not remove them, we still get a correct answer.

What do you think?


---


[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
Hi @fhueske @godfreyhe, thanks for the review, I addressed almost all of your comments.

@fhueske The exception is letting `TableSourceScan` be aware of whether a filter has been pushed down. I'm not sure we should give `TableSourceScan` this kind of information; I'd prefer to keep it inside the various kinds of `TableSource`. One drawback of keeping such information in `TableSourceScan` is that whenever we copy a `TableSourceScan` we need to take care of all of it and make sure it is copied correctly. In the future, if we add more extensions to `TableSource`, e.g. pushing part of a query down, we will face the same problem.

Regarding the interface of `FilterableTableSource`, I agree with you that a trait containing logic is not friendly to Java extensions, so I removed the default implementation of `isFilterPushedDown`; the inheriting class should take care of this method. And regarding the `Tuple2` issue, how about we pass in a mutable Java list and let the table source *pick out* expressions from it and return a copy of the table source that contains the pushed-down predicates? A rough sketch of that contract follows.
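To make the proposal concrete, here is an illustrative sketch only: `MyFilterableSource` is a hypothetical class, and a real implementation would also have to provide the usual `TableSource` methods (`getReturnType`, `getDataSet`/`getDataStream`, ...), which are omitted here.

```
import java.util.{List => JList}

import org.apache.flink.table.expressions.{BinaryComparison, Expression}

import scala.collection.mutable

// Sketch: the source picks the predicates it can evaluate out of the mutable list
// and returns a new instance carrying them; whatever is left in the list is applied
// by the framework afterwards.
class MyFilterableSource(val pushedDown: Seq[Expression] = Seq()) {

  def applyPredicate(predicates: JList[Expression]): MyFilterableSource = {
    val accepted = mutable.ArrayBuffer[Expression]()
    val it = predicates.iterator()
    while (it.hasNext) {
      it.next() match {
        case cmp: BinaryComparison =>   // the only shape this sketch handles
          accepted += cmp
          it.remove()                   // removed, so the framework won't re-apply it
        case _ =>                       // leave everything else for the framework
      }
    }
    new MyFilterableSource(accepted.toSeq)
  }
}
```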


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Adds support for filtering push-down to a [[TableSource]].
+  * A [[TableSource]] extending this interface is able to filter records 
before returning.
+  */
+trait FilterableTableSource[T] extends TableSource[T] {
--- End diff --

Changed.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
  

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
  

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, 
DataSetCalc}
+import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+operand(classOf[BatchTableSourceScan], none)),
+  "PushFilterIntoBatchTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
+scan.tableSource match {
+  case _: FilterableTableSource[_] =>
+calc.getProgram.getCondition != null
--- End diff --

Will add


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329225
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, 
StreamTableSourceScan}
+import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+val scan: StreamTableSourceScan = 
call.rel(1).asInstanceOf[StreamTableSourceScan]
+scan.tableSource match {
+  case _: FilterableTableSource[_] =>
+calc.getProgram.getCondition != null
--- End diff --

Will add


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106328751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
+val tableEnv: TableEnvironment,
--- End diff --

Yes, you are right. Especially since UDFs are currently registered as objects rather than classes, it's really impossible to let a TableSource support this. I will remove this field and only use built-in functions when extracting expressions from the RexProgram.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
  

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329079
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableSourceScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource[_],
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
--- End diff --

Will change this.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106327707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableSourceScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource[_],
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
+  // The rule can get triggered again due to the transformed "scan => 
filter"
+  // sequence created by the earlier execution of this rule when we 
could not
+  // push all the conditions into the scan
+  return
+}
+
+val program = calc.getProgram
+val (predicates, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+call.builder().getRexBuilder,
+tableSourceTable.tableEnv.getFunctionCatalog)
+if (predicates.isEmpty) {
+  // no condition can be translated to expression
+  return
+}
+
+val (newTableSource, remainingPredicates) = 
filterableSource.applyPredicate(predicates)
+// trying to apply filter push down, set the flag to true no matter 
whether
+// we actually push any filters down.
+newTableSource.setFilterPushedDown(true)
+
+// check whether framework still need to do a filter
+val relBuilder = call.builder()
+val remainingCondition = {
+  if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) {
+relBuilder.push(scan)
+(remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ 
unconvertedRexNodes)
+.reduce((l, r) => relBuilder.and(l, r))
+  } else {
+null
+  }
+}
+
+// check whether we still need a RexProgram. An RexProgram is needed 
when either
+// projection or filter exists.
+val newScan = scan.copy(scan.getTraitSet, newTableSource)
+val newRexProgram = {
+  if (remainingCondition != null || program.getProjectList.size() > 0) 
{
--- End diff --

Thanks for the tips, will change this.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106324883
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -58,16 +60,24 @@ class BatchTableSourceScan(
 )
   }
 
+  override def copy(traitSet: RelTraitSet, newTableSource: 
TableSource[_]): TableSourceScan = {
+new BatchTableSourceScan(
+  cluster,
+  traitSet,
+  getTable,
+  newTableSource.asInstanceOf[BatchTableSource[_]]
+)
+  }
+
   override def explainTerms(pw: RelWriter): RelWriter = {
-super.explainTerms(pw)
+val terms = super.explainTerms(pw)
   .item("fields", 
TableEnvironment.getFieldNames(tableSource).mkString(", "))
+tableSource.explainTerms(terms)
--- End diff --

will change this.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106324868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -39,4 +39,6 @@ trait TableSource[T] {
   /** Returns the [[TypeInformation]] for the return type of the 
[[TableSource]]. */
   def getReturnType: TypeInformation[T]
 
+  /** Describes the table source */
+  def explainTerms(pw: RelWriter): RelWriter = pw
--- End diff --

Makes sense to me, will change this.


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106322568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -42,63 +41,40 @@ class DataSetCalc(
 traitSet: RelTraitSet,
 input: RelNode,
 rowRelDataType: RelDataType,
-private[flink] val calcProgram: RexProgram, // for tests
+calcProgram: RexProgram,
 ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
--- End diff --

This is because I want to unify the PushFilterIntoScan rule's code for both batch and stream mode. While executing the rule, we may need to create a new copy of the DataSetCalc or DataStreamCalc. It makes things easier to let these two classes inherit from `Calc` and use `Calc.copy` to create a new copied instance.

I did encounter some problems after I changed the hierarchy: some unit tests failed because the plan changed. But that was because we didn't calculate the cost for Calc correctly. I added some logic to `CommonCalc.computeSelfCost`, and everything works fine.
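For context, a minimal sketch of a cost function along these lines. It is purely illustrative and not the actual code that was added; it only assumes the standard Calcite `RelOptPlanner`/`RelMetadataQuery` APIs and `RexProgram.getExprCount`.

```
import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexProgram

object CalcCostSketch {
  // Sketch: make the cost of a Calc depend on how many expressions it still evaluates,
  // so a plan where predicates were pushed into the scan (fewer remaining expressions)
  // ends up cheaper than the original "scan -> calc" plan.
  def computeCalcCost(
      planner: RelOptPlanner,
      mq: RelMetadataQuery,
      input: RelNode,
      program: RexProgram): RelOptCost = {
    val rowCount: Double = mq.getRowCount(input)
    val exprCount = program.getExprCount
    planner.getCostFactory.makeCost(rowCount, rowCount * exprCount, 0)
  }
}
```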


---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106321885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
 ---
@@ -24,7 +24,7 @@ package org.apache.flink.table.sources
   *
   * @tparam T The return type of the [[ProjectableTableSource]].
   */
-trait ProjectableTableSource[T] {
+trait ProjectableTableSource[T] extends TableSource[T] {
--- End diff --

It's because I want to unify the PushProjectIntoScan rule code for both batch and stream mode. Once we push a projection down into the table source, we should create not only a new TableScan instance but also a new TableSource instance. The code looks like:
```
val newTableSource = originTableSource.projectFields(usedFields)
// create a new scan with the new TableSource instance
val newScan = scan.copy(scan.getTraitSet, newTableSource)
```
At first the `projectFields` method returned a `ProjectableTableSource`, which is not a `TableSource`, so I let `ProjectableTableSource` inherit from `TableSource`. But I just noticed we can simply let `projectFields` return a `TableSource`, and the problem is resolved.

Will change this.
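A sketch of what the adjusted signature could then look like (the scaladoc wording is mine, not the final API):

```
import org.apache.flink.table.sources.TableSource

// Sketch: projectFields returns a plain TableSource, so the rule can hand the result
// straight to scan.copy(...) without extra casts or inheritance tricks.
trait ProjectableTableSource[T] {

  /** Returns a copy of this TableSource that only produces the fields at the
    * given indices of the original schema. */
  def projectFields(fields: Array[Int]): TableSource[T]
}
```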


---


[GitHub] flink issue #3530: [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase ...

2017-03-14 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3530
  
looks good, merging


---


[GitHub] flink pull request #3530: [FLINK-6040] [table] DataStreamUserDefinedFunction...

2017-03-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3530#discussion_r105869656
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 ---
@@ -85,6 +85,8 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
   def testUserDefinedTableFunctionWithParameter(): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
--- End diff --

Move these to the test case setup?
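For example, something like this sketch (assuming `StreamITCase` is already imported; this fragment would live inside the test class, not stand alone):

```
import org.junit.Before

// Sketch: clear the shared StreamITCase sink state once before each test,
// instead of repeating the call at the top of every test method.
@Before
def clearSink(): Unit = {
  StreamITCase.clear
}
```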


---


[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...

2017-03-14 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
To address the problem of not reusing the `TableSource` when we create a new scan for a table source, I changed the inheritance of the current `BatchScan`, `StreamScan`, `BatchTableSourceScan`, `StreamTableSourceScan` and so on. The new structure is more like the relationship between `FlinkTable` and `TableSourceTable`, `DataSetTable`, `DataStreamTable`.

After changing the structure, it also becomes possible to unify the push-project-into-scan rule for both batch and stream mode.

---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-13 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3520

[FLINK-3849] [table] Add FilterableTableSource interface and rules for 
pushing it

This PR is based on #3166 and adds the following changes:

1. Refactor the existing `RexProgramExpressionExtractor` and rewriter utilities into `RexProgramExtractor` and `RexProgramRewriter`. `RexProgramExtractor` is responsible for extracting either projection expressions or filter expressions.
2. Make sure we don't fail while extracting and converting filter RexNodes to expressions. Both the successfully translated expressions and the unconverted RexNodes are returned.
3. Add some tests for `RexProgramExtractor`.
4. Provide a unified `PushFilterIntoTableSourceScanRuleBase` to support filter push-down in both batch and stream mode.
5. Add some logical tests for filter push-down in different situations, such as full push-down and partial push-down.
6. Slightly change the testing class `TestFilterableTableSource` to make it less specialized.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-3849

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3520.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3520


commit 0a7af41509d9a0db3e791cb9f4dc5a1a8086f0b2
Author: tonycox <anton_solo...@epam.com>
Date:   2017-01-11T09:15:49Z

[FLINK-3849] Add FilterableTableSource interface and Rules for pushing it

commit 549b4e00e68d32f070e196fc6eb9a7f5f9e937c3
Author: tonycox <anton_solo...@epam.com>
Date:   2017-01-31T12:41:52Z

fix filterable test

commit 9aa82062832e0aabcb003e582c8130aeecc91a73
Author: tonycox <anton_solo...@epam.com>
Date:   2017-02-16T13:32:33Z

rebase and trying fix rexnode parsing

commit 646a6931224c7dcc58501ec014ab675925bb105d
Author: tonycox <anton_solo...@epam.com>
Date:   2017-02-17T16:48:40Z

create wrapper and update rules

commit abfa38d894aa86b7a5c91dd29bf398b880c8bfe7
Author: Kurt Young <ykt...@gmail.com>
Date:   2017-03-13T07:30:13Z

[FLINK-3849] [table] Add FilterableTableSource interface and rules for 
pushing it




---


[GitHub] flink issue #3510: [FLINK-6023] Fix Scala snippet into Process Function Doc

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3510
  
Thanks @maocorte for the quick fix, looks good to merge now.


---


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105422108
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
 
 
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
 
 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Sorry, I didn't make myself clear. What the IDE complains about is that the
variable name `key` conflicts with the following lines:
```case CountWithTimestamp(key, count, _) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
```
It's not clear whether you want to use the `key` you just defined or the
`key` bound in the match pattern.
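
To make the shadowing concrete, here is a small self-contained Scala sketch
(illustrative only, not the docs snippet itself) that triggers the same
inspection and shows the two fixes suggested in this thread:

```scala
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

def update(value: (String, Long), previous: CountWithTimestamp): CountWithTimestamp = {
  // Destructuring binds a local `key` ...
  val (key, _) = value

  previous match {
    // ... and this pattern variable `key` shadows it, which is what the
    // "suspicious shadowing by a variable pattern" inspection warns about:
    // it is ambiguous whether the outer `key` was meant to be matched against.
    case CountWithTimestamp(key, count, _) =>
      CountWithTimestamp(key, count + 1, value._2)
  }
}

// Possible fixes: rename the destructured variable, e.g.
//   val (currentKey, _) = value
// or drop the destructuring and use value._1 directly.
```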


---


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105419777
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
 
 
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
 
 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
--- End diff --

The class name should be changed to `CountWithTimeoutFunction` to be consistent
with the Java version.


---


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420103
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
 
 
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
 
 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
--- End diff --

I think it should be `value: (String, String)`, since the input stream's
element type is `(String, String)`.


---


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105420011
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
 
 
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
 
 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
--- End diff --

And the first type parameter of `ProcessFunction` should be `(String, String)`,
matching the element type of the input stream.


---


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105418830
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
 
 
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
 
 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-.keyBy(0)
-.process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and 
timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), 
(String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and 
timeouts
+  */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), 
(String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-  .getState(new ValueStateDescriptor<>("myState", 
clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", 
classOf[CountWithTimestamp]))
 
 
   override def processElement(value: (String, Long), ctx: Context, out: 
Collector[(String, Long)]): Unit = {
 // initialize or retrieve/update the state
+val (key, _) = value
--- End diff --

Can we change the name here? The IDE reports `suspicious shadowing by a
variable pattern`. Alternatively, you could just use `value._1` instead.


---


[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3510#discussion_r105417009
  
--- Diff: docs/dev/stream/process_function.md ---
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends 
RichProcessFunction<Tuple2<String,
 
 
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import 
org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
--- End diff --

`RichProcessFunction` no longer exists, can you change it back to
`ProcessFunction`? There are also some `RichProcessFunction`s in the Java
snippet, can you try to fix those too?
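
Putting the comments on this snippet together, the corrected Scala skeleton
would look roughly like the following (a sketch of the suggested shape,
assuming the imports already shown in the diff; method bodies elided):

```scala
// Renamed to match the Java example; extends ProcessFunction (RichProcessFunction
// is gone), and the input element type is (String, String).
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function. */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(
      value: (String, String),
      ctx: Context,
      out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state, register a timer, etc.
  }

  override def onTimer(
      timestamp: Long,
      ctx: OnTimerContext,
      out: Collector[(String, Long)]): Unit = {
    // check the stored timestamp and emit a result if the timer is still valid
  }
}
```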


---


[GitHub] flink issue #3488: [FLINK-5971] [flip-6] Add timeout for registered jobs on ...

2017-03-10 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3488
  
+1 to merge.


---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-09 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3406
  
Hi @fhueske, I like your proposal about moving the annotation from
`TableSource` to `TableSourceConverter`. Let's do it this way.
BTW, I noticed that you offered three possible methods for the
`TableSourceConverter`; I think only `def requiredProperties: Array[String]`
is necessary for now. It helps with validating the converter and with deciding
which converter we should use when multiple converters share the same
`TableType`.
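
For reference, a rough sketch of how I read the converter interface; the names
and signatures here are only illustrative, not meant as the final API:

```scala
// Illustrative sketch only -- the actual interface may differ.
trait TableSourceConverter[T <: TableSource[_]] {

  /**
    * Properties that must be present in the external catalog table before this
    * converter applies. Used to validate the converter and to pick one when
    * several converters declare the same table type.
    */
  def requiredProperties: Array[String]

  /** Builds the table source from the properties of the external catalog table. */
  def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): T
}
```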



---

