[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336548#comment-17336548
 ] 

Flink Jira Bot commented on FLINK-11020:


This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>  Labels: stale-major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328514#comment-17328514
 ] 

Flink Jira Bot commented on FLINK-11020:


This major issue is unassigned and itself and all of its Sub-Tasks have not 
been updated for 30 days. So, it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update. Afterwards, 
please remove the label. In 7 days the issue will be deprioritized.

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Timo Walther
>Priority: Major
>  Labels: stale-major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-04 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708626#comment-16708626
 ] 

Hequn Cheng commented on FLINK-11020:
-

[~fhueske] Opening another Jira is a good idea. Sorry that I have misunderstood 
you. I created FLINK-11070 and we can have the discussion of the cross join 
implementation there. 

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-04 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708452#comment-16708452
 ] 

Fabian Hueske commented on FLINK-11020:
---

Hi [~hequn8128], Yes, agreed. It would be good to have an easy option to enable 
cross products. However, IMO this is a different issue. Would you mind opening 
a Jira and move the discussion there? 

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-03 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708095#comment-16708095
 ] 

Hequn Cheng commented on FLINK-11020:
-

Hi [~fhueske] [~twalthr] Yes, I agree that we should not do reorder without 
cardinality estimates. However, we may not have the ability to give a 
meaningful exception without estimates, either. There is a chance that the user 
really wants to do a cross join, i.e, Table t2 and t3 is much smaller than t1. 
In this case, supporting a cross join would be a good choice(even with a 
parallelism of 1?). I think we can only trust the query written by the user 
before we introduce cardinality estimates. What do you think?

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-03 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707386#comment-16707386
 ] 

Timo Walther commented on FLINK-11020:
--

[~fhueske] yes, a more meaningful exception in case of cross joins would also 
be already very helpful. I would also not start to do heavy join reordering.

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-03 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707180#comment-16707180
 ] 

Fabian Hueske commented on FLINK-11020:
---

[~hequn8128], the query that [~twalthr] used in the description can be executed 
without a cross product. 
I think the question is not whether we should enable cross products (which is a 
valid question as well!) but whether we should enable join reordering to 
prevent cross products.

I don't think that should enable join reordering in general because reordering 
without cardinality estimates is gambling and we should keep the join order 
stable.
If we find a query that cannot be planned, we could try to optimize it with 
join ordering enabled or we raise a meaningful exception explaining the issue 
to the user.

I'm not sure what the best approach is, but silently enabling join reordering 
(without stats) is something that I'm not very comfortable with.




> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-02 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706580#comment-16706580
 ] 

Hequn Cheng commented on FLINK-11020:
-

Maybe we can implement cross join with parallelism of 1? For example, call 
forceNonParallel() for connectOperator in `DataStreamJoin` when it is cross 
join. We can do some optimization later, such as broadcasting the smaller side, 
etc.

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-11-28 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701876#comment-16701876
 ] 

Fabian Hueske commented on FLINK-11020:
---

I would not do that. Join reordering without statistics is like rolling a dice, 
i.e., we would end up with random plans.

Currently, joins are done in the order in which they are specified in the 
query, i.e., the behavior is deterministic and can be controlled by the user.
I think we would lose that if we add rules for join reordering.

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-11-28 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701884#comment-16701884
 ] 

Timo Walther commented on FLINK-11020:
--

I'm not suggesting to introduce join reordering in general but only in cases 
where we would fail otherwise. We could execute the query above if we would 
allow for join reordering in cases where a cross join would be the result 
otherwise.


> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-11-28 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701862#comment-16701862
 ] 

Timo Walther commented on FLINK-11020:
--

[~fhueske] what is your opinion here? Should we reorder joins in such cases or 
is failing hard acceptable?

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)