[ 
https://issues.apache.org/jira/browse/FLINK-17892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17114547#comment-17114547
 ] 

hailong wang edited comment on FLINK-17892 at 5/23/20, 5:34 AM:
----------------------------------------------------------------

Hi [~lzljs3620320], No, kafka source will not be reused.

For the following sql: 
{code:java}
val sql1 = "CREATE TABLE SS (" +
   " a int," +
   " b bigint," +
   " c varchar" +
  ") WITH (" +
  "'connector.type' = 'TestTableSource' "+
  ")"
util.tableEnv.sqlUpdate(sql1);
util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())

val sqlQuery =
  """
    |(SELECT a, random_udf() FROM SS /*+ OPTIONS('k1' = 'v1') */ WHERE a > 10)
    |UNION ALL
    |(SELECT a, random_udf() FROM SS /*+ OPTIONS('k2' = 'v2') */ WHERE a > 10)
  """.stripMargin
util.verifyPlan(sqlQuery)
{code}
The result plan is :
{code:java}
Union(all=[true], union=[a, EXPR$1])
:- Calc(select=[a, random_udf() AS EXPR$1], where=[>(a, 10)])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, SS, 
source: [TestTableSource(a, b, c)], dynamic options: {k1=v1}]], fields=[a, b, 
c])
+- Calc(select=[a, random_udf() AS EXPR$1], where=[>(a, 10)])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, SS, 
source: [TestTableSource(a, b, c)], dynamic options: {k2=v2}]], fields=[a, b, 
c])
{code}
For the dynamic options is a part of table digest. It is not reused of source.

But I think dynamic options  is used to  override table properties, and table 
properties is not a part of  table digest, so dynamic options may be also not 
be a part of table digest.

What I'm looking forward is source can be reused Even if the table hint is 
different.

For another example,

If the source is kafka, whether source is reused or not will generate different 
results.

If the kafka source is reused, tableSink and tableSink1 will hava a full set of 
data at the same time from source.

But if the kafka source is not reused, tableSink and tableSink1 will have a 
full set of data together.

I think the first case will be correct.

 


was (Author: hailong wang):
Hi [~lzljs3620320], No, kafka source will not be reused.

For the following sql:

 
{code:java}
val sql1 = "CREATE TABLE SS (" +
   " a int," +
   " b bigint," +
   " c varchar" +
  ") WITH (" +
  "'connector.type' = 'TestTableSource' "+
  ")"
util.tableEnv.sqlUpdate(sql1);
util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())

val sqlQuery =
  """
    |(SELECT a, random_udf() FROM SS /*+ OPTIONS('k1' = 'v1') */ WHERE a > 10)
    |UNION ALL
    |(SELECT a, random_udf() FROM SS /*+ OPTIONS('k2' = 'v2') */ WHERE a > 10)
  """.stripMargin
util.verifyPlan(sqlQuery)
{code}
The result plan is :

 

 
{code:java}
Union(all=[true], union=[a, EXPR$1])
:- Calc(select=[a, random_udf() AS EXPR$1], where=[>(a, 10)])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, SS, 
source: [TestTableSource(a, b, c)], dynamic options: {k1=v1}]], fields=[a, b, 
c])
+- Calc(select=[a, random_udf() AS EXPR$1], where=[>(a, 10)])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, SS, 
source: [TestTableSource(a, b, c)], dynamic options: {k2=v2}]], fields=[a, b, 
c])
{code}
For the dynamic options is a part of table digest. It is not reused of source.

 

But I think dynamic options  is used to  override table properties, and table 
properties is not a part of  table digest, so dynamic options may be also not 
be a part of table digest.

What I'm looking forward is source can be reused Even if the table hint is 
different.

For another example,

If the source is kafka, whether source is reused or not will generate different 
results.

If the kafka source is reused, tableSink and tableSink1 will hava a full set of 
data at the same time from source.

But if the kafka source is not reused, tableSink and tableSink1 will have a 
full set of data together.

I think the first case will be correct.

 

> Dynamic option may not be a part of the table digest
> ----------------------------------------------------
>
>                 Key: FLINK-17892
>                 URL: https://issues.apache.org/jira/browse/FLINK-17892
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: hailong wang
>            Priority: Critical
>             Fix For: 1.11.0
>
>
> For now, Table properties not be a part of table digest, but dynamic option 
> will be included.
> This will lead to an error when plan reused.
> if I defines a kafka table:
> {code:java}
> CREATE TABLE KAFKA (
> ……
> ) with (
> topic = 'xx',
> groupid = 'xxx'
> ……
> )
> Insert into sinktable select * from KAFKA;
> Insert into sinktable1 select * from KAFKA;{code}
> KAFKA source will be reused according to the SQL above.
> But if i add different table hint to dml, like:
> {code:java}
> Insert into sinktable select * from KAFKA /*+ OPTIONS('k1' = 'v1')*/;
> Insert into sinktable1 select * from KAFKA /*+ OPTIONS('k2' = 'v2')*/;
> {code}
> There will be two kafka tableSources  use the same groupid to  consumer the 
> same topic.
> So I think dynamic option may not be a part of the table digest.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to