[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree

2024-02-23 Thread WenJun Min (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820262#comment-17820262
 ] 

WenJun Min commented on FLINK-31205:


[~tjbanghart] I have not prepared a PR for this ticket. In my previous POC, the 
{{MultiSink}} rel node combines the sink trees into a single tree and feeds it 
to the Calcite optimizer. So, IMO, the {{MultiSink}} solution can support 
multi-query optimization.

However, it still has a problem: the Calcite optimizer does not recognize the 
CTE during optimization. As a result, this solution has no significant 
advantage over the current solution based on {{RelNodeBlock}} in Flink, which 
is why I have not prepared a PR for it.


> do optimize for multi sink in a single relNode tree 
> 
>
> Key: FLINK-31205
> URL: https://issues.apache.org/jira/browse/FLINK-31205
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: WenJun Min
> Priority: Major
>
> Flink supports multi-sink usage, but it optimizes each sink in an 
> individual RelNode tree. This misses some opportunities for cross-tree 
> optimization, e.g.: 
> {code:java}
> create table newX(
>   a int,
>   b bigint,
>   c varchar,
>   d varchar,
>   e varchar
> ) with (
>   'connector' = 'values'
>   ,'enable-projection-push-down' = 'true'
> );
> insert into sink_table select a, b from newX;
> insert into sink_table select a, 1 from newX;
> {code}
> It produces the plan below, which causes the source to be consumed twice:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Calc(select=[a, 1 AS b])
>    +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a])
> {code}
> In this ticket, I propose a global optimization for multi-sink jobs:
> * Merge the sinks (those writing to the same table) into a single RelNode 
> tree with an extra union node
> * After optimization, split the merged union back into the original sinks
> In my POC, after step 1, it produces the plan below, which I think will 
> benefit overall performance:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Union(all=[true], union=[a, b])
>    :- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
>    +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
>       +- Reused(reference_id=[1])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree

2024-02-23 Thread TJ Banghart (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820259#comment-17820259
 ] 

TJ Banghart commented on FLINK-31205:

I happened upon this ticket while looking for issues related to multi-query 
optimization. CALCITE-1440 looks like a generalization of this problem. The 
proposed {{Combine}} rel node looks similar to {{MultiSink}}.

Do you have a PR for this change? I would love to see it and hopefully get 
something similar into Calcite.



[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-27 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694386#comment-17694386
 ] 

Aitozi commented on FLINK-31205:


Looking forward to your opinions. CC [~godfreyhe] [~twalthr] [~snuyanzin]



[jira] [Commented] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-27 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694372#comment-17694372
 ] 

Aitozi commented on FLINK-31205:


After some research, I found that there are better choices than using a union 
to get a single tree. {{Union}} can only cover the use case of multi-sink to 
the same table because the {{Union}} enforces the type consistency.

We can add a new "virtual" RelNode that accepts the multiple sinks as its 
inputs. It packs the separate trees together so that the optimizer sees a 
single tree and is able to do global optimization.

In my POC, I add a new RelNode type named {{MultiSink}} that wraps the sink 
trees before they are passed to the Calcite optimizer. 
The {{MultiSink}} does not do any transformation on its inputs.

After logical optimization, the plan is

{code:java}
LogicalMultiSink
:- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, b])
:  +- LogicalProject(inputs=[0..1])
:     +- LogicalTableScan(table=[[default_catalog, default_database, newX]])
+- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, b])
   +- LogicalProject(inputs=[0], exprs=[[1:BIGINT]])
      +- LogicalTableScan(table=[[default_catalog, default_database, newX]])
{code}

After physical optimization, the plan is 

{code:java}
MultiSink
:- Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
:  +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
+- Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1])
   +- Calc(select=[a AS $f0, 1:BIGINT AS $f1])
      +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
{code}

Before transforming to the ExecNode, we remove the {{MultiSink}} (which is only 
intended to work during the optimization phase), so the final result can be:

{code:java}
TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1])
+- Calc(select=[a AS $f0, 1 AS $f1])
   +- Reused(reference_id=[1])
{code}
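
The pack, unpack, and reuse steps above can be sketched roughly as follows. This is a self-contained toy model, not the POC code and not Flink's or Calcite's RelNode API: {{PlanNode}}, {{MultiSinkSketch}}, and all of their methods are hypothetical names, and subtree reuse is approximated by comparing a textual digest.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy plan node (hypothetical); stands in for a RelNode in this sketch only.
final class PlanNode {
    final String op;             // e.g. "Sink", "Calc", "TableSourceScan"
    final String detail;         // operator attributes rendered as text
    final List<PlanNode> inputs; // child nodes

    PlanNode(String op, String detail, PlanNode... inputs) {
        this.op = op;
        this.detail = detail;
        this.inputs = List.of(inputs);
    }

    /** Structural digest: equal digests mean structurally identical subtrees. */
    String digest() {
        StringBuilder sb = new StringBuilder(op).append('(').append(detail).append(')');
        if (!inputs.isEmpty()) {
            sb.append('[');
            for (int i = 0; i < inputs.size(); i++) {
                if (i > 0) sb.append(", ");
                sb.append(inputs.get(i).digest());
            }
            sb.append(']');
        }
        return sb.toString();
    }
}

final class MultiSinkSketch {
    /** Packs independent sink trees under one virtual root; inputs are not transformed. */
    static PlanNode pack(List<PlanNode> sinks) {
        return new PlanNode("MultiSink", "", sinks.toArray(new PlanNode[0]));
    }

    /** Drops the virtual root again and returns the sink trees unchanged. */
    static List<PlanNode> unpack(PlanNode root) {
        if (!root.op.equals("MultiSink")) {
            throw new IllegalArgumentException("not a MultiSink root");
        }
        return root.inputs;
    }

    /** Returns digests that occur more than once below the roots: candidates for reuse. */
    static List<String> reuseCandidates(List<PlanNode> roots) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (PlanNode root : roots) count(root, counts);
        List<String> out = new ArrayList<>();
        counts.forEach((digest, n) -> { if (n > 1) out.add(digest); });
        return out;
    }

    private static void count(PlanNode node, Map<String, Integer> counts) {
        counts.merge(node.digest(), 1, Integer::sum);
        for (PlanNode input : node.inputs) count(input, counts);
    }

    public static void main(String[] args) {
        PlanNode sink1 = new PlanNode("Sink", "sink_table",
                new PlanNode("TableSourceScan", "newX, project=[a, b]"));
        PlanNode sink2 = new PlanNode("Sink", "sink_table",
                new PlanNode("Calc", "a AS $f0, 1 AS $f1",
                        new PlanNode("TableSourceScan", "newX, project=[a, b]")));
        PlanNode root = pack(List.of(sink1, sink2)); // single tree for the optimizer
        System.out.println(reuseCandidates(unpack(root)));
    }
}
```

In this sketch the only repeated digest is the scan with {{project=[a, b]}}, which mirrors the {{reuse_id=[1]}} node in the plan above. Reuse is only detected because both scans carry the same projection.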

With the new RelNode, single-tree optimization is possible. We can do more 
during single-tree optimization, e.g., introduce a cost model for the CTE to 
decide whether to inline or reuse it, and so on.
