[GitHub] flink pull request #4940: [FLINK-7959] [table] Split CodeGenerator into Code...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/4940 [FLINK-7959] [table] Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator. ## What is the purpose of the change Split the current `CodeGenerator` into two dedicated classes, `CodeGeneratorContext` and `ExprCodeGenerator`. The `CodeGeneratorContext` is responsible for maintaining the various reusable statements that can be inserted into the final generated class, while the `ExprCodeGenerator` is responsible for generating code for `RexNode`s and for generating result conversion code. ## Brief change log - *Remove `CodeGenerator` and introduce `ExprCodeGenerator` and `CodeGeneratorContext`* - *Make some code generators static* ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-7959 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4940.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4940 commit 77b2d0ac00671347b0952719d70e2eb8230cd8d4 Author: Kurt Young Date: 2017-11-03T07:37:22Z [FLINK-7959] [table] Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator. ---
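The split described above can be sketched roughly as follows. This is an illustrative Java sketch with hypothetical names, not Flink's actual API: the context only collects and deduplicates the reusable statements that expression code generation registers, and emits each one exactly once into the generated class.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the CodeGeneratorContext role (names are illustrative):
// it maintains reusable statements; the expression generator (not shown) asks
// the context to register whatever member declarations it needs.
class CodeGeneratorContext {
    // LinkedHashSet deduplicates while preserving insertion order, so the
    // same reusable member requested by two expressions is emitted only once.
    private final Set<String> reusableMemberStatements = new LinkedHashSet<>();

    void addReusableMember(String declaration) {
        reusableMemberStatements.add(declaration);
    }

    // Code that the class generator splices into the final generated class.
    String reuseMemberCode() {
        return String.join("\n", reusableMemberStatements);
    }
}

public class CodeGenSplitDemo {
    public static void main(String[] args) {
        CodeGeneratorContext ctx = new CodeGeneratorContext();
        // Two expressions request the same reusable field; it appears once.
        ctx.addReusableMember("private final java.util.Date date = new java.util.Date();");
        ctx.addReusableMember("private final java.util.Date date = new java.util.Date();");
        ctx.addReusableMember("private int counter;");
        System.out.println(ctx.reuseMemberCode());
    }
}
```

The point of the separation is that expression generation stays stateless per `RexNode`, while all cross-expression state (reusable members, initializers, etc.) lives in one context object.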
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3511 > So I think a simpler and better approach is to just make sure that most types have a good implementation of putNormalizedKey, and then NormalizedKeySorter.compareRecords would be called only rarely, so its performance wouldn't really matter. You are right when the sort keys are simple numeric types, but not with strings, which may be the most popular choice in some ETL and data warehouse pipelines. But I agree that code generation can't help with this situation, so we investigated some binary data formats to represent our records and modified the interfaces of TypeSerializer & TypeComparator for ser/de. We don't have to consume the input/output view byte by byte, but have the ability to randomly access the underlying data, i.e. the MemorySegment. It acts like Spark's UnsafeRow: https://reviewable.io/reviews/apache/spark/5725, so we can eliminate most of the deserialization cost, such as reading a `byte[]` and then calling `new String(byte[])`. We combined this approach with some code generation to eliminate the virtual function calls of the TypeComparator, and saw a 10x performance improvement when sorting on strings. > I think a large potential in code-generation is to eliminate the overheads of the very many virtual function calls throughout the runtime Totally agreed. After we finish the code generation work and the ser/de improvements, we will investigate this further. Good to see that you have a list of all the megamorphic calls. BTW, we are actually translating the batch jobs onto the streaming runtime; I think there will be lots in common. 
Having control over more type information and code-generating the whole operator has lots of benefits; it can also help make most of the calls monomorphic, such as: - full control over object reuse, yes - comparators - generating hash codes - potential improvements for algorithms that find they only need to deal with fixed-length data - directly using primitive variables when dealing with simple types And you are right that this is orthogonal to the runtime improvements; we see the boundary as the Operator. The framework should provide the most efficient environment for operators to run in, and we will code-generate the most efficient operators to live in it. > Btw. have you seen this PR for code generation for POJO serializers and comparators? #2211 I haven't seen it yet, will find some time to check it out. ---
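The binary-format idea discussed above can be sketched as follows. This is an illustrative Java sketch under my own assumptions, not Flink's or Spark's actual format: string fields stay as raw UTF-8 bytes inside the record's backing array and are compared as unsigned bytes, so no `new String(byte[])` allocation happens on the comparison hot path.

```java
import java.nio.charset.StandardCharsets;

public final class BinaryStringCompare {

    // Compare two byte ranges as unsigned bytes. For UTF-8 encoded strings
    // this yields Unicode code-point order without deserializing the bytes
    // into String objects first.
    static int compareBytes(byte[] a, int aOff, int aLen,
                            byte[] b, int bOff, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
            int cmp = (a[aOff + i] & 0xFF) - (b[bOff + i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // Equal prefix: the shorter string sorts first.
        return aLen - bLen;
    }

    public static void main(String[] args) {
        byte[] x = "apple".getBytes(StandardCharsets.UTF_8);
        byte[] y = "banana".getBytes(StandardCharsets.UTF_8);
        // Negative result: "apple" < "banana", with no String allocation.
        System.out.println(compareBytes(x, 0, x.length, y, 0, y.length) < 0);
    }
}
```

In a real binary row format the offsets and lengths would come from the record's fixed-length header rather than being passed in by hand; the comparison itself is the part that avoids the `new String(byte[])` cost mentioned above.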
[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3511 @heytitle @ggevay Great work! We are working on a project to fully code-generate the algorithms the Flink runtime uses, and have borrowed lots of ideas from this PR, thanks! IMHO, in addition to these changes, there are still some potential improvements we can make to the sorter, like avoiding deserialization when comparing the real records. To achieve this, we need more control over type information and flexible code generation support, so we chose to do it through the Table API & SQL. What do you think of this approach? ---
[GitHub] flink issue #4667: [FLINK-5859] [table] Add PartitionableTableSource for par...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4667 LGTM, @fhueske @twalthr can you also take a look? ---
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139049572 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner} + +import scala.collection.JavaConverters._ + +/** + * A [[TableSource]] extending this class is a partition table, + * and will get the relevant partitions about the query. + * + * @tparam T The return type of the [[TableSource]]. 
+ */ +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] { + + private var relBuilder: Option[RelBuilder] = None + + /** +* Get all partitions belong to this table +* +* @return All partitions belong to this table +*/ + def getAllPartitions: JList[Partition] + + /** +* Get partition field names. +* +* @return Partition field names. +*/ + def getPartitionFieldNames: Array[String] + + /** +* Get partition field types. +* +* @return Partition field types. +*/ + def getPartitionFieldTypes: Array[TypeInformation[_]] + + /** +* Whether drop partition predicates after apply partition pruning. +* +* @return true only if the result is correct without partition predicate +*/ + def supportDropPartitionPredicate: Boolean = false + + /** +* @return Pruned partitions +*/ + def getPrunedPartitions: JList[Partition] + + /** +* @return True if apply partition pruning +*/ + def isPartitionPruned: Boolean + + /** +* If a partitionable table source which can't apply non-partition filters should not pick any +* predicates. +* If a partitionable table source which can apply non-partition filters should check and pick +* only predicates this table source can support. +* +* After trying to push pruned-partitions and predicates down, we should return a new +* [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates. +* Even if we actually pushed nothing down, it is recommended that we still return a new +* [[TableSource]] instance since we will mark the returned instance as filter push down has +* been tried. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element from the +* list. Don't try to reorganize the predicates if you are absolutely confident with that. +* +* @param partitionPruned Whether partition pruning is applied. --- End diff -- We should make this flag more clear. 
If you mean this flag represents whether partition pruning was applied, I would say it should always be true, because by the time this method is called, the framework has at least tried to apply partition pruning. ---
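For illustration, the pruning step this API contract revolves around might look roughly like this (hypothetical names and shapes, not the PR's actual `PartitionPruner`): the framework evaluates the extracted partition predicates against each partition's field values and hands the surviving partitions to the table source.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch only: a "partition" is modeled as a map from partition
// field name to value, and the partition predicate as a plain Predicate.
public class PartitionPruningDemo {

    static List<Map<String, String>> prune(
            List<Map<String, String>> allPartitions,
            Predicate<Map<String, String>> partitionPredicate) {
        // Keep only the partitions whose field values satisfy the predicate.
        return allPartitions.stream()
                .filter(partitionPredicate)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, String>> partitions = List.of(
                Map.of("dt", "2017-09-01"),
                Map.of("dt", "2017-09-02"));
        // A predicate like dt = '2017-09-02' keeps a single partition.
        System.out.println(
                prune(partitions, p -> p.get("dt").equals("2017-09-02")).size());
    }
}
```

Under this reading, what the comment above argues is that by the time the pruned list reaches `applyPrunedPartitionsAndPredicate`, this step has always already run, which is why a "was pruning applied" flag would always be true.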
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139049316 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner} + +import scala.collection.JavaConverters._ + +/** + * A [[TableSource]] extending this class is a partition table, + * and will get the relevant partitions about the query. + * + * @tparam T The return type of the [[TableSource]]. 
+ */ +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] { + + private var relBuilder: Option[RelBuilder] = None + + /** +* Get all partitions belong to this table +* +* @return All partitions belong to this table +*/ + def getAllPartitions: JList[Partition] + + /** +* Get partition field names. +* +* @return Partition field names. +*/ + def getPartitionFieldNames: Array[String] + + /** +* Get partition field types. +* +* @return Partition field types. +*/ + def getPartitionFieldTypes: Array[TypeInformation[_]] + + /** +* Whether drop partition predicates after apply partition pruning. +* +* @return true only if the result is correct without partition predicate +*/ + def supportDropPartitionPredicate: Boolean = false + + /** +* @return Pruned partitions +*/ + def getPrunedPartitions: JList[Partition] + + /** +* @return True if apply partition pruning +*/ + def isPartitionPruned: Boolean + + /** +* If a partitionable table source which can't apply non-partition filters should not pick any +* predicates. +* If a partitionable table source which can apply non-partition filters should check and pick +* only predicates this table source can support. +* +* After trying to push pruned-partitions and predicates down, we should return a new +* [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates. +* Even if we actually pushed nothing down, it is recommended that we still return a new +* [[TableSource]] instance since we will mark the returned instance as filter push down has +* been tried. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element from the +* list. Don't try to reorganize the predicates if you are absolutely confident with that. +* +* @param partitionPruned Whether partition pruning is applied. +* @param prunedPartitions Remaining partitions after partition pruning applied. 
--- End diff -- Looks like the definition of "prunedPartitions" is contradictory here. I think we should stick to a single definition: either "prunedPartitions" means all partitions that have been pruned away, or all remaining partitions that survive the pruning. ---
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139048604 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner} + +import scala.collection.JavaConverters._ + +/** + * A [[TableSource]] extending this class is a partition table, + * and will get the relevant partitions about the query. + * + * @tparam T The return type of the [[TableSource]]. 
+ */ +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] { + + private var relBuilder: Option[RelBuilder] = None + + /** +* Get all partitions belong to this table +* +* @return All partitions belong to this table +*/ + def getAllPartitions: JList[Partition] + + /** +* Get partition field names. +* +* @return Partition field names. +*/ + def getPartitionFieldNames: Array[String] + + /** +* Get partition field types. +* +* @return Partition field types. +*/ + def getPartitionFieldTypes: Array[TypeInformation[_]] + + /** +* Whether drop partition predicates after apply partition pruning. +* +* @return true only if the result is correct without partition predicate +*/ + def supportDropPartitionPredicate: Boolean = false + + /** +* @return Pruned partitions +*/ + def getPrunedPartitions: JList[Partition] + + /** +* @return True if apply partition pruning +*/ + def isPartitionPruned: Boolean + + /** +* If a partitionable table source which can't apply non-partition filters should not pick any +* predicates. +* If a partitionable table source which can apply non-partition filters should check and pick +* only predicates this table source can support. +* +* After trying to push pruned-partitions and predicates down, we should return a new +* [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates. +* Even if we actually pushed nothing down, it is recommended that we still return a new +* [[TableSource]] instance since we will mark the returned instance as filter push down has +* been tried. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element from the +* list. Don't try to reorganize the predicates if you are absolutely confident with that. +* +* @param partitionPruned Whether partition pruning is applied. +* @param prunedPartitions Remaining partitions after partition pruning applied. 
+* Notes: If partition pruning is not applied, prunedPartitions is empty. +* @param predicates A list contains conjunctive predicates, you should pick and remove all +* expressions that can be pushed down. The remaining elements of this +* list will further evaluated by framework. +* @return A new cloned instance of [[TableSource]]. +*/ + def applyPrunedPartitionsAndPredicate( +partitionPruned: Boolean, +prunedPartitions: JList[Partition], +predicates: JList[Expression]): TableSource[T] + +
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139048337 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -55,4 +55,9 @@ trait FilterableTableSource[T] { */ def isFilterPushedDown: Boolean + /** +* @param relBuilder Builder for relational expressions. +*/ + def setRelBuilder(relBuilder: RelBuilder): Unit --- End diff -- Can you move this method to PartitionableTableSource? ---
[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4668 @fhueske sure ---
[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4668 Thanks @JingsongLi for your contribution. LGTM, +1 to merge. cc @fhueske @StephanEwen ---
[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4529 @zentol So the rationale behind this is that once all of Netty's memory comes from Flink's managed memory, this is no longer an issue, right? ---
[GitHub] flink issue #4529: [FLINK-7428][network] avoid buffer copies when receiving ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4529 If I remember correctly, the reason we chose to do an extra copy inside `extractFrame` is that we faced some memory problems when using `slice`. Is this no longer an issue, or has it been taken care of by another PR? ---
[GitHub] flink issue #4445: [FLINK-7310][core] always use the HybridMemorySegment
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4445 I would bet on deserialization. The reason the sorter suffers a bigger regression than the hash join is that the sorter triggers more deserialization when comparing records. Despite the regression we will face, I think it's still worthwhile since we can avoid an extra copy from the network to the runtime. It would be better if we could take that extra copy into account in the benchmark, but it's OK if we don't. +1 to merge this. ---
[GitHub] flink pull request #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and D...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4305#discussion_r127113118 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java --- @@ -843,11 +843,14 @@ private float convertToFloat(Object o, float defaultValue) { } else if (o.getClass() == Double.class) { double value = ((Double) o); - if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) { - return (float) value; - } else { + if (value < -Float.MAX_VALUE --- End diff -- I'm ok with both, will write it in your way. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and D...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4305#discussion_r127107209 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java --- @@ -843,11 +843,14 @@ private float convertToFloat(Object o, float defaultValue) { } else if (o.getClass() == Double.class) { double value = ((Double) o); - if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) { - return (float) value; - } else { + if (value < -Float.MAX_VALUE --- End diff -- I'm afraid that in that case 0 cannot be handled correctly, so I added a test case to check. ---
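For context, the pitfall being fixed is that `Float.MIN_VALUE` is the smallest *positive* float (about 1.4e-45), not the most negative one, so a check like `value >= Float.MIN_VALUE` wrongly rejects zero and every negative value. A minimal sketch of the wrong and the corrected range check (illustrative method names, not the exact shape of the patch):

```java
public class FloatRangeCheck {

    // Wrong: Float.MIN_VALUE is the smallest positive float, so this
    // rejects 0.0 and every negative double.
    static boolean wrongFits(double value) {
        return value <= Float.MAX_VALUE && value >= Float.MIN_VALUE;
    }

    // Right: the lower bound of the float range is -Float.MAX_VALUE.
    static boolean fits(double value) {
        return value >= -Float.MAX_VALUE && value <= Float.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(wrongFits(0.0)); // false -- the bug in a nutshell
        System.out.println(fits(0.0));      // true
        System.out.println(fits(-1.5));     // true
    }
}
```

The same asymmetry exists for `Double.MIN_VALUE`, which is why the PR sweeps both constants.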
[GitHub] flink issue #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and Double.M...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4305 Hi @greghogan, thanks for the review; your comments have been addressed. ---
[GitHub] flink pull request #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and D...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/4305 [FLINK-7161] fix misusage of Float.MIN_VALUE and Double.MIN_VALUE I have checked all places using Float.MIN_VALUE and Double.MIN_VALUE and fixed all misuses. You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-7161 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4305.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4305 commit 5c7a2fd4c97eaedba87b0042e337d94f315b4191 Author: Kurt Young Date: 2017-07-12T06:02:58Z [FLINK-7161] fix misusage of Float.MIN_VALUE and Double.MIN_VALUE ---
[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3913#discussion_r117385333 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats * @param lastAccessTime Timestamp of last access of the table */ case class ExternalCatalogTable( -identifier: TableIdentifier, --- End diff -- I agree that not all "tables" have a name; sounds good to me then. ---
[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3913#discussion_r117201965 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats * @param lastAccessTime Timestamp of last access of the table */ case class ExternalCatalogTable( -identifier: TableIdentifier, --- End diff -- We have tested this with our own catalog integration, and keeping the table name should work. The sub-catalog representing the database layer can concatenate the database and table names when creating the Table instance. ---
[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3913#discussion_r116919339 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -128,7 +128,7 @@ case class DatabaseNotExistException( * @param db database name * @param cause the cause */ -case class DatabaseAlreadyExistException( +case class CatalogAlreadyExistException( --- End diff -- Same as above. ---
[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3913#discussion_r116919313 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -114,7 +114,7 @@ case class TableAlreadyExistException( * @param db database name * @param cause the cause */ -case class DatabaseNotExistException( +case class CatalogNotExistException( --- End diff -- Please update the comment and parameter name. ---
[GitHub] flink pull request #3913: [FLINK-6574] Support nested catalogs in ExternalCa...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3913#discussion_r116919169 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -37,25 +36,10 @@ import org.apache.flink.table.plan.stats.TableStats * @param lastAccessTime Timestamp of last access of the table */ case class ExternalCatalogTable( -identifier: TableIdentifier, --- End diff -- If we don't have a table identifier or table name, the `TableSourceConverter` will not work when trying to convert an `ExternalCatalogTable` to a real `TableSource`. Even if we keep the table name, I'm not sure it will work in all situations. For example, suppose we have an outside catalog which has the notion of a database, like MySQL. Tables in that catalog will be named "db1.table1" or "db2.table2". In the new design, we would normally add each database as a sub-catalog of the root catalog, so we can create "table1" from the sub-catalog named "db1". The problem is that the table name is assigned as "table1", so we lose the information that this table actually comes from "db1". This may cause problems when the TableSource tries to establish a connection or to get table information from the outside catalog. ---
[GitHub] flink issue #3803: [FLINK-6394] [runtime] Respect object reuse configuration...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3803 OK, will do it right away. ---
[GitHub] flink issue #3803: [FLINK-6394] [runtime] Respect object reuse configuration...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3803 Sorry, I wasn't available in the last couple of days. It seems the 1.3 branch has been created; should I merge this PR into the 1.3 branch too? ---
[GitHub] flink pull request #3803: [FLINK-6394] [runtime] Respect object reuse config...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3803 [FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-6394 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3803.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3803 commit 3f81f60603940ec2b26e81814224b69ae40afcc0 Author: Kurt Young Date: 2017-04-30T08:56:00Z [FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function ---
[GitHub] flink pull request #3394: [FLINK-5810] [flip6] Introduce a hardened slot man...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3394#discussion_r114070398 --- Diff: flink-runtime/src/test/resources/log4j-test.properties --- @@ -16,7 +16,7 @@ # limitations under the License. -log4j.rootLogger=OFF, console +log4j.rootLogger=DEBUG, console --- End diff -- Is this change intended or accidental? ---
[GitHub] flink pull request #3794: [FLINK-6398] RowSerializer's duplicate should alwa...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3794 [FLINK-6398] RowSerializer's duplicate should always return a new instance You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-6398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3794.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3794 commit 29f328493dce3adef828f3a93be171a57f1316b0 Author: Kurt Young Date: 2017-04-27T15:37:21Z [FLINK-6398] RowSerializer's duplicate should always return a new instance ---
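The invariant behind FLINK-6398 can be sketched outside Flink with a hypothetical stand-in class (not `RowSerializer` itself): a `duplicate()` that returns `this` lets concurrent callers share mutable serializer state, while always returning a fresh instance keeps them isolated.

```scala
// Hypothetical stand-in for a stateful serializer (not Flink's RowSerializer).
// If duplicate() returned `this`, two callers would race on `lastLength`;
// always returning a new instance keeps each caller's state isolated.
class StatefulSerializer {
  var lastLength: Int = 0
  def serialize(s: String): Int = { lastLength = s.length; lastLength }
  def duplicate(): StatefulSerializer = new StatefulSerializer // always a fresh copy
}
```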
[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3594 Rebased to master and will merge after the build check passes. ---
[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3594 @fhueske I have renamed the package and updated the union description. ---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111931445 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala --- @@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase { val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f) -val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table") +val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " + +s"UNION ALL SELECT a, b, c FROM $table") --- End diff -- +1, will change. ---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111930527 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.rel.logical --- End diff -- OK, will rename the packages. ---
[GitHub] flink issue #3594: [FLINK-6149] [table] Add additional flink logical relatio...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3594 Hi @fhueske, sorry for taking so long to update this PR. It now has many conflicts with master; once you are OK with all the changes, I will rebase it onto master. ---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111861591 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala --- @@ -18,58 +18,57 @@ package org.apache.flink.table.plan.rules.dataSet import org.apache.calcite.plan.volcano.RelSubset -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} import org.apache.calcite.rex.RexNode -import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate} +import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate +import org.apache.flink.table.rel.FlinkConventions +import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan} /** * Rule to convert a LogicalCorrelate into a DataSetCorrelate. --- End diff -- I will remove this simple comment to be consistent with the other converter rules. ---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111861037 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala --- @@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort} import java.lang.Double +import org.apache.flink.table.rel.logical.FlinkLogicalCalc + object FlinkRelMdRowCount extends RelMdRowCount { -val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource( - BuiltInMethod.ROW_COUNT.method, - this) + val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource( +BuiltInMethod.ROW_COUNT.method, +this) + + def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq) --- End diff -- I'm OK with both, will change to `Calc`. ---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111860657 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala --- @@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase { val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f) -val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table") +val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " + +s"UNION ALL SELECT a, b, c FROM $table") --- End diff -- Actually, the old test was wrong in using `UNION` instead of `UNION ALL`. If I understand correctly, `UNION` performs a global distinct over all fields, while `UNION ALL` just concatenates the two datasets or datastreams. The behavior of `DataStream.union` matches `UNION ALL` rather than `UNION`. ---
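The `UNION` vs `UNION ALL` distinction discussed above can be illustrated with plain Scala collections (an analogy, not the Flink API): `UNION ALL` simply concatenates, which is what `DataStream.union` does, whereas `UNION` additionally removes duplicates across both inputs.

```scala
// Analogy using plain collections (not the Flink API):
// UNION ALL concatenates; UNION concatenates and then removes duplicates.
def unionAll[A](left: Seq[A], right: Seq[A]): Seq[A] = left ++ right
def union[A](left: Seq[A], right: Seq[A]): Seq[A] = (left ++ right).distinct
```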
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111860136 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.rel.logical + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter} +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.rel.FlinkConventions +import org.apache.flink.table.sources.TableSource + +import scala.collection.JavaConverters._ + +class FlinkLogicalTableSourceScan( +cluster: RelOptCluster, +traitSet: RelTraitSet, +table: RelOptTable, +val tableSource: TableSource[_]) + extends TableScan(cluster, traitSet, table) + with FlinkLogicalRel { + + def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = { +new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource) + } + + override def deriveRowType(): RelDataType = { +val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] +flinkTypeFactory.buildRowDataType( + TableEnvironment.getFieldNames(tableSource), + TableEnvironment.getFieldTypes(tableSource.getReturnType)) + } + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(this) +planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType)) + } + + override def explainTerms(pw: RelWriter): RelWriter = { --- End diff -- We need to rewrite `explainTerms` and `toString` to make the RelNode's digest distinguishable from other `TableSourceScan`s. The default implementation only compares the table name, and that's not enough when we push filters or projections into the `TableSource`.
Actually, there is a bug in `toString` that was fixed in https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a; I will backport it here. ---
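The digest problem described above can be shown with a toy model (hypothetical classes, not Calcite's actual API): when the digest carries only the table name, two scans with different pushed-down projections collapse into one plan node; including the projected fields keeps them distinct, which is what overriding `explainTerms`/`toString` achieves.

```scala
// Toy model of a RelNode digest (not Calcite's API): two scans of the same
// table with different pushed-down projections must not share a digest.
case class ToyScan(table: String, projectedFields: Seq[String]) {
  def nameOnlyDigest: String = s"Scan(table=$table)" // default-style digest
  def fullDigest: String =                           // explainTerms-style digest
    s"Scan(table=$table, fields=${projectedFields.mkString(",")})"
}
```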
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111859756 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.rel.logical + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.Sort +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter} +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.table.rel.FlinkConventions + +import scala.collection.JavaConverters._ + +class FlinkLogicalSort( +cluster: RelOptCluster, +traits: RelTraitSet, +child: RelNode, +collation: RelCollation, +offset: RexNode, +fetch: RexNode) + extends Sort(cluster, traits, child, collation, offset, fetch) + with FlinkLogicalRel { + + private val limitStart: Long = if (offset != null) { +RexLiteral.intValue(offset) + } else { +0L + } + + private val limitEnd: Long = if (fetch != null) { +RexLiteral.intValue(fetch) + limitStart + } else { +Long.MaxValue + } + + val getOffset: RexNode = offset + + val getFetch: RexNode = fetch + + override def copy( + traitSet: RelTraitSet, + newInput: RelNode, + newCollation: RelCollation, + offset: RexNode, + fetch: RexNode): Sort = { + +new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch) + } + + override def estimateRowCount(metadata: RelMetadataQuery): Double = { +val inputRowCnt = metadata.getRowCount(this.getInput) +if (inputRowCnt == null) { + inputRowCnt +} else { + val rowCount = (inputRowCnt - limitStart).max(1.0) + if (fetch != null) { +val limit = RexLiteral.intValue(fetch) +rowCount.min(limit) + } else { +rowCount + } +} + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +// by default, assume cost is proportional to number of rows +val rowCount: Double = mq.getRowCount(this) +planner.getCostFactory.makeCost(rowCount, rowCount, 0) + 
} + + override def explainTerms(pw: RelWriter) : RelWriter = { --- End diff -- Yes, these can be removed. ---
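The estimate in `FlinkLogicalSort.estimateRowCount` from the diff above can be condensed into a small function (a sketch that uses `Option` parameters in place of the nullable `RexNode`s): drop the offset rows but never go below one row, then cap the result at the fetch limit when one is present.

```scala
// Sketch of FlinkLogicalSort.estimateRowCount: subtract `offset` rows
// (keeping at least 1), then cap at `fetch` when a fetch limit exists.
def estimateSortRowCount(inputRows: Double, offset: Option[Long], fetch: Option[Long]): Double = {
  val afterOffset = math.max(inputRows - offset.getOrElse(0L), 1.0)
  fetch.fold(afterOffset)(f => math.min(afterOffset, f.toDouble))
}
```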
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111858923 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.rel.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.{Aggregate, AggregateCall} +import org.apache.calcite.rel.logical.LogicalAggregate +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.rel.FlinkConventions + +class FlinkLogicalAggregate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +child: RelNode, +indicator: Boolean, +groupSet: ImmutableBitSet, +groupSets: JList[ImmutableBitSet], +aggCalls: JList[AggregateCall]) + extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls) + with FlinkLogicalRel { + + override def copy( + traitSet: RelTraitSet, + input: RelNode, + indicator: Boolean, + groupSet: ImmutableBitSet, + groupSets: JList[ImmutableBitSet], + aggCalls: JList[AggregateCall]): Aggregate = { +new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls) + } + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val child = this.getInput +val rowCnt = metadata.getRowCount(child) +val rowSize = this.estimateRowSize(child.getRowType) +val aggCnt = this.aggCalls.size +planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize) + } +} + +private class FlinkLogicalAggregateConverter + extends ConverterRule( +classOf[LogicalAggregate], +Convention.NONE, +FlinkConventions.LOGICAL, +"FlinkLogicalAggregateConverter") { + + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalAggregate] +!agg.containsDistinctCall() --- End diff -- This is similar to `FlinkLogicalJoin`: we need other logical rules to rewrite distinct aggregates first, and then convert the result to a "clean" FlinkLogicalAggregate.
---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111857813 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.rel.logical --- End diff -- How about renaming `org.apache.flink.table.plan.node` to `org.apache.flink.table.plan.rel`, with three sub-packages: `logical`, `dataset`, and `datastream`? ---
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111857715 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.rel.logical + +import org.apache.calcite.plan._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core._ +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.RexNode +import org.apache.flink.table.rel.FlinkConventions + +import scala.collection.JavaConverters._ + +class FlinkLogicalJoin( +cluster: RelOptCluster, +traitSet: RelTraitSet, +left: RelNode, +right: RelNode, +condition: RexNode, +joinType: JoinRelType) + extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType) + with FlinkLogicalRel { + + override def copy( + traitSet: RelTraitSet, + conditionExpr: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): Join = { + +new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val leftRowCnt = metadata.getRowCount(getLeft) +val leftRowSize = estimateRowSize(getLeft.getRowType) + +val rightRowCnt = metadata.getRowCount(getRight) +val rightRowSize = estimateRowSize(getRight.getRowType) + +val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize) +val cpuCost = leftRowCnt + rightRowCnt +val rowCnt = leftRowCnt + rightRowCnt + +planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost) + } +} + +private class FlinkLogicalJoinConverter + extends ConverterRule( +classOf[LogicalJoin], +Convention.NONE, +FlinkConventions.LOGICAL, +"FlinkLogicalJoinConverter") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin] +val joinInfo = join.analyzeCondition + +hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join) --- End diff -- 
Currently the answer is yes, because the physical join only supports equality conditions on simple keys. If the join key is an expression like `a + 1 = b - 2` and we don't have this restriction in the logical layer, the join condition will stay in that form and we can't translate it to a physical join. There are 2 possible solutions: 1. Keep it this way. 2. Re-introduce rules which can extract expressions out of the join condition and add an additional "Calc" node to keep the join keys as simple fields. I prefer 1 for now, and will keep 2 in mind. After all, it's not very nice to let the logical layer know about all these restrictions. What do you think? ---
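Option 2 can be sketched with plain collections (a hypothetical illustration, not the Calcite rule itself): a "Calc"-style pre-projection materializes `a + 1` and `b - 2` as key columns, after which the join condition is an equality on simple fields.

```scala
// Hypothetical sketch of option 2: push the expressions `a + 1` and `b - 2`
// into a "Calc"-style pre-projection so the join compares simple keys only.
def equiJoinOnDerivedKeys(left: Seq[Int], right: Seq[Int]): Seq[(Int, Int)] = {
  val leftKeyed  = left.map(a => (a + 1, a))   // derived key = a + 1
  val rightKeyed = right.map(b => (b - 2, b))  // derived key = b - 2
  for {
    (lk, a) <- leftKeyed
    (rk, b) <- rightKeyed
    if lk == rk                                // simple-key equality
  } yield (a, b)
}
```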
[GitHub] flink pull request #3594: [FLINK-6149] [table] Add additional flink logical ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3594#discussion_r111856357 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -228,15 +229,24 @@ abstract class BatchTableEnvironment( } // 3. optimize the logical Flink plan -val optRuleSet = getOptRuleSet -val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() -val optimizedPlan = if (optRuleSet.iterator().hasNext) { - runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps) +val logicalOptRuleSet = getLogicalOptRuleSet +val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify() +val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) { + runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps) --- End diff -- Yes, I will first test how the HepPlanner works in some test and production environments, and then decide whether or how to change it to a rule-based planner. This will be my follow-up task. ---
[GitHub] flink pull request #3695: [FLINK-5545] [table] Remove FlinkAggregateExpandDi...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3695 [FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12. You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-5545 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3695.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3695 commit 2d8c740852e56cbc54120acea234975771171a66 Author: Kurt Young Date: 2017-04-07T09:46:06Z [FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12. ---
[GitHub] flink pull request #3689: [FLINK-5435] [table] Remove FlinkAggregateJoinTran...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3689 [FLINK-5435] [table] Remove FlinkAggregateJoinTransposeRule and FlinkRelDecorrelator after calcite updated to 1.12 You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-5435 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3689.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3689 commit 6c4fb2f5a655129c178fdf543f64b240b3012e8b Author: Kurt Young Date: 2017-04-06T14:06:51Z [FLINK-5435] [table] Remove FlinkAggregateJoinTransposeRule and FlinkRelDecorrelator after calcite updated to 1.12 ---
[GitHub] flink issue #3623: [FLINK-6196] [table] Support dynamic schema in Table Func...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3623 Looks good to me. @twalthr @fhueske, can you also take a look? ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108628810

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
     InstantiationUtil
       .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+    * Build a TableFunctionCall, a name and a sequence of params will determine a unique
+    * [[TableFunctionCall]]
+    *
+    * @param name function name
+    * @param implicitResultType If no result type returned, it will use this type.
+    * @param params The input expressions
+    * @return A unique [[TableFunctionCall]]
+    */
+  private[table] def buildTableFunctionCall(
+      name: String,
+      tableFunction: TableFunction[_],
+      implicitResultType: TypeInformation[_],
+      params: Expression*): TableFunctionCall = {
+    val arguments = expressionsToArguments(params: _*)
+    val userDefinedResultType = tableFunction.getResultType(arguments)
+    val resultType = {
+      if (userDefinedResultType != null) userDefinedResultType
+      else implicitResultType
+    }
+    TableFunctionCall(name, tableFunction, params, resultType)
+  }
+
+  /**
+    * Transform the expressions or rex nodes to Objects
+    * Only literal expressions will be passed, nulls for non-literal ones
+    *
+    * @param params actual parameters of the function
+    * @return A java List of the Objects
+    */
+  private[table] def expressionsToArguments(params: Expression*): java.util.List[AnyRef] = {
+    params.map {
+      case exp: Literal =>
+        exp.value.asInstanceOf[AnyRef]
+      case _ =>
+        null
+    }
+  }
+
+  /**
+    * Transform the rex nodes to Objects
+    * Only literal expressions will be passed, nulls for non-literal ones
+    *
+    * @param rexNodes actual parameters of the function
+    * @return A java List of the Objects
+    */
+  private[table] def rexNodesToArguments(
+      rexNodes: java.util.List[RexNode]): java.util.List[AnyRef] = {
+    rexNodes.map {
+      case rexNode: RexLiteral =>
--- End diff --

Can we add a unit test to ensure that all supported literals are successfully transformed, and that unsupported types translate to null?

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
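The transformation under review maps literal expressions to their values and everything else to null. A unit test along the lines requested could look like the following sketch. This is plain Java with hypothetical stand-in classes (`Expression`, `Literal`, `FieldReference` are simplified here), since Flink's actual expression hierarchy is larger than what the diff shows:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LiteralArgumentsSketch {
    // Hypothetical stand-ins for Flink's Expression hierarchy.
    interface Expression {}
    static final class Literal implements Expression {
        final Object value;
        Literal(Object value) { this.value = value; }
    }
    static final class FieldReference implements Expression {
        final String name;
        FieldReference(String name) { this.name = name; }
    }

    // Mirrors expressionsToArguments in the diff: literals pass through,
    // non-literal expressions become null.
    static List<Object> toArguments(Expression... params) {
        List<Object> out = new ArrayList<>();
        for (Expression e : params) {
            out.add(e instanceof Literal ? ((Literal) e).value : null);
        }
        return out;
    }

    public static void main(String[] args) {
        // Supported literals are transformed; the non-literal field reference becomes null.
        List<Object> actual =
            toArguments(new Literal(42), new Literal("hello"), new FieldReference("a"));
        if (!actual.equals(Arrays.asList(42, "hello", null))) {
            throw new AssertionError("unexpected arguments: " + actual);
        }
        System.out.println(actual); // prints [42, hello, null]
    }
}
```

The same shape of assertions would apply to `rexNodesToArguments`, with `RexLiteral` in place of `Literal`.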
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108628113

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DeferredTypeFlinkTableFunction.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import java.lang.reflect.Method
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
+
+/**
+ * A Deferred Type is a Table Function which the type hasn't been determined yet.
+ * If will determine the result type after the arguments are passed.
--- End diff --

If -> It

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108625377

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
+  private[table] def rexNodesToArguments(
--- End diff --

transformLiteralRexNodes?

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108625307

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
+  /**
+    * Transform the rex nodes to Objects
+    * Only literal expressions will be passed, nulls for non-literal ones
--- End diff --

Please update the comment

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108624961

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
+  /**
+    * Transform the expressions or rex nodes to Objects
+    * Only literal expressions will be passed, nulls for non-literal ones
--- End diff --

Only literal expressions will be transformed

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108624874

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
+  /**
+    * Transform the expressions or rex nodes to Objects
--- End diff --

rex nodes are not handled in this method

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108624761

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +366,116 @@ object UserDefinedFunctionUtils {
+  private[table] def expressionsToArguments(params: Expression*): java.util.List[AnyRef] = {
--- End diff --

Rename to transformLiteralExpressions to better fit the purpose?

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108320480

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala ---
@@ -86,12 +90,79 @@ abstract class TableFunction[T] extends UserDefinedFunction {
    * @return [[Expression]] in form of a [[TableFunctionCall]]
    */
   final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
-    val resultType = if (getResultType == null) {
-      typeInfo
+    buildTableFunctionCall(getClass.getSimpleName, typeInfo, params: _*)
   }
+
+  /**
+    * Build a TableFunctionCall, a name and a sequence of params will determine a unique
+    * [[TableFunctionCall]]
+    *
+    * @param name function name
+    * @param implicitResultType If no result type returned, it will use this type.
+    * @param params The input expressions
+    * @return A unique [[TableFunctionCall]]
+    */
+  private[table] def buildTableFunctionCall(name: String,
+    implicitResultType: TypeInformation[_],
+    params: Expression*): TableFunctionCall = {
+    val arguments = expressionsToArguments(params: _*)
+    val resultType = getResultType(arguments, implicitResultType)
+    TableFunctionCall(name, this, params, resultType)
+  }
+
+  /**
+    * Internal user of [[getResultType()]]
+    *
+    * @param arguments arguments of a function call (only literal arguments
+    *                  are passed, nulls for non-literal ones)
+    * @param implicitResultType The implicit result type
+    * @return [[TypeInformation]] of result type or null if Flink should determine the type
+    */
+  private[table] def getResultType(arguments: java.util.List[AnyRef],
--- End diff --

The format of the parameters should be consistent with the others.

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108320419

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
@@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
 
 /**
  * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
  * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]].
  * The main difference is that we override the [[getRowType()]] and [[getElementType()]].
+ *
+ * @param tableFunction The Table Function instance
+ * @param implicitResultType The implicit result type.
+ * @param evalMethod The eval() method of the [[tableFunction]]
  */
-class FlinkTableFunctionImpl[T](
-    val typeInfo: TypeInformation[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val evalMethod: Method)
+class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_],
--- End diff --

Can we separate this implementation into two classes, one for the case where we already know the type, and one for the case where the type has not been decided yet? The reason I suggest this is that I noticed `tableFunction.getResultType` is called multiple times when evaluating the plan, even though we already determined the row type long ago, i.e. when the `apply` method of `TableFunction` was called.

---
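The split suggested here can be sketched with simplified stand-in types (not Flink's actual classes): one implementation carries a row type fixed up front and never consults `getResultType` again, while the other defers resolution until the call arguments are available.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class TableFunctionSplitSketch {
    // Simplified stand-in for the row-type lookup a planner performs; the real
    // interface is Calcite's TableFunction with RelDataType, reduced here to String.
    interface RowTypeSource {
        String getRowType(List<?> arguments);
    }

    // Type already known: resolved once, reused for every planner request.
    static final class FixedTypeFunction implements RowTypeSource {
        private final String knownType;
        FixedTypeFunction(String knownType) { this.knownType = knownType; }
        public String getRowType(List<?> arguments) { return knownType; }
    }

    // Type deferred: resolved from the (literal) call arguments on each request.
    static final class DeferredTypeFunction implements RowTypeSource {
        private final Function<List<?>, String> resolve;
        DeferredTypeFunction(Function<List<?>, String> resolve) { this.resolve = resolve; }
        public String getRowType(List<?> arguments) { return resolve.apply(arguments); }
    }

    public static void main(String[] mainArgs) {
        RowTypeSource fixed = new FixedTypeFunction("ROW<name STRING>");
        RowTypeSource deferred = new DeferredTypeFunction(xs -> "ROW<" + xs.size() + " fields>");
        System.out.println(fixed.getRowType(Collections.emptyList()));   // prints ROW<name STRING>
        System.out.println(deferred.getRowType(Arrays.asList("a", "b"))); // prints ROW<2 fields>
    }
}
```

The design point is that the fixed variant makes it impossible for the planner to re-derive the type redundantly, which is what the comment observes about repeated `getResultType` calls.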
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108319900

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala ---
@@ -27,52 +27,73 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{TableFunction => FlinkUDTF}
 
 /**
  * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]].
  * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]].
  * The main difference is that we override the [[getRowType()]] and [[getElementType()]].
+ *
+ * @param tableFunction The Table Function instance
+ * @param implicitResultType The implicit result type.
+ * @param evalMethod The eval() method of the [[tableFunction]]
  */
-class FlinkTableFunctionImpl[T](
-    val typeInfo: TypeInformation[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val evalMethod: Method)
+class FlinkTableFunctionImpl[T](val tableFunction: FlinkUDTF[_],
+    val implicitResultType: TypeInformation[_],
+    val evalMethod: Method)
   extends ReflectiveFunctionBase(evalMethod)
   with TableFunction {
 
-  if (fieldIndexes.length != fieldNames.length) {
-    throw new TableException(
-      "Number of field indexes and field names must be equal.")
-  }
+  override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
 
-  // check uniqueness of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    throw new TableException(
-      "Table field names must be unique.")
+  override def getRowType(typeFactory: RelDataTypeFactory,
+      arguments: util.List[AnyRef]): RelDataType = {
+
+    // Get the result type from table function. If it is not null, the implicitResultType may
+    // already be generated by Table API's apply() method.
+    val resultType = if (tableFunction.getResultType(arguments) != null) {
+      tableFunction.getResultType(arguments)
+    } else {
+      implicitResultType
+    }
+    val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
+    buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes)
   }
 
-  val fieldTypes: Array[TypeInformation[_]] =
-    typeInfo match {
-      case cType: CompositeType[T] =>
-        if (fieldNames.length != cType.getArity) {
-          throw new TableException(
-            s"Arity of type (" + cType.getFieldNames.deep + ") " +
-              "not equal to number of field names " + fieldNames.deep + ".")
-        }
-        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new TableException(
-            "Non-composite input type may have only a single field and its index must be 0.")
-        }
-        Array(aType)
+  private [table] def buildRelDataType(typeFactory: RelDataTypeFactory,
--- End diff --

I think this method can go into `UserDefinedFunctionUtils`.

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318532

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala ---
@@ -131,8 +202,10 @@ abstract class TableFunction[T] extends UserDefinedFunction {
    * method. Flink's type extraction facilities can handle basic types or
--- End diff --

I think the comment should also be updated, to make users aware that they can determine the result type based on the parameters.

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318419

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala ---
@@ -86,12 +90,79 @@ abstract class TableFunction[T] extends UserDefinedFunction {
+  private[table] def buildTableFunctionCall(name: String,
--- End diff --

Can we move this to a util class? Since this is a user-facing API class, we don't want to expose this kind of thing.

---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r108318278

--- Diff: flink-core/src/test/java/org/apache/flink/types/RowTest.java ---
@@ -21,6 +21,7 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
--- End diff --

This import is unused.

---
[GitHub] flink issue #3553: [FLINK-6068] [table] Support If() as a built-in function ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3553

Since `IF` is not in the SQL standard, I think we could let Java users just use `CASE WHEN`.

---
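For reference, the standard equivalent the comment alludes to looks like this (an illustrative query, not taken from the PR; table and column names are made up):

```sql
-- Instead of the non-standard IF(amount > 100, 'large', 'small'):
SELECT CASE WHEN amount > 100 THEN 'large' ELSE 'small' END AS size_label
FROM Orders
```

`CASE WHEN` is part of the SQL standard and is supported uniformly, which is the argument for not adding `IF` as a built-in.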
[GitHub] flink pull request #3559: [flink-6037] [Table API & SQL]hotfix: metadata pro...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3559#discussion_r106796945

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala ---
@@ -100,6 +101,7 @@ class FlinkPlannerImpl(
     assert(validatedSqlNode != null)
     val rexBuilder: RexBuilder = createRexBuilder
     val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+    cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE)
--- End diff --

I remember that in #3058 you also set the provider on `RelMetadataQuery.THREAD_PROVIDERS` by calling:

```
RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(cluster.getMetadataProvider))
```

Why is it not needed here?

---
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3406

Seems good to merge now, merging...

---
[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3560#discussion_r106772134

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }
 
   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {
--- End diff --

The order really depends on how we extract fields from all kinds of expressions. For `BinaryExpression`, we first extract the left child and then the right child. For function calls, we extract the fields from the parameters in left-to-right order. A more complex example is `over`: imagine an aggregate on a partitioned window. Should the fields appearing in the aggregate, or the field being partitioned on, be considered first? So this kind of order is hard to define and hard to keep consistent; it will change easily when we modify the code. We should not rely on it.

---
[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3560#discussion_r106669352

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {
--- End diff --

I think using a Set here gives the better intuition that we actually want unique fields. Is there any reason why you want to keep the order? Especially since the order is not well defined, it will change easily if we modify the code.

---
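The trade-off debated here can be sketched independently of Flink: a Set gives uniqueness but no guaranteed iteration order, while folding into a List keeps first-seen order at the cost of an explicit duplicate check. An illustrative Java sketch (the PR's code is Scala; this only mirrors the dedup-with-order behavior):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FieldOrderSketch {
    // Deduplicate while preserving first-seen order, the behavior the PR wants
    // from extractFieldReferences once it returns a List instead of a Set.
    static List<String> dedupKeepOrder(List<String> fields) {
        List<String> out = new ArrayList<>();
        for (String f : fields) {
            if (!out.contains(f)) {
                out.add(f);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> refs = Arrays.asList("b", "a", "b", "c", "a");
        System.out.println(dedupKeepOrder(refs)); // prints [b, a, c]
    }
}
```

The reviewer's counterpoint is that "first-seen" itself depends on traversal order of the expression tree, so any order exposed this way is fragile.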
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106606071

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
@@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
       filterableSource: FilterableTableSource[_],
       description: String): Unit = {
 
-    if (filterableSource.isFilterPushedDown) {
-      // The rule can get triggered again due to the transformed "scan => filter"
-      // sequence created by the earlier execution of this rule when we could not
-      // push all the conditions into the scan
-      return
-    }
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
 
     val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
     val (predicates, unconvertedRexNodes) =
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        tableSourceTable.tableEnv.getFunctionCatalog)
+        functionCatalog)
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
     }
 
-    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
-    // trying to apply filter push down, set the flag to true no matter whether
-    // we actually push any filters down.
-    newTableSource.setFilterPushedDown(true)
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
--- End diff --

For example, if the condition looks like `a OR b`, and the table source knows that `a` will always be `false` but is not sure about `b`, it can modify the condition to `b` to reduce the framework's evaluation cost.

---
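The `a OR b` example above amounts to a small predicate rewrite: a source that knows one disjunct is constant-false hands back the reduced predicate as the remaining filter for the framework. A minimal sketch with hypothetical types (not Flink's `Expression` API):

```java
public class PredicateSimplifySketch {
    // Minimal predicate tree: a named leaf or an OR of two predicates.
    interface Pred {}
    static final class Leaf implements Pred {
        final String name;
        Leaf(String name) { this.name = name; }
        @Override public boolean equals(Object o) {
            return o instanceof Leaf && ((Leaf) o).name.equals(name);
        }
        @Override public int hashCode() { return name.hashCode(); }
        @Override public String toString() { return name; }
    }
    static final class Or implements Pred {
        final Pred left, right;
        Or(Pred left, Pred right) { this.left = left; this.right = right; }
        @Override public String toString() { return "(" + left + " OR " + right + ")"; }
    }

    // If the source knows a disjunct is always false, drop it from the OR;
    // what survives is the remaining predicate the framework must still evaluate.
    static Pred simplify(Pred p, Pred knownFalse) {
        if (p instanceof Or) {
            Or or = (Or) p;
            Pred l = simplify(or.left, knownFalse);
            Pred r = simplify(or.right, knownFalse);
            if (l.equals(knownFalse)) return r;
            if (r.equals(knownFalse)) return l;
            return new Or(l, r);
        }
        return p;
    }

    public static void main(String[] args) {
        Pred cond = new Or(new Leaf("a"), new Leaf("b"));
        System.out.println(simplify(cond, new Leaf("a"))); // prints b
    }
}
```

This is why the PR lets `applyPredicate` return (or mutate) the remaining predicates rather than treating pushdown as all-or-nothing per conjunct.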
[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3520

Sure, so the result is that there will be two commits in the master repository?

---
[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3520 Hi @fhueske, thanks for the review. I addressed all your comments and will rebase onto master to let Travis run the checks. I will merge this once the build succeeds. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106568641 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala --- @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table + +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.sources.{CsvTableSource, TableSource} +import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.expressions.utils._ +import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource} +import org.junit.{Assert, Test} + +class TableSourceTest extends TableTestBase { + + private val projectedFields: Array[String] = Array("last", "id", "score") + private val noCalcFields: Array[String] = Array("id", "score", "first") + + // batch plan + + @Test + def testBatchProjectableSourceScanPlanTableApi(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .select('last.upperCase(), 'id.floor(), 'score * 2) + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, projectedFields), + term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2") +) + +util.verifyTable(result, expected) + } + + @Test + def testBatchProjectableSourceScanPlanSQL(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() + +util.tEnv.registerTableSource(tableName, tableSource) + +val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName" + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, projectedFields), + term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2") +) + +util.verifySql(sqlQuery, expected) + } + + @Test + def testBatchProjectableSourceScanNoIdentityCalc(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .select('id, 'score, 'first) + +val expected = batchSourceTableNode(tableName, 
noCalcFields) +util.verifyTable(result, expected) + } + + @Test + def testBatchFilterableWithoutPushDown(): Unit = { +val (tableSource, tableName) = filterableTableSource +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv +.scan(tableName) +.select('price, 'id, 'amount) +.where("price * 2 < 32") + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode( +tableName, +Array("name", "id", "amount", "price")), + term("select", "price", "id", "amount"), + term("where", "<(*(price, 2), 32)") +) + +util.verifyTable(result, expected) + } + + @Test + def testBatchFilterablePartialPushDown(): Unit = { +val (tableSource, tableName) = filterableTableSource +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registe
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106566787 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala --- @@ -16,106 +16,96 @@ * limitations under the License. */ -package org.apache.flink.table.plan.rules.util +package org.apache.flink.table.plan.util import java.math.BigDecimal +import java.util import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder} -import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR} +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR, BOOLEAN} import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._ -import org.junit.Assert.{assertArrayEquals, assertTrue} -import org.junit.{Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable -/** - * This class is responsible for testing RexProgramProjectExtractor. 
- */ -class RexProgramProjectExtractorTest { - private var typeFactory: JavaTypeFactory = _ - private var rexBuilder: RexBuilder = _ - private var allFieldTypes: Seq[RelDataType] = _ - private val allFieldNames = List("name", "id", "amount", "price") - - @Before - def setUp(): Unit = { -typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) -rexBuilder = new RexBuilder(typeFactory) -allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_)) - } +abstract class RexProgramTestBase { - @Test - def testExtractRefInputFields(): Unit = { -val usedFields = extractRefInputFields(buildRexProgram()) -assertArrayEquals(usedFields, Array(2, 3, 1)) - } + val typeFactory: JavaTypeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) + + val allFieldNames: util.List[String] = List("name", "id", "amount", "price", "flag").asJava + + val allFieldTypes: util.List[RelDataType] = +List(VARCHAR, BIGINT, INTEGER, DOUBLE, BOOLEAN).map(typeFactory.createSqlType).asJava + + var rexBuilder: RexBuilder = new RexBuilder(typeFactory) - @Test - def testRewriteRexProgram(): Unit = { -val originRexProgram = buildRexProgram() -assertTrue(extractExprStrList(originRexProgram).sameElements(Array( - "$0", - "$1", - "$2", - "$3", - "*($t2, $t3)", - "100", - "<($t4, $t5)", - "6", - ">($t1, $t7)", - "AND($t6, $t8)"))) -// use amount, id, price fields to create a new RexProgram -val usedFields = Array(2, 3, 1) -val types = usedFields.map(allFieldTypes(_)).toList.asJava -val names = usedFields.map(allFieldNames(_)).toList.asJava -val inputRowType = typeFactory.createStructType(types, names) -val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder) -assertTrue(extractExprStrList(newRexProgram).sameElements(Array( - "$0", - "$1", - "$2", - "*($t0, $t1)", - "100", - "<($t3, $t4)", - "6", - ">($t2, $t6)", - "AND($t5, $t7)"))) + /** +* extract all expression string list from input RexProgram expression lists +* +* @param 
rexProgram input RexProgram instance to analyze +* @return all expression string list of input RexProgram expression lists +*/ + protected def extractExprStrList(rexProgram: RexProgram): mutable.Buffer[String] = { +rexProgram.getExprList.asScala.map(_.toString) } - private def buildRexProgram(): RexProgram = { -val types = allFieldTypes.asJava -val names = allFieldNames.asJava -val inputRowType = typeFactory.createStructType(types, names) + // select amount, amount * price as total where amount * price < 100 and id > 6 + protected def buildSimpleRexProgram1(): RexProgram = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) val builder = new RexProgramBuilder(inputRowType, rexBuilder) -val t0 = rexBuilder.makeInputRef(types.get(2), 2)
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106566599 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala --- @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import java.math.BigDecimal + +import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder} +import org.apache.calcite.sql.SqlPostfixOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.validate.FunctionCatalog +import org.junit.Assert.{assertArrayEquals, assertEquals} +import org.junit.Test + +import scala.collection.JavaConverters._ + +class RexProgramExtractorTest extends RexProgramTestBase { + + private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns + + @Test + def testExtractRefInputFields(): Unit = { +val usedFields = RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1()) +assertArrayEquals(usedFields, Array(2, 3, 1)) + } + + @Test + def testExtractSimpleCondition(): Unit = { +val builder: RexBuilder = new RexBuilder(typeFactory) +val program = buildSimpleRexProgram2() + +val firstExp = ExpressionParser.parseExpression("id > 6") +val secondExp = ExpressionParser.parseExpression("amount * price < 100") +val expected: Array[Expression] = Array(firstExp, secondExp) + +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +builder, +functionCatalog) + +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + @Test + def testExtractSingleCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) + +// a = amount >= id +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1)) +builder.addCondition(a) + +val program = builder.getProgram +val relBuilder: RexBuilder = new 
RexBuilder(typeFactory) +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +relBuilder, +functionCatalog) + +val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id")) +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d) + @Test + def testExtractCnfCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) +// price +val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3) +// 100 +val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) + +// a = amount < 100 +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3)) +// b = id > 100 +val b = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3)) +// c = price == 100 +val
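The CNF rewrite noted in the test comment above — `((a AND b) OR c) AND (NOT d)` becomes `(a OR c) AND (b OR c) AND (NOT d)`, which is what Calcite's `RexUtil.toCnf` performs for the extractor — can be checked exhaustively over all truth assignments with plain Java booleans (no Calcite types involved):

```java
public class CnfEquivalence {

    // Original condition: ((a AND b) OR c) AND (NOT d)
    static boolean original(boolean a, boolean b, boolean c, boolean d) {
        return ((a && b) || c) && !d;
    }

    // CNF form, obtained by distributing OR over AND:
    // (a OR c) AND (b OR c) AND (NOT d)
    static boolean cnf(boolean a, boolean b, boolean c, boolean d) {
        return (a || c) && (b || c) && !d;
    }

    public static void main(String[] args) {
        // Enumerate all 2^4 assignments and compare both forms.
        for (int mask = 0; mask < 16; mask++) {
            boolean a = (mask & 1) != 0, b = (mask & 2) != 0,
                    c = (mask & 4) != 0, d = (mask & 8) != 0;
            if (original(a, b, c, d) != cnf(a, b, c, d)) {
                throw new AssertionError("mismatch at mask " + mask);
            }
        }
        System.out.println("CNF form is equivalent for all 16 assignments");
    }
}
```

The CNF form matters for push-down because each top-level conjunct can then be considered for push-down independently (`RelOptUtil.conjunctions` splits them).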
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106566606 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala --- @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import java.math.BigDecimal + +import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder} +import org.apache.calcite.sql.SqlPostfixOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.validate.FunctionCatalog +import org.junit.Assert.{assertArrayEquals, assertEquals} +import org.junit.Test + +import scala.collection.JavaConverters._ + +class RexProgramExtractorTest extends RexProgramTestBase { + + private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns + + @Test + def testExtractRefInputFields(): Unit = { +val usedFields = RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1()) +assertArrayEquals(usedFields, Array(2, 3, 1)) + } + + @Test + def testExtractSimpleCondition(): Unit = { +val builder: RexBuilder = new RexBuilder(typeFactory) +val program = buildSimpleRexProgram2() + +val firstExp = ExpressionParser.parseExpression("id > 6") +val secondExp = ExpressionParser.parseExpression("amount * price < 100") +val expected: Array[Expression] = Array(firstExp, secondExp) + +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +builder, +functionCatalog) + +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + @Test + def testExtractSingleCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) + +// a = amount >= id +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1)) +builder.addCondition(a) + +val program = builder.getProgram +val relBuilder: RexBuilder = new 
RexBuilder(typeFactory) +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +relBuilder, +functionCatalog) + +val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id")) +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d) + @Test + def testExtractCnfCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) +// price +val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3) +// 100 +val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) + +// a = amount < 100 +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3)) +// b = id > 100 +val b = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3)) +// c = price == 100 +val
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106566042 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.Types._ +import org.apache.flink.table.expressions._ +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource} +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.tools.nsc.interpreter.JList + +/** + * This source can only handle simple comparision with field "amount". + * Supports ">, <, >=, <=, =, <>" with an integer. 
+ */ +class TestFilterableTableSource( +val recordNum: Int = 33) +extends BatchTableSource[Row] +with StreamTableSource[Row] +with FilterableTableSource[Row] { + + var filterPushedDown: Boolean = false + + val fieldNames: Array[String] = Array("name", "id", "amount", "price") + + val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE) + + // all predicates with filed "amount" + private var filterPredicates = new mutable.ArrayBuffer[Expression] + + // all comparing values for field "amount" + private val filterValues = new mutable.ArrayBuffer[Int] + + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { +execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def explainSource(): String = { +if (filterPredicates.nonEmpty) { + s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]" +} else { + "" +} + } + + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + + override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = { +val newSource = new TestFilterableTableSource(recordNum) +newSource.filterPushedDown = true + +val iterator = predicates.iterator() +while (iterator.hasNext) { + iterator.next() match { +case expr: BinaryComparison => + (expr.left, expr.right) match { +case (f: ResolvedFieldReference, v: Literal) if f.name.equals("amount") => + newSource.filterPredicates += expr + newSource.filterValues += v.value.asInstanceOf[Number].intValue() + iterator.remove() +case (_, _) => + } + } +} + +newSource + } + + override def isFilterPushedDown: Boolean = filterPushedDown + + private def generateDynamicCollection(): Seq[Row] = { +Preconditions.checkArgument(filterPredicates.length == filterValues.length) + +for { + cnt <- 0 until recordNum + if 
shouldCreateRow(cnt) +} yield { + val row = new Row(fieldNames.length) --- End diff -- Thanks for the tips! ---
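The `applyPredicate` contract shown in the `TestFilterableTableSource` diff — the source consumes the predicates it can handle and leaves the rest in the mutable list for the framework to evaluate — boils down to the classic `Iterator.remove()` pattern. A minimal sketch with a hypothetical `SimplePredicate` record standing in for Flink's `Expression` type:

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class PickAndRemove {

    // Hypothetical predicate shape: fieldName <op> value.
    record SimplePredicate(String field, String op, int value) {}

    /**
     * Removes and returns every predicate on "amount" (the only field this
     * toy source supports); whatever remains in the list stays with the
     * framework, which keeps evaluating it per record.
     */
    static List<SimplePredicate> pickAmountPredicates(List<SimplePredicate> predicates) {
        List<SimplePredicate> pushed = new LinkedList<>();
        Iterator<SimplePredicate> it = predicates.iterator();
        while (it.hasNext()) {
            SimplePredicate p = it.next();
            if (p.field().equals("amount")) {
                pushed.add(p);
                it.remove(); // taken over by the source
            }
        }
        return pushed;
    }

    public static void main(String[] args) {
        List<SimplePredicate> all = new LinkedList<>(List.of(
                new SimplePredicate("amount", ">", 2),
                new SimplePredicate("price", "<", 100)));
        List<SimplePredicate> pushed = pickAmountPredicates(all);
        System.out.println(pushed.size() + " pushed, " + all.size() + " remaining");
        // prints: 1 pushed, 1 remaining
    }
}
```

Note that as the discussion points out, correctness does not depend on removal: a source that picks predicates but forgets to remove them merely causes the framework to evaluate them redundantly.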
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106565668 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase { filterableSource: FilterableTableSource[_], description: String): Unit = { -if (filterableSource.isFilterPushedDown) { - // The rule can get triggered again due to the transformed "scan => filter" - // sequence created by the earlier execution of this rule when we could not - // push all the conditions into the scan - return -} +Preconditions.checkArgument(!filterableSource.isFilterPushedDown) val program = calc.getProgram +val functionCatalog = FunctionCatalog.withBuiltIns val (predicates, unconvertedRexNodes) = RexProgramExtractor.extractConjunctiveConditions( program, call.builder().getRexBuilder, -tableSourceTable.tableEnv.getFunctionCatalog) +functionCatalog) if (predicates.isEmpty) { // no condition can be translated to expression return } -val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates) -// trying to apply filter push down, set the flag to true no matter whether -// we actually push any filters down. -newTableSource.setFilterPushedDown(true) +val remainingPredicates = new util.LinkedList[Expression]() +predicates.foreach(e => remainingPredicates.add(e)) + +val newTableSource = filterableSource.applyPredicate(remainingPredicates) --- End diff -- I don't think we have to restrict this. If the user for some reason does want to change the predicates that are handed back and executed by the framework, we should allow them to do so. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106565688 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -20,47 +20,40 @@ package org.apache.flink.table.sources import org.apache.flink.table.expressions.Expression +import scala.tools.nsc.interpreter.JList --- End diff -- sure ---
[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3520 Hi @fhueske, you just pointed out a question I had when tonycox implemented the first version: why are we preventing `FilterableTableSource` from modifying the expressions? I think it's totally up to the source whether it changes the expressions it took or keeps them as they were. We have no reason to restrict the `FilterableTableSource`'s behavior as long as it does the right thing. Passing in a Java list and telling the user who extends this to pick and remove the expressions they support is not super nice, but even if the user only picks expressions without removing them, we still get the correct answer. What do you think? ---
[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3520 Hi @fhueske @godfreyhe, thanks for the review; I addressed almost all of your comments. @fhueske The exception is making `TableSourceScan` aware of whether the filter has been pushed down. I'm not sure we should give `TableSourceScan` that kind of information; I'd prefer to keep it inside the various kinds of `TableSource`. One drawback of letting `TableSourceScan` carry such information is that when we copy a `TableSourceScan`, we need to take care of all of it and make sure it is copied correctly. In the future, if we extend `TableSource` further, e.g. so that part of a query can be pushed down, we will face the same problem. Regarding the interface of `FilterableTableSource`, I agree with you that a trait containing logic is not friendly to Java extensions, so I removed the default implementation of `isFilterPushedDown`; the inheriting class should implement this method. And regarding the `Tuple2` issue, how about we pass in a mutable Java list and let the table source *pick out* expressions from it and return a copy of the table source that contains these pushed-down predicates? ---
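The "return a copy" design proposed in this comment — `applyPredicate` never mutates the original source but returns a new instance carrying the pushed-down predicates and the pushed-down flag — can be sketched as follows. This is a hypothetical minimal class illustrating the idea, not the real `FilterableTableSource` interface:

```java
import java.util.ArrayList;
import java.util.List;

public class CopyOnApplySource {

    final List<String> pushedPredicates; // toy: predicates as strings
    final boolean filterPushedDown;

    CopyOnApplySource(List<String> pushed, boolean flag) {
        this.pushedPredicates = List.copyOf(pushed);
        this.filterPushedDown = flag;
    }

    // Returns a NEW source instance; the planner can still fall back to the
    // original object if it discards this branch of the plan.
    CopyOnApplySource applyPredicate(List<String> predicates) {
        List<String> taken = new ArrayList<>(predicates);
        predicates.clear(); // this toy source claims everything it is given
        return new CopyOnApplySource(taken, true);
    }

    public static void main(String[] args) {
        CopyOnApplySource original = new CopyOnApplySource(List.of(), false);
        List<String> preds = new ArrayList<>(List.of("amount > 2"));
        CopyOnApplySource applied = original.applyPredicate(preds);
        System.out.println(original.filterPushedDown + " " + applied.filterPushedDown);
        // prints: false true
    }
}
```

Keeping the original source immutable is what makes it safe for the rule to fire on a plan where push-down was attempted but later discarded by the optimizer.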
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106335718 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.table.expressions.Expression + +/** + * Adds support for filtering push-down to a [[TableSource]]. + * A [[TableSource]] extending this interface is able to filter records before returning. + */ +trait FilterableTableSource[T] extends TableSource[T] { --- End diff -- Changed. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106335709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. 
+* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. +* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class 
InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef): Unit = +fields += inputRef.getIndex + + override def visitCall(call: RexCall): Unit = +call.operands.foreach(operand => operand.accept(this)) +
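The quoted extractor's comment describes the CNF step: "(a AND b) OR c" becomes "(a OR c) AND (b OR c)" before the top-level AND is split into conjuncts. A minimal, self-contained sketch of that distribution, using toy expression types rather than the real Calcite `RexUtil.toCnf` / `RelOptUtil.conjunctions`:

```scala
// Toy model of CNF conversion: OR is distributed over AND until the tree
// is a conjunction of disjunctions, then the top-level AND is flattened.
sealed trait Expr
case class Leaf(name: String) extends Expr
case class And(l: Expr, r: Expr) extends Expr
case class Or(l: Expr, r: Expr) extends Expr

def toCnf(e: Expr): Expr = e match {
  case Or(And(a, b), c) => And(toCnf(Or(a, c)), toCnf(Or(b, c)))
  case Or(a, And(b, c)) => And(toCnf(Or(a, b)), toCnf(Or(a, c)))
  case Or(a, b) =>
    val (ca, cb) = (toCnf(a), toCnf(b))
    // children may expose a nested And after rewriting; retry if they changed
    if (ca != a || cb != b) toCnf(Or(ca, cb)) else Or(a, b)
  case And(a, b) => And(toCnf(a), toCnf(b))
  case leaf => leaf
}

// mirrors RelOptUtil.conjunctions: split a CNF tree on top-level ANDs
def conjunctions(e: Expr): List[Expr] = e match {
  case And(a, b) => conjunctions(a) ++ conjunctions(b)
  case other => List(other)
}
```

Each conjunct returned here corresponds to one candidate predicate that `extractConjunctiveConditions` would then try to convert to an `Expression`.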
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106335660 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( + operand(classOf[DataSetCalc], +operand(classOf[BatchTableSourceScan], none)), + "PushFilterIntoBatchTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] +val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
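The `matches` method being reviewed only lets the rule fire when the scan's source is filterable and the calc actually carries a condition. A simplified sketch of that guard, with hypothetical stand-in types rather than the real Flink/Calcite classes:

```scala
// Stand-in types; the real rule works on DataSetCalc/BatchTableSourceScan.
trait TableSource
trait FilterableTableSource extends TableSource
case class Scan(tableSource: TableSource)
case class Calc(condition: Option[String])

// Fire only for filterable sources that still have a condition to push.
def ruleMatches(calc: Calc, scan: Scan): Boolean = scan.tableSource match {
  case _: FilterableTableSource => calc.condition.isDefined
  case _ => false
}
```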
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329225 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106328751 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala --- @@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable[T]( val tableSource: TableSource[T], +val tableEnv: TableEnvironment, --- End diff -- Yes, you are right. Especially since UDFs are currently registered as objects rather than classes, it is really impossible to let the TableSource support this. I will remove this field and only use built-in functions when extracting expressions from the RexProgram. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329099 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329079 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { --- End diff -- Will change this. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106327707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { + // The rule can get triggered again due to the transformed "scan => filter" + // sequence created by the earlier execution of this rule when we could not + // push all the conditions into the scan + return +} + +val program = calc.getProgram +val (predicates, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +call.builder().getRexBuilder, +tableSourceTable.tableEnv.getFunctionCatalog) +if (predicates.isEmpty) { + // no condition can be translated to expression + return +} + +val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates) +// trying to apply filter push down, set the flag to true no matter whether +// we actually push any filters down. +newTableSource.setFilterPushedDown(true) + +// check whether framework still need to do a filter +val relBuilder = call.builder() +val remainingCondition = { + if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) { +relBuilder.push(scan) +(remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ unconvertedRexNodes) +.reduce((l, r) => relBuilder.and(l, r)) + } else { +null + } +} + +// check whether we still need a RexProgram. An RexProgram is needed when either +// projection or filter exists. 
+val newScan = scan.copy(scan.getTraitSet, newTableSource) +val newRexProgram = { + if (remainingCondition != null || program.getProjectList.size() > 0) { --- End diff -- Thanks for the tips, will change this. ---
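The quoted rule body rebuilds a residual filter by AND-ing the predicates the source did not accept with the RexNodes that never converted. A hedged, string-based sketch of that assembly (the real code reduces `RexNode`s via `relBuilder.and`):

```scala
// Combine leftover predicates and unconverted nodes into one residual
// condition; None means the framework no longer needs its own filter.
def remainingCondition(remainingPredicates: List[String],
                       unconvertedRexNodes: List[String]): Option[String] = {
  val all = remainingPredicates ++ unconvertedRexNodes
  if (all.nonEmpty) Some(all.reduce((l, r) => s"AND($l, $r)")) else None
}
```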
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324883 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala --- @@ -58,16 +60,24 @@ class BatchTableSourceScan( ) } + override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = { +new BatchTableSourceScan( + cluster, + traitSet, + getTable, + newTableSource.asInstanceOf[BatchTableSource[_]] +) + } + override def explainTerms(pw: RelWriter): RelWriter = { -super.explainTerms(pw) +val terms = super.explainTerms(pw) .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) +tableSource.explainTerms(terms) --- End diff -- Will change this. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324868 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -39,4 +39,6 @@ trait TableSource[T] { /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ def getReturnType: TypeInformation[T] + /** Describes the table source */ + def explainTerms(pw: RelWriter): RelWriter = pw --- End diff -- Makes sense to me, will change this. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106322568 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala --- @@ -42,63 +41,40 @@ class DataSetCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, -private[flink] val calcProgram: RexProgram, // for tests +calcProgram: RexProgram, ruleDescription: String) - extends SingleRel(cluster, traitSet, input) + extends Calc(cluster, traitSet, input, calcProgram) --- End diff -- This is because I want to unify the PushFilterIntoScan rule's code for both batch and stream mode. While executing the rule, we may need to create a new copy of the DataSetCalc or DataStreamCalc. It makes things easier to let these two classes inherit from `Calc` and use `Calc.copy` to create the copied instance. I did encounter some problems after I changed the hierarchy: some unit tests failed because the plan changed. But that was because we didn't calculate the cost for Calc correctly. I added some logic to `CommonCalc.computeSelfCost`, and everything works fine. ---
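The hierarchy change described above can be sketched with toy classes (all names here are hypothetical stand-ins, not the real Calcite/Flink `Calc` nodes): one shared rule body works through the common parent's polymorphic `copy`, so the same code serves both batch and stream.

```scala
// Common parent exposing a polymorphic copy, as in the Calc refactoring.
abstract class CommonCalcNode(val condition: Option[String]) {
  def copy(newCondition: Option[String]): CommonCalcNode
}
final class DataSetCalcNode(c: Option[String]) extends CommonCalcNode(c) {
  override def copy(n: Option[String]): CommonCalcNode = new DataSetCalcNode(n)
}
final class DataStreamCalcNode(c: Option[String]) extends CommonCalcNode(c) {
  override def copy(n: Option[String]): CommonCalcNode = new DataStreamCalcNode(n)
}

// Shared rule body: drops the condition after pushing it into the source,
// without knowing which concrete Calc it is operating on.
def afterPushDown(calc: CommonCalcNode): CommonCalcNode = calc.copy(None)
```

The copy preserves the concrete runtime type, which is exactly what lets one rule base rebuild either a batch or a stream node.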
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106321885 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala --- @@ -24,7 +24,7 @@ package org.apache.flink.table.sources * * @tparam T The return type of the [[ProjectableTableSource]]. */ -trait ProjectableTableSource[T] { +trait ProjectableTableSource[T] extends TableSource[T] { --- End diff -- It's because I want to unify the PushProjectIntoScan rule code for both batch and stream mode. Once we push a projection down into the table source, we should create not only a new TableScan instance but also a new TableSource instance. The code looks like:
```
val newTableSource = originTableSource.projectFields(usedFields)
// create a new scan with the new TableSource instance
val newScan = scan.copy(scan.getTraitSet, newTableSource)
```
At first the `projectFields` method returned a `ProjectableTableSource`, which is not a `TableSource`, so I let `ProjectableTableSource` inherit from `TableSource`. But I just noticed we can simply let `projectFields` return a `TableSource`, which resolves the problem. Will change this. ---
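The fix described above, having `projectFields` return a plain `TableSource`, can be sketched with simplified stand-in types (`CsvSource` and these trait shapes are hypothetical, not the real Flink API):

```scala
// projectFields returns a plain TableSource, so the rule can hand the
// result straight to scan.copy without extra inheritance or casting.
trait TableSource { def fieldNames: Seq[String] }
trait ProjectableTableSource { self: TableSource =>
  def projectFields(fields: Seq[Int]): TableSource
}
case class CsvSource(fieldNames: Seq[String]) extends TableSource with ProjectableTableSource {
  override def projectFields(fields: Seq[Int]): TableSource =
    CsvSource(fields.map(fieldNames)) // keep only the selected columns, in order
}
```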
[GitHub] flink issue #3530: [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3530 looks good, merging ---
[GitHub] flink pull request #3530: [FLINK-6040] [table] DataStreamUserDefinedFunction...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3530#discussion_r105869656 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala --- @@ -85,6 +85,8 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB def testUserDefinedTableFunctionWithParameter(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear --- End diff -- Move these to the test case setup? ---
[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3520 To address the problem of not reusing the `TableSource` when we create a new scan for a table source, I changed some of the inheritance among the current `BatchScan`, `StreamScan`, `BatchTableSourceScan`, `StreamTableSourceScan`, and so on. The new structure is more like the relationship between `FlinkTable` and `TableSourceTable`, `DataSetTable`, `DataStreamTable`. Changing the structure also makes it possible to unify the push-project-into-scan rule for both batch and stream mode. ---
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3520 [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it This PR is based on #3166 and adds the following changes: 1. Refactor `RexProgramExpressionExtractor` and `RexProgramExpressionExtractor` to `RexProgramExtractor` and `RexProgramRewriter`. `RexProgramExtractor` is responsible for extracting either projection expressions or the filter expression. 2. Make sure we don't fail while extracting and converting filter RexNodes to expressions. Both the successfully translated expressions and the unconverted RexNodes are returned. 3. Add some tests for `RexProgramExtractor`. 4. Provide a unified `PushFilterIntoTableSourceScanRuleBase` to support filter push-down in both batch and stream mode. 5. Add some logical tests for filter push-down in different situations, like full push-down and partial push-down. 6. Slightly change the testing class `TestFilterableTableSource` to make it less specialized.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-3849 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3520.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3520 commit 0a7af41509d9a0db3e791cb9f4dc5a1a8086f0b2 Author: tonycox Date: 2017-01-11T09:15:49Z [FLINK-3849] Add FilterableTableSource interface and Rules for pushing it commit 549b4e00e68d32f070e196fc6eb9a7f5f9e937c3 Author: tonycox Date: 2017-01-31T12:41:52Z fix filterable test commit 9aa82062832e0aabcb003e582c8130aeecc91a73 Author: tonycox Date: 2017-02-16T13:32:33Z rebase and trying fix rexnode parsing commit 646a6931224c7dcc58501ec014ab675925bb105d Author: tonycox Date: 2017-02-17T16:48:40Z create wrapper and update rules commit abfa38d894aa86b7a5c91dd29bf398b880c8bfe7 Author: Kurt Young Date: 2017-03-13T07:30:13Z [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it ---
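Point 2 of the change log promises best-effort conversion: conjuncts that translate become expressions, and the rest are returned untouched rather than failing the whole extraction. A generic sketch of that split (the real code visits each `RexNode` with a converter that yields `Option[Expression]`):

```scala
// Offer each conjunct to a converter; successes and failures are kept
// in order, and the extraction itself never throws.
def splitConvertible[A, B](conjuncts: List[A])(convert: A => Option[B]): (List[B], List[A]) =
  conjuncts.foldRight((List.empty[B], List.empty[A])) {
    case (c, (converted, unconverted)) =>
      convert(c) match {
        case Some(e) => (e :: converted, unconverted)
        case None    => (converted, c :: unconverted)
      }
  }
```

The converted list is handed to `FilterableTableSource.applyPredicate`, while the unconverted remainder stays in the framework's own filter.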
[GitHub] flink issue #3510: [FLINK-6023] Fix Scala snippet into Process Function Doc
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3510 Thanks @maocorte for the quick fix, looks good to merge now. ---
[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105422108

Diff: docs/dev/stream/process_function.md

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {

   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-    .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

   override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
     // initialize or retrieve/update the state
+    val (key, _) = value
```

End diff -- Sorry, I didn't make myself clear. What the IDE complains about is that the variable name `key` conflicts with the following lines:

```scala
case CountWithTimestamp(key, count, _) =>
  CountWithTimestamp(key, count + 1, ctx.timestamp)
```

It's not clear whether you want to use the `key` you just defined or the `key` in the match pattern.
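The ambiguity described in this comment can be reproduced with a minimal self-contained Scala sketch (no Flink required; the names and the `42L` timestamp are illustrative): a lowercase identifier in a `case` pattern always binds a fresh variable, so it silently shadows an outer `val` of the same name instead of matching against it.

```scala
// Minimal reproduction of the "suspicious shadowing by a variable pattern"
// warning the reviewer mentions.
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

object ShadowingDemo {
  def bump(value: (String, Long), state: CountWithTimestamp): CountWithTimestamp = {
    val (key, _) = value // outer `key`, e.g. "a"
    state match {
      // This `key` is a NEW pattern variable that shadows the outer one,
      // so this case matches ANY state, regardless of state.key == key.
      case CountWithTimestamp(key, count, _) =>
        CountWithTimestamp(key, count + 1, 42L)
    }
  }
}
```

Calling `ShadowingDemo.bump(("a", 1L), CountWithTimestamp("b", 5L, 0L))` yields a result keyed by `"b"`, not `"a"`: the pattern never consulted the outer `key`. Renaming the outer binding, or using `value._1` directly as suggested, removes the ambiguity.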
[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105419777

Diff: docs/dev/stream/process_function.md (comment anchored on this line of the hunk quoted earlier in the thread):

```diff
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {
```

End diff -- The class name should be changed to `CountWithTimeoutFunction` to be consistent with the Java version.
[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105420103

Diff: docs/dev/stream/process_function.md (comment anchored on this line of the hunk quoted earlier in the thread):

```diff
   override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
```

End diff -- I think it should be `value: (String, String)`.
[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105420011

Diff: docs/dev/stream/process_function.md (comment anchored on this line of the hunk quoted earlier in the thread):

```diff
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {
```

End diff -- And the first type parameter of `ProcessFunction` should be `(String, String)`.
[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105418830

Diff: docs/dev/stream/process_function.md (comment anchored on these lines of the hunk quoted earlier in the thread):

```diff
     // initialize or retrieve/update the state
+    val (key, _) = value
```

End diff -- Can we change the name here? The IDE reports `suspicious shadowing by a variable pattern`. Or you can just use `value._1` instead.
[GitHub] flink pull request #3510: [FLINK-6023] Fix Scala snippet into Process Functi...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3510#discussion_r105417009

Diff: docs/dev/stream/process_function.md (comment anchored on this line of the hunk quoted earlier in the thread):

```diff
+import org.apache.flink.streaming.api.functions.RichProcessFunction
```

End diff -- `RichProcessFunction` no longer exists, can you change it back to `ProcessFunction`? There are also some `RichProcessFunction`s in the Java snippet; can you try to fix those too?
[GitHub] flink issue #3488: [FLINK-5971] [flip-6] Add timeout for registered jobs on ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3488

+1 to merge.
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3406

Hi @fhueske, I like your proposal about moving the annotation from `TableSource` to `TableSourceConverter`. Let's do it this way. BTW, I noticed that you offered three possible methods for the `TableSourceConverter`; I can only see `def requiredProperties: Array[String]` being necessary for now. It can help validate the converter and decide which converter we should use when multiple converters have the same `TableType`.
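A hedged sketch of how `requiredProperties` alone could serve both purposes the comment mentions — validation and disambiguation between converters registered for the same table type. None of these names are the actual Flink interfaces; this is a simplified self-contained model of the selection logic.

```scala
// Hypothetical converter interface: the only member the comment argues is
// needed for now.
trait TableSourceConverter {
  def requiredProperties: Array[String]
}

object ConverterRegistry {
  // Among converters sharing a table type, pick the one whose required
  // property keys are all present in the descriptor's properties.
  def select(converters: Seq[TableSourceConverter],
             properties: Map[String, String]): Option[TableSourceConverter] =
    converters.find(_.requiredProperties.forall(properties.contains))
}

// Two illustrative converters that could share a table type.
object CsvConverter extends TableSourceConverter {
  val requiredProperties = Array("path", "fieldDelim")
}
object KafkaConverter extends TableSourceConverter {
  val requiredProperties = Array("topic", "bootstrap.servers")
}
```

With this shape, a descriptor carrying `topic` and `bootstrap.servers` selects `KafkaConverter` even if `CsvConverter` is registered under the same type, and a descriptor missing a required key selects nothing, which doubles as validation.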