[jira] [Closed] (FLINK-32052) Introduce left and right state retention time to StreamingJoinOperator

2023-05-16 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-32052.
--
Resolution: Fixed

Fixed in master: 5ba3f2bdea6fc7c9e58b50200806ea341b7dd3d3

 

> Introduce left and right state retention time to StreamingJoinOperator
> --
>
> Key: FLINK-32052
> URL: https://issues.apache.org/jira/browse/FLINK-32052
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> According to the FLIP design, we should introduce separate TTL variables for 
> the TwoInputStreamOperator implementations, such as StreamingJoinOperator 
> and StreamingAntiJoinOperator.
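
For illustration, a minimal sketch of the idea (hypothetical class and method 
names, not the actual StreamingJoinOperator API): each input side keeps its 
own retention interval instead of sharing a single TTL.

{code:java}
import java.time.Duration;

// Hypothetical sketch: a two-input operator tracks a separate retention
// interval per side, so left and right state can expire independently.
public class TwoSidedRetention {
    private final long leftRetentionMs;
    private final long rightRetentionMs;

    public TwoSidedRetention(Duration leftRetention, Duration rightRetention) {
        this.leftRetentionMs = leftRetention.toMillis();
        this.rightRetentionMs = rightRetention.toMillis();
    }

    // cleanup timestamp for a record arriving on the left input
    long leftCleanupTime(long recordTimeMs) {
        return recordTimeMs + leftRetentionMs;
    }

    // cleanup timestamp for a record arriving on the right input
    long rightCleanupTime(long recordTimeMs) {
        return recordTimeMs + rightRetentionMs;
    }

    public static void main(String[] args) {
        TwoSidedRetention r =
                new TwoSidedRetention(Duration.ofHours(1), Duration.ofDays(1));
        System.out.println(r.leftCleanupTime(0));  // 3600000
        System.out.println(r.rightCleanupTime(0)); // 86400000
    }
}
{code}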



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31950) Introduce StateMetadata and StateMetadataJson SerDe

2023-05-14 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-31950.
--
Resolution: Fixed

Fixed in master: 62b11e2e117f874f073de93756c6b9889c464562

> Introduce StateMetadata and StateMetadataJson SerDe
> ---
>
> Key: FLINK-31950
> URL: https://issues.apache.org/jira/browse/FLINK-31950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> According to the FLIP design, we're about to introduce
>  * StateMetadata, which describes the TTL attribute of a stateful stream 
> operator.
>  * StateMetadata serializers and deserializers (SerDe).
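
For illustration, a minimal sketch of such a metadata entry and its JSON round 
trip (the field names are invented and Jackson is assumed on the classpath; 
this is not Flink's actual StateMetadata class):

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;

public class StateMetadataDemo {

    // Hypothetical shape of a state-metadata entry for one operator state.
    public static class StateMetadata {
        public int stateIndex;   // which state of the operator this describes
        public String stateTtl;  // retention time, e.g. "3600000 ms"
        public String stateName; // logical name, e.g. "leftState"

        public StateMetadata() {} // no-arg constructor for Jackson
    }

    public static void main(String[] args) throws Exception {
        StateMetadata meta = new StateMetadata();
        meta.stateIndex = 0;
        meta.stateTtl = "3600000 ms";
        meta.stateName = "leftState";

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(meta);                    // serialize
        StateMetadata back = mapper.readValue(json, StateMetadata.class); // deserialize
        System.out.println(json);
        System.out.println(back.stateName);
    }
}
{code}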



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests are using JUnit5

2023-05-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30815.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

Fixed in master:

ed9ee279e50781b7bd2d85f1486721c02fc7e32b

7a423666d0f8452382ad5fe2635de5ad1475dd46

> BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests 
> are using JUnit5
> 
>
> Key: FLINK-30815
> URL: https://issues.apache.org/jira/browse/FLINK-30815
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> BatchTestBase/BatchAbstractTestBase are using JUnit4, while some child tests 
> (e.g. DynamicFilteringITCase) are using JUnit5. This may break some 
> assumptions and hide problems.
> For example, a child test will create a MiniCluster by itself instead of 
> using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The 
> created MiniCluster may have more slots and hide resource deadlock issues.
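
As a minimal sketch of the hazard (class names invented; it assumes both 
JUnit 4 and JUnit Jupiter are on the classpath):

{code:java}
import org.junit.ClassRule;
import org.junit.rules.ExternalResource;

// JUnit 4 base class: the shared resource is wired up via @ClassRule,
// which only the JUnit 4 runner evaluates.
abstract class Junit4Base {
    @ClassRule
    public static final ExternalResource SHARED_CLUSTER = new ExternalResource() {
        @Override
        protected void before() {
            System.out.println("start shared MiniCluster (TM=1, slots=3)");
        }
    };
}

// JUnit 5 child: @Test comes from org.junit.jupiter, so the Jupiter engine
// runs the test and silently ignores the base class's @ClassRule. The test
// then runs against whatever cluster it creates itself, which may have more
// slots and hide resource deadlocks.
class Junit5Child extends Junit4Base {
    @org.junit.jupiter.api.Test
    void runsWithoutTheBaseRule() {
        // SHARED_CLUSTER.before() is never invoked for this test
    }
}
{code}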



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30815) BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests are using JUnit5

2023-05-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30815:
--

Assignee: Yuxin Tan

> BatchTestBase/BatchAbstractTestBase are using JUnit4 while some child tests 
> are using JUnit5
> 
>
> Key: FLINK-30815
> URL: https://issues.apache.org/jira/browse/FLINK-30815
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> BatchTestBase/BatchAbstractTestBase are using JUnit4, while some child tests 
> (e.g. DynamicFilteringITCase) are using JUnit5. This may break some 
> assumptions and hide problems.
> For example, a child test will create a MiniCluster by itself instead of 
> using the MiniCluster (TM=1, slots=3) created in BatchAbstractTestBase. The 
> created MiniCluster may have more slots and hide resource deadlock issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31884) Upgrade ExecNode to new version causes the old serialized plan failed to pass Json SerDe round trip

2023-04-27 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717446#comment-17717446
 ] 

Godfrey He commented on FLINK-31884:


Fixed in master: 3664609c7622ccae80e36e85099a1b79b5935fe9

> Upgrade ExecNode to new version causes the old serialized plan failed to pass 
> Json SerDe round trip
> ---
>
> Key: FLINK-31884
> URL: https://issues.apache.org/jira/browse/FLINK-31884
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> h4. How to Reproduce
> Firstly, add a test to dump the compiled plan JSON.
> {code:java}
> @Test
> public void debug() {
> tableEnv.executeSql("create table foo (f0 int, f1 string) with 
> ('connector' = 'datagen')");
> tableEnv.executeSql("create table bar (f0 int, f1 string) with 
> ('connector' = 'print')");
> tableEnv.compilePlanSql("insert into bar select * from foo")
> .writeToFile(new File("/path/to/debug.json"));
> }
> {code}
> The JSON content is as follows
> {code:json}
> {
>   "flinkVersion" : "1.18",
>   "nodes" : [ {
> "id" : 1,
> "type" : "stream-exec-table-source-scan_1",
> "scanTableSource" : {
>   "table" : {
> "identifier" : "`default_catalog`.`default_database`.`foo`",
> "resolvedTable" : {
>   "schema" : {
> "columns" : [ {
>   "name" : "f0",
>   "dataType" : "INT"
> }, {
>   "name" : "f1",
>   "dataType" : "VARCHAR(2147483647)"
> } ],
> "watermarkSpecs" : [ ]
>   },
>   "partitionKeys" : [ ],
>   "options" : {
> "connector" : "datagen"
>   }
> }
>   }
> },
> "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
> "description" : "TableSourceScan(table=[[default_catalog, 
> default_database, foo]], fields=[f0, f1])",
> "inputProperties" : [ ]
>   }, {
> "id" : 2,
> "type" : "stream-exec-sink_1",
> "configuration" : {
>   "table.exec.sink.keyed-shuffle" : "AUTO",
>   "table.exec.sink.not-null-enforcer" : "ERROR",
>   "table.exec.sink.type-length-enforcer" : "IGNORE",
>   "table.exec.sink.upsert-materialize" : "AUTO"
> },
> "dynamicTableSink" : {
>   "table" : {
> "identifier" : "`default_catalog`.`default_database`.`bar`",
> "resolvedTable" : {
>   "schema" : {
> "columns" : [ {
>   "name" : "f0",
>   "dataType" : "INT"
> }, {
>   "name" : "f1",
>   "dataType" : "VARCHAR(2147483647)"
> } ],
> "watermarkSpecs" : [ ]
>   },
>   "partitionKeys" : [ ],
>   "options" : {
> "connector" : "print"
>   }
> }
>   }
> },
> "inputChangelogMode" : [ "INSERT" ],
> "inputProperties" : [ {
>   "requiredDistribution" : {
> "type" : "UNKNOWN"
>   },
>   "damBehavior" : "PIPELINED",
>   "priority" : 0
> } ],
> "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
> "description" : "Sink(table=[default_catalog.default_database.bar], 
> fields=[f0, f1])"
>   } ],
>   "edges" : [ {
> "source" : 1,
> "target" : 2,
> "shuffle" : {
>   "type" : "FORWARD"
> },
> "shuffleMode" : "PIPELINED"
>   } ]
> }
> {code}
> Then upgrade the StreamExecSink to a new version
> {code:java}
> @ExecNodeMetadata(
> name = "stream-exec-sink",
> version = 1,
> consumedOptions = {
> "table.exec.sink.not-null-enforcer",
> "table.exec.sink.type-length-enforcer",
> "table.exec.sink.upsert-materialize",
> "table.exec.sink.keyed-shuffle"
> },
> producedTransformations = {
> CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
> CommonExecSink.PARTITIONER_TRANSFORMATION,
> CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
> CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
> CommonExecSink.SINK_TRANSFORMATION
> },
> minPlanVersion = FlinkVersion.v1_15,
> minStateVersion = FlinkVersion.v1_15)
> @ExecNodeMetadata(
> name = "stream-exec-sink",
> version = 2,
> consumedOptions = {
> "table.exec.sink.not-null-enforcer",
> "table.exec.sink.type-length-enforcer",
> "table.exec.sink.upsert-materialize",
> "table.exec.sink.keyed-shuffle"
> },
> producedTransformations = {
> 

[jira] [Commented] (FLINK-31917) Loss of Idempotence in JsonSerDe Round Trip for AggregateCall and RexNode

2023-04-24 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715711#comment-17715711
 ] 

Godfrey He commented on FLINK-31917:


good catch [~qingyue] 

> Loss of Idempotence in JsonSerDe Round Trip for AggregateCall and RexNode
> -
>
> Key: FLINK-31917
> URL: https://issues.apache.org/jira/browse/FLINK-31917
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
> JsonSerDeTestUtil#testJsonRoundTrip only checks the equality between spec and 
> deserialized object. Some corner cases are detected when serializing the 
> deserialized object again.
> {code:java}
> static <T> T testJsonRoundTrip(SerdeContext serdeContext, T spec, Class<T> clazz)
>         throws IOException {
>     String actualJson = toJson(serdeContext, spec);
>     T actual = toObject(serdeContext, actualJson, clazz);
>     assertThat(actual).isEqualTo(spec);
>     // serializing again will reveal some corner cases
>     assertThat(actualJson).isEqualTo(toJson(serdeContext, actual));
>     return actual;
> }
> {code}
> The discovered corner cases are listed as follows.
> h5. 1. SerDe for AggregateCall
> When deserializing the aggregate call, we should check the JsonNodeType to 
> avoid converting null to "null" string.
> [https://github.com/apache/flink/blob/f9b3e0b7bc0432001b4a197539a0712b16e0b33b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java#L64]
> h5. Suggested Fix
> {code:java}
> JsonNode nameNode = jsonNode.required(FIELD_NAME_NAME);
> final String name =
>         nameNode.getNodeType() == JsonNodeType.NULL ? null : nameNode.asText();
> {code}
> h5. 2. SerDe for RexNode
> RexNodeJsonSerdeTest#testSystemFunction should create the temporary system 
> function instead of the temporary catalog function.
> [https://github.com/apache/flink/blob/f9b3e0b7bc0432001b4a197539a0712b16e0b33b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java#L209]
> h5. Suggested Fix
> Use functionCatalog#registerTemporarySystemFunction to test.
> h5. 3. About RexLiteral type
> RexNodeJsonSerdeTest#testRexNodeSerde has a test spec as follows
> {code:java}
> // This creates the literal with DOUBLE as the literal's rel data type and
> // DECIMAL as its broad type (typeName); refer to Calcite for more details.
> rexBuilder.makeExactLiteral(
>         BigDecimal.valueOf(Double.MAX_VALUE), FACTORY.createSqlType(SqlTypeName.DOUBLE))
> {code}
> The RexNodeJsonSerializer uses `typeName` (which is DECIMAL) as the literal's 
> type; as a result, the rel data type is serialized as DOUBLE, but the value 
> is serialized as a string (to avoid losing precision).
> [https://github.com/apache/flink/blob/f9b3e0b7bc0432001b4a197539a0712b16e0b33b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java#L197]
> Then, during deserialization, according to the JSON, the deserialized literal 
> will assign DOUBLE as both the literal type and the broad type of the 
> literal.
> This causes the comparison failure:
> {code:java}
> expected: {"kind": "LITERAL", "value": "1.7976931348623157E+308"}
> actual: {"kind": "LITERAL", "value": 1.7976931348623157E+308}
> {code}
> h5. Suggested Fix
> SARG is a special case and can be handled first; for the remaining types, we 
> can use literal.getType().getSqlTypeName() instead of literal.getTypeName().
> {code:java}
> // handle the SARG type first
> if (literal.getTypeName() == SARG) {
>     serializeSargValue(
>             (Sarg<?>) value, literal.getType().getSqlTypeName(), gen, serializerProvider);
> } else {
>     serializeLiteralValue(
>             value,
>             literal.getType().getSqlTypeName(),
>             gen,
>             serializerProvider);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31917) Loss of Idempotence in JsonSerDe Round Trip for AggregateCall and RexNode

2023-04-24 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-31917:
--

Assignee: Jane Chan

> Loss of Idempotence in JsonSerDe Round Trip for AggregateCall and RexNode
> -
>
> Key: FLINK-31917
> URL: https://issues.apache.org/jira/browse/FLINK-31917
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
> JsonSerDeTestUtil#testJsonRoundTrip only checks the equality between spec and 
> deserialized object. Some corner cases are detected when serializing the 
> deserialized object again.
> {code:java}
> static <T> T testJsonRoundTrip(SerdeContext serdeContext, T spec, Class<T> clazz)
>         throws IOException {
>     String actualJson = toJson(serdeContext, spec);
>     T actual = toObject(serdeContext, actualJson, clazz);
>     assertThat(actual).isEqualTo(spec);
>     // serializing again will reveal some corner cases
>     assertThat(actualJson).isEqualTo(toJson(serdeContext, actual));
>     return actual;
> }
> {code}
> The discovered corner cases are listed as follows.
> h5. 1. SerDe for AggregateCall
> When deserializing the aggregate call, we should check the JsonNodeType to 
> avoid converting null to "null" string.
> [https://github.com/apache/flink/blob/f9b3e0b7bc0432001b4a197539a0712b16e0b33b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java#L64]
> h5. Suggested Fix
> {code:java}
> JsonNode nameNode = jsonNode.required(FIELD_NAME_NAME);
> final String name =
>         nameNode.getNodeType() == JsonNodeType.NULL ? null : nameNode.asText();
> {code}
> h5. 2. SerDe for RexNode
> RexNodeJsonSerdeTest#testSystemFunction should create the temporary system 
> function instead of the temporary catalog function.
> [https://github.com/apache/flink/blob/f9b3e0b7bc0432001b4a197539a0712b16e0b33b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java#L209]
> h5. Suggested Fix
> Use functionCatalog#registerTemporarySystemFunction to test.
> h5. 3. About RexLiteral type
> RexNodeJsonSerdeTest#testRexNodeSerde has a test spec as follows
> {code:java}
> // This creates the literal with DOUBLE as the literal's rel data type and
> // DECIMAL as its broad type (typeName); refer to Calcite for more details.
> rexBuilder.makeExactLiteral(
>         BigDecimal.valueOf(Double.MAX_VALUE), FACTORY.createSqlType(SqlTypeName.DOUBLE))
> {code}
> The RexNodeJsonSerializer uses `typeName` (which is DECIMAL) as the literal's 
> type; as a result, the rel data type is serialized as DOUBLE, but the value 
> is serialized as a string (to avoid losing precision).
> [https://github.com/apache/flink/blob/f9b3e0b7bc0432001b4a197539a0712b16e0b33b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java#L197]
> Then, during deserialization, according to the JSON, the deserialized literal 
> will assign DOUBLE as both the literal type and the broad type of the 
> literal.
> This causes the comparison failure:
> {code:java}
> expected: {"kind": "LITERAL", "value": "1.7976931348623157E+308"}
> actual: {"kind": "LITERAL", "value": 1.7976931348623157E+308}
> {code}
> h5. Suggested Fix
> SARG is a special case and can be handled first; for the remaining types, we 
> can use literal.getType().getSqlTypeName() instead of literal.getTypeName().
> {code:java}
> // handle the SARG type first
> if (literal.getTypeName() == SARG) {
>     serializeSargValue(
>             (Sarg<?>) value, literal.getType().getSqlTypeName(), gen, serializerProvider);
> } else {
>     serializeLiteralValue(
>             value,
>             literal.getType().getSqlTypeName(),
>             gen,
>             serializerProvider);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31833) Support code-gen fusion for multiple operators

2023-04-18 Thread Godfrey He (Jira)
Godfrey He created FLINK-31833:
--

 Summary: Support code-gen fusion for multiple operators
 Key: FLINK-31833
 URL: https://issues.apache.org/jira/browse/FLINK-31833
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Godfrey He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31767) Improve the implementation for "analyze table" execution on partitioned table

2023-04-12 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-31767.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

Fixed in master: 0191cf7d0064003bd2d4b60ebb6e329f791c924a

> Improve the implementation for "analyze table" execution on partitioned table
> -
>
> Key: FLINK-31767
> URL: https://issues.apache.org/jira/browse/FLINK-31767
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Godfrey He
>Assignee: Godfrey He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Currently, for partitioned table, the "analyze table" command will generate a 
> separate SQL statement for each partition. When there are too many 
> partitions, the compilation/submission/execution time will be very long. This 
> issue aims to improve it: we can combine the sql statements for each 
> partition into one with "union all", and just need to execution one sql.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31767) Improve the implementation for "analyze table" execution on partitioned table

2023-04-11 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-31767:
---
Description: Currently, for partitioned table, the "analyze table" command 
will generate a separate SQL statement for each partition. When there are too 
many partitions, the compilation/submission/execution time will be very long. 
This issue aims to improve it: we can combine the sql statements for each 
partition into one with "union all", and just need to execution one sql.  (was: 
Currently, for partitioned table, )

> Improve the implementation for "analyze table" execution on partitioned table
> -
>
> Key: FLINK-31767
> URL: https://issues.apache.org/jira/browse/FLINK-31767
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Godfrey He
>Assignee: Godfrey He
>Priority: Major
>
> Currently, for partitioned table, the "analyze table" command will generate a 
> separate SQL statement for each partition. When there are too many 
> partitions, the compilation/submission/execution time will be very long. This 
> issue aims to improve it: we can combine the sql statements for each 
> partition into one with "union all", and just need to execution one sql.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31767) Improve the implementation for "analyze table" execution

2023-04-11 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-31767:
---
Description: Currently, 

> Improve the implementation for "analyze table" execution
> 
>
> Key: FLINK-31767
> URL: https://issues.apache.org/jira/browse/FLINK-31767
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Godfrey He
>Assignee: Godfrey He
>Priority: Major
>
> Currently, 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31767) Improve the implementation for "analyze table" execution on partitioned table

2023-04-11 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-31767:
---
Description: Currently, for partitioned table,   (was: Currently, )

> Improve the implementation for "analyze table" execution on partitioned table
> -
>
> Key: FLINK-31767
> URL: https://issues.apache.org/jira/browse/FLINK-31767
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Godfrey He
>Assignee: Godfrey He
>Priority: Major
>
> Currently, for partitioned table, 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31767) Improve the implementation for "analyze table" execution on partitioned table

2023-04-11 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-31767:
---
Summary: Improve the implementation for "analyze table" execution on 
partitioned table  (was: Improve the implementation for "analyze table" 
execution)

> Improve the implementation for "analyze table" execution on partitioned table
> -
>
> Key: FLINK-31767
> URL: https://issues.apache.org/jira/browse/FLINK-31767
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Godfrey He
>Assignee: Godfrey He
>Priority: Major
>
> Currently, 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31767) Improve the implementation for "analyze table" execution

2023-04-11 Thread Godfrey He (Jira)
Godfrey He created FLINK-31767:
--

 Summary: Improve the implementation for "analyze table" execution
 Key: FLINK-31767
 URL: https://issues.apache.org/jira/browse/FLINK-31767
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Godfrey He
Assignee: Godfrey He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-13611) Introduce analyze statistic utility to generate table & column statistics

2023-04-11 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-13611.
--
Resolution: Invalid

see https://issues.apache.org/jira/browse/FLINK-28492

> Introduce analyze statistic utility to generate table & column statistics
> -
>
> Key: FLINK-13611
> URL: https://issues.apache.org/jira/browse/FLINK-13611
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue aims to introduce a utility class to generate table & column 
> statistics; the main steps include: 
> 1. generate SQL, like
> {code:sql}
> select approx_count_distinct(a) as ndv, count(1) - count(a) as nullCount, 
> avg(char_length(a)) as avgLen, max(char_length(a)) as maxLen, max(a) as 
> maxValue, min(a) as minValue, ... from MyTable
> {code}
> 2. execute the query
> 3. convert the result to {{TableStats}} (maybe the source table is not a 
> {{ConnectorCatalogTable}})
> 4. convert {{TableStats}} to {{CatalogTableStatistics}} if needed
> This issue does not involve DDL (like {{ANALYZE TABLE XXX}}); however, the 
> DDL could use this utility class once it's supported.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31165) Over Agg: The window rank function without order by error in top N query

2023-03-29 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706307#comment-17706307
 ] 

Godfrey He commented on FLINK-31165:


[~lincoln.86xy] [~qingyue] I prefer to just improve the error message in 
FlinkLogicalOverAggregateConverter; it could be: "The window rank function 
requires an ORDER BY clause on a non-constant column."

> Over Agg: The window rank function without order by error in top N query
> 
>
> Key: FLINK-31165
> URL: https://issues.apache.org/jira/browse/FLINK-31165
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.16.0
>Reporter: P Rohan Kumar
>Priority: Major
>
>  
> {code:java}
> val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen").option("rows-per-second", 
> "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
> .newBuilder()
> .column("NAME", DataTypes.VARCHAR(2147483647))
> .column("ROLLNO", DataTypes.DECIMAL(5, 0))
> .column("DOB", DataTypes.DATE())
> .column("CLASS", DataTypes.DECIMAL(2, 0))
> .column("SUBJECT", DataTypes.VARCHAR(2147483647))
> .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as 
> date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select 
> NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  
> from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
>  {code}
>  
>  
> I am getting the below error if I run the above code.
> I have already provided an order by column.
> If I change the order by column to some other column, such as "SUBJECT", then 
> the job runs fine.
>  
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args 
> [rel#245:LogicalWindow.NONE.any.None: 
> 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows 
> between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at 
> 

[jira] [Closed] (FLINK-31273) Left join with IS_NULL filter be wrongly pushed down and get wrong join results

2023-03-15 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-31273.
--
Fix Version/s: 1.16.2
   1.18.0
   1.17.1
   (was: 1.17.0)
   Resolution: Fixed

Fixed in

master: 8990822bd77d70f3249e1220a853e16dadd8ef54

1.17.1: 33278628dc599bed8944733efb9495ce77993d4b

1.16.2: f0361c720cb18c4ae7dc669c6a5da5b09bc8f563

> Left join with IS_NULL filter be wrongly pushed down and get wrong join 
> results
> ---
>
> Key: FLINK-31273
> URL: https://issues.apache.org/jira/browse/FLINK-31273
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> A left join with an IS_NULL filter is wrongly pushed down, producing wrong 
> join results. The SQL is:
> {code:java}
> SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 IS NULL AND a1 
> < 10
> The wrong plan is:
> LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
> +- LogicalFilter(condition=[IS NULL($5)])
>    +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
>       :- LogicalValues(tuples=[[]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> MyTable2]]) {code}
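
A minimal, self-contained Java sketch of why this pushdown is unsound (plain 
collections, not Flink planner code; it pushes the predicate into the right 
input for illustration, assuming the right table contains a genuine NULL):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class LeftJoinIsNullPushdown {

    // LEFT JOIN rows of `left` (a1) with `right` (a2) on a1 = a2.
    static List<Integer[]> leftJoin(List<Integer> left, List<Integer> right) {
        List<Integer[]> out = new ArrayList<>();
        for (Integer a1 : left) {
            boolean matched = false;
            for (Integer a2 : right) {
                if (a2 != null && Objects.equals(a1, a2)) { // NULL never matches
                    out.add(new Integer[] {a1, a2});
                    matched = true;
                }
            }
            if (!matched) out.add(new Integer[] {a1, null}); // null-padded row
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> t1 = Arrays.asList(1, 5);
        List<Integer> t2 = Arrays.asList(5, null); // right side holds a real NULL

        // Correct: join first, then WHERE a2 IS NULL AND a1 < 10 -> only [1, null].
        for (Integer[] r : leftJoin(t1, t2)) {
            if (r[1] == null && r[0] < 10) System.out.println(Arrays.toString(r));
        }

        // Wrong: push `a2 IS NULL` below the join. The join then matches
        // nothing, every left row is null-padded, and [5, null] wrongly
        // appears even though a1 = 5 has a match in the right table.
        List<Integer> pushed = new ArrayList<>();
        for (Integer a2 : t2) if (a2 == null) pushed.add(a2);
        for (Integer[] r : leftJoin(t1, pushed)) {
            if (r[0] < 10) System.out.println("wrong: " + Arrays.toString(r));
        }
    }
}
{code}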



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31239) Fix sum function can't get the corrected value when the argument type is string

2023-03-03 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-31239.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

Fixed in

1.18.0: 
263555c9adcca0abe194e9a6c1d85ec591c304e4..62a3b99d23229b39c798a0b657cb11218a5bc940

1.17.0: 
3bdb50513ddbbf6c67560a078da3f9506e5cd611..ac2eb5b977de47fc5550d2ee9f30fff4dcaca2b6

> Fix sum function can't get the corrected value when the argument type is 
> string
> ---
>
> Key: FLINK-31239
> URL: https://issues.apache.org/jira/browse/FLINK-31239
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-03-02 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reopened FLINK-30989:


> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.18.0
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size via the TableEnv 
> in my job and it failed. I attached to the TaskManager and found that the 
> conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays;
> public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to get the conf from the JobConfiguration, which should have been set 
> in the JobGraph. The following classes use the same method to get the conf 
> from the JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
>  * BinaryHashTable,BaseHybridHashTable
>  ** 

[jira] [Closed] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-03-02 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30989.
--
Resolution: Fixed

Fixed in

1.18.0: b4d43b47c993b7b4d5e4f7a78610c54124fcbcb4

1.17.0: 333088113993f4607038dae391863b5c30d0bc95

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.18.0
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size via the TableEnv 
> in my job and it failed. I attached to the TaskManager and found that the 
> conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays;
> public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to get the conf from the JobConfiguration, which should have been set 
> in the JobGraph. The following classes use the same method to get the conf 
> from the JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** 

[jira] [Updated] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-03-02 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-30989:
---
Fix Version/s: 1.17.0
   1.18.0

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.18.0
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to configure table.exec.spill-compression.block-size via the TableEnv 
> in my job and it failed. I attached to the TaskManager and found that the 
> conf passed to the constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays;
> public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream<Order> orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  tries to get the conf from the JobConfiguration, which should have been set 
> in the JobGraph. The following classes use the same method to get the conf 
> from the JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
>  * 

[jira] [Assigned] (FLINK-31239) Fix sum function can't get the corrected value when the argument type is string

2023-03-02 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-31239:
--

Assignee: dalongliu

> Fix sum function can't get the corrected value when the argument type is 
> string
> ---
>
> Key: FLINK-31239
> URL: https://issues.apache.org/jira/browse/FLINK-31239
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31082) Setting maven property 'flink.resueForks' to false in table planner module

2023-02-28 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-31082.
--
Resolution: Fixed

> Setting maven property 'flink.resueForks' to false in table planner module 
> ---
>
> Key: FLINK-31082
> URL: https://issues.apache.org/jira/browse/FLINK-31082
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> This issue was created to alleviate the OOM problem mentioned in 
> https://issues.apache.org/jira/browse/FLINK-18356
> Setting the maven property 'flink.resueForks' to false in the table planner 
> module can only reduce the frequency of OOMs, but cannot solve the problem. 
> To solve it completely, we need to identify the root cause, but that is 
> time-consuming work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31082) Setting maven property 'flink.resueForks' to false in table planner module

2023-02-28 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694800#comment-17694800
 ] 

Godfrey He commented on FLINK-31082:


Fixed in 1.16.2: 247a099cc358e0006aa0e387a55cf6d547814f98

> Setting maven property 'flink.resueForks' to false in table planner module 
> ---
>
> Key: FLINK-31082
> URL: https://issues.apache.org/jira/browse/FLINK-31082
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> This issue was created to alleviate the OOM problem mentioned in 
> https://issues.apache.org/jira/browse/FLINK-18356
> Setting the maven property 'flink.resueForks' to false in the table planner 
> module can only reduce the frequency of OOMs, but cannot solve the problem. 
> To solve it completely, we need to identify the root cause, but that is 
> time-consuming work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31059) Release Testing: Verify FLINK-29717 Supports hive udaf such as sum/count by native implementation

2023-02-20 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-31059:
--

Assignee: miamiaoxyz  (was: Yunhong Zheng)

> Release Testing: Verify FLINK-29717 Supports hive udaf such as sum/count by 
> native implementation
> -
>
> Key: FLINK-31059
> URL: https://issues.apache.org/jira/browse/FLINK-31059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Assignee: miamiaoxyz
>Priority: Blocker
> Fix For: 1.17.0
>
>
> This task aims to verify 
> [FLINK-29717|https://issues.apache.org/jira/browse/FLINK-29717], which 
> improves the hive udaf performance.
> As described in the documentation [PR|https://github.com/apache/flink/pull/21789], 
> please verify the following (a minimal sketch of step 1 follows this list):
> 1. Enable the option `table.exec.hive.native-agg-function.enabled`, use the 
> sum/count/avg/min/max functions separately in the query, verify via the plan 
> that the hash-agg strategy is chosen, and verify that the data results are 
> the same as when the option is disabled.
> 2. Enable the option, combine the sum/count/avg/min/max functions in the 
> query, verify via the plan that the hash-agg strategy is chosen, and verify 
> that the data results are the same as when the option is disabled.
> 3. Enable the option, count or max an array and other complex types in the 
> query, verify via the plan that the sort-agg strategy is chosen, and verify 
> that the data results are the same as when the option is disabled.
> 4. Enable the option, use the sum/count and first_value/last_value functions 
> in the query simultaneously, verify via the plan that the sort-agg strategy 
> is chosen, and verify that the data results are the same as when the option 
> is disabled.
> 5. Enable the option, use the sum/count/avg/min/max functions in the query 
> and force the sort-agg strategy, and verify that the data results are the 
> same as when the option is disabled.
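
A minimal sketch of step 1 (the catalog setup is omitted and the table name 
`my_hive_table` is a placeholder; it assumes a Hive catalog is already 
registered):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VerifyHiveNativeAgg {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Enable the native aggregate functions and inspect the chosen strategy.
        tEnv.getConfig().set("table.exec.hive.native-agg-function.enabled", "true");
        System.out.println(
                tEnv.explainSql("SELECT SUM(price), COUNT(1) FROM my_hive_table"));

        // Re-run with the option disabled and compare the results.
        tEnv.getConfig().set("table.exec.hive.native-agg-function.enabled", "false");
        tEnv.executeSql("SELECT SUM(price), COUNT(1) FROM my_hive_table").print();
    }
}
{code}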



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31079) Release Testing: Verify FLINK-29663 Further improvements of adaptive batch scheduler

2023-02-20 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-31079:
--

Assignee: miamiaoxyz  (was: Yunhong Zheng)

> Release Testing: Verify FLINK-29663 Further improvements of adaptive batch 
> scheduler
> 
>
> Key: FLINK-31079
> URL: https://issues.apache.org/jira/browse/FLINK-31079
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: miamiaoxyz
>Priority: Blocker
> Fix For: 1.17.0
>
>
> This task aims to verify FLINK-29663, which improves the adaptive batch 
> scheduler.
> Before the change of FLINK-29663, the adaptive batch scheduler distributed 
> subpartitions according to the number of subpartitions, making different 
> downstream subtasks consume roughly the same number of subpartitions. This 
> leads to imbalanced load across downstream tasks when the subpartitions 
> contain different amounts of data.
> To solve this problem, in FLINK-29663 we let the adaptive batch scheduler 
> distribute subpartitions according to the amount of data, so that different 
> downstream subtasks consume roughly the same amount of data. Note that it 
> currently only takes effect for All-To-All edges.
> The documentation of the adaptive scheduler can be found 
> [here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler].
> One can verify it by creating intentional data skew on All-To-All edges; a 
> simplified sketch of the data-based distribution appears below.
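
A simplified sketch of the idea (not the scheduler's actual code): split the 
subpartitions into contiguous ranges whose byte counts are roughly equal, 
instead of ranges with equally many subpartitions.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class EvenDataSplitter {

    // Assign contiguous subpartition ranges so each downstream subtask
    // consumes roughly the same amount of data.
    static List<int[]> splitByData(long[] subpartitionBytes, int numSubtasks) {
        long total = 0;
        for (long b : subpartitionBytes) total += b;
        long target = Math.max(1, total / numSubtasks); // target bytes per subtask

        List<int[]> ranges = new ArrayList<>(); // each entry: {start, endExclusive}
        int start = 0;
        long acc = 0;
        for (int i = 0; i < subpartitionBytes.length; i++) {
            acc += subpartitionBytes[i];
            boolean isLastRange = ranges.size() == numSubtasks - 1;
            if (acc >= target && !isLastRange) {
                ranges.add(new int[] {start, i + 1});
                start = i + 1;
                acc = 0;
            }
        }
        if (start < subpartitionBytes.length) {
            ranges.add(new int[] {start, subpartitionBytes.length});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Skewed subpartition sizes: two are much larger than the rest.
        long[] bytes = {500, 100, 100, 100, 100, 100, 500, 100};
        for (int[] r : splitByData(bytes, 3)) {
            System.out.println("subtask consumes subpartitions [" + r[0] + ", " + r[1] + ")");
        }
    }
}
{code}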



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30824) Add document for new introduced hive agg option

2023-02-20 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30824.
--
Resolution: Fixed

Fixed in 1.17.0: afdc079465c393d98bf2b3607a75b1fc9d58d281

> Add document for new introduced hive agg option
> ---
>
> Key: FLINK-30824
> URL: https://issues.apache.org/jira/browse/FLINK-30824
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30824) Add document for new introduced hive agg option

2023-02-20 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30824:
--

Assignee: dalongliu

> Add document for new introduced hive agg option
> ---
>
> Key: FLINK-30824
> URL: https://issues.apache.org/jira/browse/FLINK-30824
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'

2023-02-16 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30971.
--
Resolution: Fixed

Fixed in master: 55b927b0e6eb2d5d71487b9bb2c4dab80017a7e5

> Modify the default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold'
> -
>
> Key: FLINK-30971
> URL: https://issues.apache.org/jira/browse/FLINK-30971
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In our test environment, we set the default parallelism to 1 and found that 
> the most appropriate default value for 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold' is 500. However, for 
> batch jobs with high parallelism in production environments, the amount of 
> data per parallel instance is often less than 500 rows. Therefore, after 
> testing, we found that setting it to 50 gives better results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'

2023-02-16 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-30971:
---
Component/s: Table SQL / Planner
 (was: Table SQL / Runtime)

> Modify the default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold'
> -
>
> Key: FLINK-30971
> URL: https://issues.apache.org/jira/browse/FLINK-30971
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In our test environment, we set the default parallelism to 1 and found that 
> the most appropriate default value for 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold' is 500. However, for 
> batch jobs with high parallelism in production environments, the amount of 
> data per parallel instance is often less than 500 rows. Therefore, after 
> testing, we found that setting it to 50 gives better results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30959) UNIX_TIMESTAMP's return value does not meet expected

2023-02-08 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685777#comment-17685777
 ] 

Godfrey He commented on FLINK-30959:


Thanks for reporting this issue. [~yunfengzhou] 

 

Currently, the behavior of data with a timezone is not defined. The description 
of {{UNIX_TIMESTAMP(string1[, string2])}} in the Flink documentation is: 
{{Converts date time string string1 in format string2 (by default: yyyy-MM-dd 
HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified 
timezone in table config.}} This means we should always use the timezone 
specified in the table config to parse data. I think the behavior for 
{{yyyy-MM-dd HH:mm:ss.SSS X}} was not considered before, and I tend to use the 
time zone in the record if both the format and the record carry a time zone. In 
that case, the result is correct.

This needs more discussion to determine the behavior. cc [~Leonard] [~jark] [~twalthr] 
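To illustrate the record-offset interpretation with plain java.time (a sketch only, not Flink's parser; note that DateTimeFormatter needs two X letters to parse the +0800 form):

{code:java}
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class RecordOffsetParsing {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS XX");
        // The offset is taken from the record itself, not from the session timezone.
        long a = OffsetDateTime.parse("2022-01-01 08:00:00.001 +0800", fmt).toEpochSecond();
        long b = OffsetDateTime.parse("2022-01-01 00:00:00.003 +0000", fmt).toEpochSecond();
        System.out.println(a + " " + b); // 1640995200 1640995200 -- the same instant
    }
}
{code}

Under this interpretation, +0800 and +0000 strings that denote the same instant map to the same Unix timestamp.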

 

> UNIX_TIMESTAMP's return value does not meet expectations
> 
>
> Key: FLINK-30959
> URL: https://issues.apache.org/jira/browse/FLINK-30959
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.2
>Reporter: Yunfeng Zhou
>Priority: Major
>
> When running the following pyflink program
>  
> {code:python}
> import pandas as pd
> from pyflink.datastream import StreamExecutionEnvironment, HashMapStateBackend
> from pyflink.table import StreamTableEnvironment
>
> if __name__ == "__main__":
>     input_data = pd.DataFrame(
>         [
>             ["Alex", 100.0, "2022-01-01 08:00:00.001 +0800"],
>             ["Emma", 400.0, "2022-01-01 00:00:00.003 +0000"],
>             ["Alex", 200.0, "2022-01-01 08:00:00.005 +0800"],
>             ["Emma", 300.0, "2022-01-01 00:00:00.007 +0000"],
>             ["Jack", 500.0, "2022-01-01 08:00:00.009 +0800"],
>             ["Alex", 450.0, "2022-01-01 00:00:00.011 +0000"],
>         ],
>         columns=["name", "avg_cost", "time"],
>     )
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_state_backend(HashMapStateBackend())
>     t_env = StreamTableEnvironment.create(env)
>     input_table = t_env.from_pandas(input_data)
>     t_env.create_temporary_view("input_table", input_table)
>     time_format = "yyyy-MM-dd HH:mm:ss.SSS X"
>     output_table = t_env.sql_query(
>         f"SELECT *, UNIX_TIMESTAMP(`time`, '{time_format}') AS unix_time "
>         f"FROM input_table"
>     )
>     output_table.execute().print()
> {code}
> The actual output is 
> {code}
> +----+--------+----------+-------------------------------+------------+
> | op |   name | avg_cost |                          time |  unix_time |
> +----+--------+----------+-------------------------------+------------+
> | +I |   Alex |    100.0 | 2022-01-01 08:00:00.001 +0800 | 1640995200 |
> | +I |   Emma |    400.0 | 2022-01-01 00:00:00.003 +0000 | 1640995200 |
> | +I |   Alex |    200.0 | 2022-01-01 08:00:00.005 +0800 | 1640995200 |
> | +I |   Emma |    300.0 | 2022-01-01 00:00:00.007 +0000 | 1640995200 |
> | +I |   Jack |    500.0 | 2022-01-01 08:00:00.009 +0800 | 1640995200 |
> | +I |   Alex |    450.0 | 2022-01-01 00:00:00.011 +0000 | 1640995200 |
> +----+--------+----------+-------------------------------+------------+
> {code}
> While the expected result is
> {code:java}
> +----+--------+----------+-------------------------------+------------+
> | op |   name | avg_cost |                          time |  unix_time |
> +----+--------+----------+-------------------------------+------------+
> | +I |   Alex |    100.0 | 2022-01-01 08:00:00.001 +0800 | 1640995200 |
> | +I |   Emma |    400.0 | 2022-01-01 00:00:00.003 +0000 | 1640966400 |
> | +I |   Alex |    200.0 | 2022-01-01 08:00:00.005 +0800 | 1640995200 |
> | +I |   Emma |    300.0 | 2022-01-01 

[jira] [Closed] (FLINK-30876) Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode in BatchExecMultiInput.rootNode

2023-02-05 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30876.
--
Resolution: Fixed

> Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode
> ---
>
> Key: FLINK-30876
> URL: https://issues.apache.org/jira/browse/FLINK-30876
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Now, ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode. This may cause an error while creating the 
> StreamGraph for BatchExecMultiInput due to the different ids of rootNode and 
> inputNode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30876) Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode in BatchExecMultiInput.rootNode

2023-02-05 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684394#comment-17684394
 ] 

Godfrey He edited comment on FLINK-30876 at 2/6/23 3:23 AM:


Fixed in master: 111342f37bdc0d582d3f7af458d9869f0548299f

1.16.2: 00a4ab9011cf13facde364c52a170a7a897cdcce


was (Author: godfrey):
Fixed in master: 111342f37bdc0d582d3f7af458d9869f0548299f

> Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode
> ---
>
> Key: FLINK-30876
> URL: https://issues.apache.org/jira/browse/FLINK-30876
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Now, ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode. This may cause an error while creating the 
> StreamGraph for BatchExecMultiInput due to the different ids of rootNode and 
> inputNode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30876) Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode in BatchExecMultiInput.rootNode

2023-02-05 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30876:
--

Assignee: Yunhong Zheng

> Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode
> ---
>
> Key: FLINK-30876
> URL: https://issues.apache.org/jira/browse/FLINK-30876
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Now, ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode. This may cause an error while creating the 
> StreamGraph for BatchExecMultiInput due to the different ids of rootNode and 
> inputNode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30876) Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode in BatchExecMultiInput.rootNode

2023-02-05 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684394#comment-17684394
 ] 

Godfrey He commented on FLINK-30876:


Fixed in master: 111342f37bdc0d582d3f7af458d9869f0548299f

> Fix ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode
> ---
>
> Key: FLINK-30876
> URL: https://issues.apache.org/jira/browse/FLINK-30876
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> Now, ResetTransformationProcessor doesn't reset the transformation of ExecNode 
> in BatchExecMultiInput.rootNode. This may cause an error while creating the 
> StreamGraph for BatchExecMultiInput due to the different ids of rootNode and 
> inputNode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30542) Support adaptive local hash aggregate in runtime

2023-01-31 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30542.
--
Resolution: Fixed

Fixed in 1.17.0: 122ba8f319b0d68374abba08d676e6dfa82cc114

> Support adaptive local hash aggregate in runtime
> 
>
> Key: FLINK-30542
> URL: https://issues.apache.org/jira/browse/FLINK-30542
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Introduce a new strategy to adaptively determine whether local hash aggregate 
> is required according to the aggregation degree of local hash aggregate.
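A hypothetical sketch of such an adaptive decision, assuming the operator samples a fixed number of rows per subtask and compares the observed distinct-key ratio against a threshold (names and values are invented, not Flink's operator code):

{code:java}
final class AdaptiveLocalAggSketch {
    // If grouping barely reduces the sampled rows, local aggregation is not
    // worth its cost and rows can be forwarded to the global aggregate directly.
    static boolean keepLocalAgg(long sampledRows, long distinctKeys, double maxDistinctRatio) {
        return (double) distinctKeys / sampledRows <= maxDistinctRatio;
    }

    public static void main(String[] args) {
        // 490 distinct keys in a 500-row sample: poor aggregation degree, bypass.
        System.out.println(keepLocalAgg(500, 490, 0.8)); // false
    }
}
{code}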



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30542) Support adaptive local hash aggregate in runtime

2023-01-31 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-30542:
---
Component/s: Table SQL / Planner

> Support adaptive local hash aggregate in runtime
> 
>
> Key: FLINK-30542
> URL: https://issues.apache.org/jira/browse/FLINK-30542
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Introduce a new strategy to adaptively determine whether local hash aggregate 
> is required according to the aggregation degree of local hash aggregate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30727) JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException

2023-01-30 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682344#comment-17682344
 ] 

Godfrey He commented on FLINK-30727:


Fixed in 1.17.0: dcbb20688c0de238f65a6986f9888c5c5088e34a, which adds more 
network memory.

We will continue to follow this issue.
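For jobs or tests hitting this timeout, one workaround is to raise the network memory via the keys named in the exception below; a minimal sketch with purely illustrative values:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NetworkMemoryWorkaround {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keys are quoted from the exception message; values are only examples.
        conf.setString("taskmanager.memory.network.min", "128mb");
        conf.setString("taskmanager.memory.network.max", "1gb");
        conf.setString("taskmanager.memory.network.fraction", "0.2");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}
{code}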

> JoinReorderITCase.testBushyTreeJoinReorder failed due to IOException
> 
>
> Key: FLINK-30727
> URL: https://issues.apache.org/jira/browse/FLINK-30727
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network, Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Yunhong Zheng
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> IOException due to timeout occurring while requesting exclusive NetworkBuffer 
> caused JoinReorderITCase.testBushyTreeJoinReorder to fail:
> {code}
> [...]
> Jan 18 01:11:27 Caused by: java.io.IOException: Timeout triggered when 
> requesting exclusive buffers: The total number of network buffers is 
> currently set to 2048 of 32768 bytes each. You can increase this number by 
> setting the configuration keys 'taskmanager.memory.network.fraction', 
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max',  or 
> you may increase the timeout which is 30000ms by setting the key 
> 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalRequestMemorySegments(NetworkBufferPool.java:256)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestPooledMemorySegmentsBlocking(NetworkBufferPool.java:179)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.reserveSegments(LocalBufferPool.java:262)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setupChannels(SingleInputGate.java:517)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:277)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:962)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:648)
> Jan 18 01:11:27   at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:556)
> Jan 18 01:11:27   at java.lang.Thread.run(Thread.java:748)
> {code}
> Same build, 2 failures:
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=14300
> * 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44987=logs=ce3801ad-3bd5-5f06-d165-34d37e757d90=5e4d9387-1dcc-5885-a901-90469b7e6d2f=14362



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29720) Supports hive average function by native implementation

2023-01-30 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-29720:
--

Assignee: dalongliu

> Supports hive average function by native implementation
> --
>
> Key: FLINK-29720
> URL: https://issues.apache.org/jira/browse/FLINK-29720
> Project: Flink
>  Issue Type: Sub-task
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29720) Supports hive average function by native implementation

2023-01-30 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-29720:
---
Component/s: Connectors / Hive

> Supports hive average function by native implementation
> --
>
> Key: FLINK-29720
> URL: https://issues.apache.org/jira/browse/FLINK-29720
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29720) Supports hive average function by native implementation

2023-01-30 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-29720.
--
Resolution: Fixed

Fixed in 1.17.0: ec3243e36eebb706c624068c8a7622cf308df95e

> Supports hive average function by native implementation
> --
>
> Key: FLINK-29720
> URL: https://issues.apache.org/jira/browse/FLINK-29720
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30814) The parallelism of sort after a global partitioning is not forced to be 1

2023-01-29 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30814.
--
Resolution: Fixed

Fixed in 1.17.0: 916ff76b61a0bfa8283d52be53cd33e317d0d550

> The parallelism of sort after a global partitioning is not 
> forced to be 1
> 
>
> Key: FLINK-30814
> URL: https://issues.apache.org/jira/browse/FLINK-30814
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Godfrey He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> The parallelism of sort after a global partitioning is not 
> forced to be 1. This may lead to the parallelism being changed by the adaptive 
> batch scheduler, which is unexpected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30814) The parallelism of sort after a global partitioning is not forced to be 1

2023-01-28 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30814:
--

Assignee: Godfrey He

> The parallelism of sort after a global partitioning is not 
> forced to be 1
> 
>
> Key: FLINK-30814
> URL: https://issues.apache.org/jira/browse/FLINK-30814
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Assignee: Godfrey He
>Priority: Major
> Fix For: 1.17.0
>
>
> The parallelism of sort after a global partitioning is not 
> forced to be 1. This may lead to the parallelism being changed by the adaptive 
> batch scheduler, which is unexpected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30814) The parallelism of sort after a global partitioning is not forced to be 1

2023-01-28 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17681693#comment-17681693
 ] 

Godfrey He commented on FLINK-30814:


[~zhuzh] thanks for reporting this issue.

Currently, the parallelism of the global sort is set to 1, but the max parallelism 
is not set because ExecNodeBase#inputsContainSingleton does not work for global 
sort. This works fine for the default scheduler, but may produce wrong results if 
the adaptive batch scheduler changes the global sort parallelism.

 

I will fix it.
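A minimal sketch of the shape such a fix could take, assuming it pins the max parallelism alongside the parallelism so the adaptive batch scheduler cannot rescale the operator (an assumption, not the actual patch):

{code:java}
import org.apache.flink.api.dag.Transformation;

final class SingletonSortSketch {
    // Pin both values to 1; with only setParallelism(1) the adaptive batch
    // scheduler is still free to pick a different parallelism.
    static void forceSingleton(Transformation<?> globalSort) {
        globalSort.setParallelism(1);
        globalSort.setMaxParallelism(1);
    }
}
{code}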

 

> The parallelism of sort after a global partitioning is not 
> forced to be 1
> 
>
> Key: FLINK-30814
> URL: https://issues.apache.org/jira/browse/FLINK-30814
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Zhu Zhu
>Priority: Major
> Fix For: 1.17.0
>
>
> The parallelism of sort after a global partitioning is not 
> forced to be 1. This may lead to the parallelism being changed by the adaptive 
> batch scheduler, which is unexpected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29722) Supports hive max function by native implementation

2023-01-18 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-29722:
--

Assignee: dalongliu

> Supports hive max function by native implementation
> ---
>
> Key: FLINK-29722
> URL: https://issues.apache.org/jira/browse/FLINK-29722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29722) Supports hive max function by native implementation

2023-01-18 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-29722.
--
Resolution: Fixed

Fixed in 1.17.0: 74c7188ae9898b492c94a472d9d407bf4f8e0876

> Supports hive max function by native implementation
> ---
>
> Key: FLINK-29722
> URL: https://issues.apache.org/jira/browse/FLINK-29722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29722) Supports hive max function by native implementation

2023-01-18 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-29722:
---
Component/s: Connectors / Hive

> Supports hive max function by native implementation
> ---
>
> Key: FLINK-29722
> URL: https://issues.apache.org/jira/browse/FLINK-29722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29719) Supports hive count function by native implementation

2023-01-18 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-29719:
--

Assignee: dalongliu

> Supports hive count function by native implementation
> -
>
> Key: FLINK-29719
> URL: https://issues.apache.org/jira/browse/FLINK-29719
> Project: Flink
>  Issue Type: Sub-task
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29719) Supports hive count function by native implementation

2023-01-18 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-29719:
---
Component/s: Connectors / Hive

> Supports hive count function by native implementation
> -
>
> Key: FLINK-29719
> URL: https://issues.apache.org/jira/browse/FLINK-29719
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29719) Supports hive count function by native implementation

2023-01-18 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-29719.
--
Resolution: Fixed

Fixed in 1.17.0: 606f297198acd74a5c1a39700bd84ad9e26e7b82

> Supports hive count function by native implementation
> -
>
> Key: FLINK-29719
> URL: https://issues.apache.org/jira/browse/FLINK-29719
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29721) Supports hive min function by native implementation

2023-01-17 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-29721:
---
Component/s: Connectors / Hive

> Supports hive min function by native implementation
> ---
>
> Key: FLINK-29721
> URL: https://issues.apache.org/jira/browse/FLINK-29721
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29721) Supports hive min function by native implementation

2023-01-17 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-29721:
--

Assignee: tartarus  (was: KH)

> Supports hive min function by native implementation
> ---
>
> Key: FLINK-29721
> URL: https://issues.apache.org/jira/browse/FLINK-29721
> Project: Flink
>  Issue Type: Sub-task
>Reporter: dalongliu
>Assignee: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29721) Supports hive min function by native implementation

2023-01-17 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-29721.
--
Resolution: Fixed

Fixed in 1.17.0: 8a1f66ca827163b32387e0043f4362921f6c11a9

> Supports hive min function by native implementation
> ---
>
> Key: FLINK-29721
> URL: https://issues.apache.org/jira/browse/FLINK-29721
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: dalongliu
>Assignee: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29721) Supports hive min function by native implementation

2023-01-17 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-29721:
--

Assignee: KH

> Supports hive min function by native implementation
> ---
>
> Key: FLINK-29721
> URL: https://issues.apache.org/jira/browse/FLINK-29721
> Project: Flink
>  Issue Type: Sub-task
>Reporter: dalongliu
>Assignee: KH
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30376) Introduce a new flink bushy join reorder rule based on a greedy algorithm

2023-01-17 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30376:
--

Assignee: Yunhong Zheng

> Introduce a new flink bushy join reorder rule based on a greedy algorithm
> -
>
> Key: FLINK-30376
> URL: https://issues.apache.org/jira/browse/FLINK-30376
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Introduce a new Flink bushy join reorder strategy based on the greedy 
> algorithm. The old join reorder rule will remain the default join reorder 
> rule, and the new bushy join reorder strategy will be optional.
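For context, a greedy bushy join reorder can be sketched as repeatedly merging the pair of sub-plans whose join is estimated to be cheapest; the interfaces below are invented for illustration and are not Flink's planner API:

{code:java}
import java.util.ArrayList;
import java.util.List;

final class GreedyJoinReorderSketch {
    interface Plan { double cost(); }
    interface CostModel { Plan join(Plan left, Plan right); }

    // Merged plans may later join other merged plans, which is what makes the
    // resulting tree bushy rather than left-deep.
    static Plan reorder(List<Plan> inputs, CostModel model) {
        List<Plan> plans = new ArrayList<>(inputs);
        while (plans.size() > 1) {
            Plan best = null;
            int bi = -1, bj = -1;
            for (int i = 0; i < plans.size(); i++) {
                for (int j = i + 1; j < plans.size(); j++) {
                    Plan candidate = model.join(plans.get(i), plans.get(j));
                    if (best == null || candidate.cost() < best.cost()) {
                        best = candidate;
                        bi = i;
                        bj = j;
                    }
                }
            }
            plans.remove(bj); // j > i, so remove the higher index first
            plans.remove(bi);
            plans.add(best);
        }
        return plans.get(0);
    }
}
{code}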



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30376) Introduce a new flink bushy join reorder rule based on a greedy algorithm

2023-01-17 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30376.
--
Resolution: Fixed

Fixed in 1.17.0: 14ea1a58e9bf516f0ed0683486252c7e54ea8039

> Introduce a new flink bushy join reorder rule based on a greedy algorithm
> -
>
> Key: FLINK-30376
> URL: https://issues.apache.org/jira/browse/FLINK-30376
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Introduce a new Flink bushy join reorder strategy based on the greedy 
> algorithm. The old join reorder rule will remain the default join reorder 
> rule, and the new bushy join reorder strategy will be optional.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30586) Fix calcCodeGen failure if calc with like condition contains double quotation mark

2023-01-10 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17656440#comment-17656440
 ] 

Godfrey He edited comment on FLINK-30586 at 1/10/23 8:17 AM:
-

Fixed in 1.17.0: 68b37fb867374df5a201f0b170e35c21266e5d7b

1.16.1: 54518e9e27c8cc17f27b5d9a4de48e71cd817e42


was (Author: godfrey):
Fixed in 1.17.0: 68b37fb867374df5a201f0b170e35c21266e5d7b

> Fix calcCodeGen failure if calc with like condition contains double quotation 
> mark
> -
>
> Key: FLINK-30586
> URL: https://issues.apache.org/jira/browse/FLINK-30586
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: code-gen-1.png, code-gen-2.png
>
>
> If I write a SQL statement like "SELECT * FROM MyTable WHERE b LIKE '%"%'" in 
> Flink 1.16, where the 'like' condition contains a double quotation mark, code 
> generation fails because of the wrong code generated by codeGen. 
> !code-gen-1.png!
>  
> !code-gen-2.png!
>  
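A hypothetical sketch of the missing escaping step: a SQL string constant spliced into generated Java source must have backslashes and quotes escaped, otherwise the generated class does not compile:

{code:java}
final class StringLiteralEscaping {
    // Turn an arbitrary string into a valid Java string literal.
    static String toJavaLiteral(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // The LIKE pattern from the report: %"% becomes "%\"%", safe to embed.
        System.out.println(toJavaLiteral("%\"%"));
    }
}
{code}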



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30586) Fix calcCodeGen failure if calc with like condition contains double quotation mark

2023-01-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30586.
--
Resolution: Fixed

Fixed in 1.17.0: 68b37fb867374df5a201f0b170e35c21266e5d7b

> Fix calcCodeGen failure if calc with like condition contains double quotation 
> mark
> -
>
> Key: FLINK-30586
> URL: https://issues.apache.org/jira/browse/FLINK-30586
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: code-gen-1.png, code-gen-2.png
>
>
> If I write a SQL statement like "SELECT * FROM MyTable WHERE b LIKE '%"%'" in 
> Flink 1.16, where the 'like' condition contains a double quotation mark, code 
> generation fails because of the wrong code generated by codeGen. 
> !code-gen-1.png!
>  
> !code-gen-2.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30586) Fix calcCodeGen failure if calc with like condition contains double quotation mark

2023-01-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30586:
--

Assignee: Yunhong Zheng

> Fix calcCodeGen failure if calc with like condition contains double quotation 
> mark
> -
>
> Key: FLINK-30586
> URL: https://issues.apache.org/jira/browse/FLINK-30586
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: code-gen-1.png, code-gen-2.png
>
>
> If I write a SQL statement like "SELECT * FROM MyTable WHERE b LIKE '%"%'" in 
> Flink 1.16, where the 'like' condition contains a double quotation mark, code 
> generation fails because of the wrong code generated by codeGen. 
> !code-gen-1.png!
>  
> !code-gen-2.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30491) Hive table partition supports deserializing later during runtime

2023-01-09 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30491.
--
Resolution: Fixed

Fixed in 1.17.0: d4340d16708010394d7c57063b5dece8362d41d0

> Hive table partition supports deserializing later during runtime
> -
>
> Key: FLINK-30491
> URL: https://issues.apache.org/jira/browse/FLINK-30491
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30542) Support adaptive local hash aggregate in runtime

2023-01-08 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-30542:
---
Summary: Support adaptive local hash aggregate in runtime  (was: Support 
adaptive hash aggregate in runtime)

> Support adaptive local hash aggregate in runtime
> 
>
> Key: FLINK-30542
> URL: https://issues.apache.org/jira/browse/FLINK-30542
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.17.0
>
>
> Introduce a new strategy to adaptively determine whether local hash aggregate 
> is required according to the aggregation degree of local hash aggregate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30491) Hive table partition supports deserializing later during runtime

2023-01-08 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30491:
--

Assignee: dalongliu

> Hive table partition supports deserializing later during runtime
> -
>
> Key: FLINK-30491
> URL: https://issues.apache.org/jira/browse/FLINK-30491
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30542) Support adaptive local hash aggregate in runtime

2023-01-08 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30542:
--

Assignee: Yunhong Zheng

> Support adaptive local hash aggregate in runtime
> 
>
> Key: FLINK-30542
> URL: https://issues.apache.org/jira/browse/FLINK-30542
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
> Fix For: 1.17.0
>
>
> Introduce a new strategy to adaptively determine whether local hash aggregate 
> is required according to the aggregation degree of local hash aggregate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30365) New dynamic partition pruning strategy to support more dpp patterns

2023-01-08 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30365.
--
Resolution: Fixed

Fixed in 1.17.0: d9f9b55f82dfbc1676572cc36b718a99001497f8

> New dynamic partition pruning strategy to support more dpp patterns
> ---
>
> Key: FLINK-30365
> URL: https://issues.apache.org/jira/browse/FLINK-30365
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> New dynamic partition pruning strategy to support more dpp patterns. Currently, 
> the dpp rules are coupled with the join reorder rules, which will affect the 
> result of join reorder. In addition, the dpp rule doesn't support patterns such 
> as a union node on the fact side.
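For context, a typical query shape that dynamic partition pruning targets; the tables and columns below are hypothetical, and only the statement shape matters:

{code:java}
public class DppQueryShapeSketch {
    public static void main(String[] args) {
        // fact_sales is assumed to be partitioned by `day`. With dpp, the
        // selective filter on dim_date is evaluated first, and only fact
        // partitions whose `day` survives the filter are scanned.
        String query =
                "SELECT f.amount FROM fact_sales f "
                        + "JOIN dim_date d ON f.day = d.day "
                        + "WHERE d.is_holiday = 'Y'";
        System.out.println(query);
    }
}
{code}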



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30365) New dynamic partition pruning strategy to support more dpp patterns

2023-01-08 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30365:
--

Assignee: Yunhong Zheng

> New dynamic partition pruning strategy to support more dpp patterns
> ---
>
> Key: FLINK-30365
> URL: https://issues.apache.org/jira/browse/FLINK-30365
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> New dynamic partition pruning strategy to support more dpp patterns. Currently, 
> the dpp rules are coupled with the join reorder rules, which will affect the 
> result of join reorder. In addition, the dpp rule doesn't support patterns such 
> as a union node on the fact side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29849) Event time temporal join on an upsert source may produce incorrect execution plan

2023-01-03 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650120#comment-17650120
 ] 

Godfrey He edited comment on FLINK-29849 at 1/4/23 2:07 AM:


Fixed in master: eb44ac01c9969cb22ab832b6b2155b109f015b06

1.16.1: dbb6654c9d211e0944a0a1f58921c12bda6916cf


was (Author: godfrey):
Fixed in master: eb44ac01c9969cb22ab832b6b2155b109f015b06

> Event time temporal join on an upsert source may produce incorrect execution 
> plan
> -
>
> Key: FLINK-29849
> URL: https://issues.apache.org/jira/browse/FLINK-29849
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In the current implementation, the execution plan is incorrect when doing an 
> event time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a 
> temporal join input, or it will damage the versions of the versioned table. For 
> versioned tables, we use a single-temporal mechanism which relies on sequential 
> records of the same key to ensure the valid period of each version, so if the 
> ChangelogNormalize is added, then a UB message will be produced based on the 
> previous UA or Insert message, with all columns exactly the same, including the 
> event time, e.g., 
> original upsert input
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1  [~, '2022-11-02 10:00:00')
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize's processing, will output:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> versions are incorrect:
> {code}
> v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed into an event time temporal join; 
> otherwise, the filter may also corrupt the versioned table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29849) Event time temporal join on an upsert source may produce incorrect execution plan

2023-01-03 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-29849:
---
Fix Version/s: 1.16.1

> Event time temporal join on an upsert source may produce incorrect execution 
> plan
> -
>
> Key: FLINK-29849
> URL: https://issues.apache.org/jira/browse/FLINK-29849
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> In the current implementation, the execution plan is incorrect when doing an 
> event time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a 
> temporal join input, or it will damage the versions of the versioned table. For 
> versioned tables, we use a single-temporal mechanism which relies on sequential 
> records of the same key to ensure the valid period of each version, so if the 
> ChangelogNormalize is added, then a UB message will be produced based on the 
> previous UA or Insert message, with all columns exactly the same, including the 
> event time, e.g., 
> original upsert input
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1  [~, '2022-11-02 10:00:00')
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize's processing, will output:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> versions are incorrect:
> {code}
> v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed into an event time temporal join; 
> otherwise, the filter may also corrupt the versioned table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28988) Incorrect result for filter after temporal join

2023-01-02 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17643318#comment-17643318
 ] 

Godfrey He edited comment on FLINK-28988 at 1/3/23 4:08 AM:


Fixed in 1.17.0:

b2203eaef68364306dfcc27fb34ac82baefda3d3

2851fac9c4c052876c80440b6b0b637603de06ea

 

1.16.1:

17b42516ceb73fa342101aedf830df40a84d82bc

c14355243995bff7b03a527ed073a2bbaab70ce8


was (Author: godfrey):
Fixed in 1.17.0:

b2203eaef68364306dfcc27fb34ac82baefda3d3

2851fac9c4c052876c80440b6b0b637603de06ea

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op|id|                     ts|id0|  state|                    ts0|row_num|
> |+I| 0|1970-01-01 08:00:00.000|  0| online|1970-01-01 08:00:00.000|      1|
> |+I| 0|1970-01-01 08:00:00.005|  0| online|1970-01-01 08:00:00.000|      1|
> |+I| 0|1970-01-01 08:00:00.010|  0|offline|1970-01-01 08:00:00.010|

[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join

2023-01-02 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-28988:
---
Fix Version/s: 1.16.1

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op|id|                     ts|id0|  state|                    ts0|row_num|
> |+I| 0|1970-01-01 08:00:00.000|  0| online|1970-01-01 08:00:00.000|      1|
> |+I| 0|1970-01-01 08:00:00.005|  0| online|1970-01-01 08:00:00.000|      1|
> |+I| 0|1970-01-01 08:00:00.010|  0|offline|1970-01-01 08:00:00.010|      1|
> |+I| 0|1970-01-01 08:00:00.015|  0|offline|1970-01-01 08:00:00.010|      1|
> |+I| 0|1970-01-01 08:00:00.020|  0| online|1970-01-01 08:00:00.020|      1|
> |+I| 0|1970-01-01 08:00:00.025|  0|

[jira] [Closed] (FLINK-28850) Support table alias in LOOKUP hint

2022-12-28 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-28850.
--
Resolution: Fixed

Fixed in master:

be6b1c94ef3f552c753746863cb0a4e7dd86d2fc

f32ba645246a8180a02182650fb51392facfcc09

62a9d837e24150461790e7689659c50f15197ebc

> Support table alias in LOOKUP hint
> --
>
> Key: FLINK-28850
> URL: https://issues.apache.org/jira/browse/FLINK-28850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently Calcite's LogicalSnapshot is not Hintable, and supporting a table 
> alias in the LOOKUP hint relies on it, so we need to create a ticket in the 
> Calcite community and also make the change in Flink first.
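Once alias support is in place, the hint's 'table' option should be able to name the alias; a sketch with hypothetical tables, shown only for the hint syntax:

{code:java}
public class LookupHintAliasSketch {
    public static void main(String[] args) {
        // 'table'='d' references the alias of the lookup table, not its full name.
        String sql =
                "SELECT /*+ LOOKUP('table'='d', 'async'='true') */ o.order_id, d.rate "
                        + "FROM Orders AS o "
                        + "JOIN Rates FOR SYSTEM_TIME AS OF o.proc_time AS d "
                        + "ON o.currency = d.currency";
        System.out.println(sql);
    }
}
{code}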



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28850) Support table alias in LOOKUP hint

2022-12-28 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-28850:
---
Fix Version/s: 1.17.0

> Support table alias in LOOKUP hint
> --
>
> Key: FLINK-28850
> URL: https://issues.apache.org/jira/browse/FLINK-28850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently Calcite's LogicalSnapshot is not Hintable, and supporting a table 
> alias in the LOOKUP hint relies on it, so we need to create a ticket in the 
> Calcite community and also make the change in Flink first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28850) Support table alias in LOOKUP hint

2022-12-28 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-28850:
--

Assignee: lincoln lee

> Support table alias in LOOKUP hint
> --
>
> Key: FLINK-28850
> URL: https://issues.apache.org/jira/browse/FLINK-28850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently Calcite's LogicalSnapshot is not Hintable, and supporting a table 
> alias in the LOOKUP hint relies on it, so we need to create a ticket in the 
> Calcite community and also make the change in Flink first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30368) Fix calcite method RelMdUtil$numDistinctVals() wrongly returning zero if the method input domainSize is much larger than numSelected

2022-12-27 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30368.
--
Resolution: Fixed

Fixed in master: dc862dae28a172f674a9b8a2198c603275304550

> Fix calcite method RelMdUtil$numDistinctVals() wrongly returning zero if the 
> method input domainSize is much larger than numSelected 
> ---
>
> Key: FLINK-30368
> URL: https://issues.apache.org/jira/browse/FLINK-30368
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Fix the Calcite method RelMdUtil$numDistinctVals(), which wrongly returns zero 
> if the input domainSize is much larger than numSelected. This wrong zero value 
> will affect the selection of the join type.
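A sketch of the underlying floating-point issue, using the textbook approximation that numDistinctVals implements (not Calcite's exact code):

{code:java}
public class NumDistinctValsSketch {
    // Expected number of distinct values when selecting n rows out of a domain
    // of size d: d * (1 - ((d - 1) / d) ^ n).
    static double numDistinctVals(double domainSize, double numSelected) {
        return domainSize * (1.0 - Math.pow((domainSize - 1.0) / domainSize, numSelected));
    }

    public static void main(String[] args) {
        // For a huge domain, (d - 1) / d rounds to exactly 1.0 in double
        // precision, so the estimate collapses to 0 while the true value is ~n.
        System.out.println(numDistinctVals(1e20, 100)); // prints 0.0, expected ~100
    }
}
{code}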



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30368) Fix calcite method RelMdUtil$numDistinctVals() wrongly returning zero if the method input domainSize is much larger than numSelected

2022-12-27 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He reassigned FLINK-30368:
--

Assignee: Yunhong Zheng

> Fix calcite method RelMdUtil$numDistinctVals() wrongly returning zero if the 
> method input domainSize is much larger than numSelected 
> ---
>
> Key: FLINK-30368
> URL: https://issues.apache.org/jira/browse/FLINK-30368
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Fix the Calcite method RelMdUtil$numDistinctVals(), which wrongly returns zero 
> if the input domainSize is much larger than numSelected. This wrong zero value 
> will affect the selection of the join type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-30270) FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer joins into MultiJoin

2022-12-26 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-30270.
--
Resolution: Fixed

Fixed in master:7590cb7d84774b0e8afd7b0af31cc0af762d4c6a

1.16.1: ca42695dba5ab72c6b9b895bb6553321c30d5074

> FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer joins into 
> MultiJoin
> 
>
> Key: FLINK-30270
> URL: https://issues.apache.org/jira/browse/FLINK-30270
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer joins into 
> MultiJoin. In some complex cases, it needs to consider the join conditions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29849) Event time temporal join on an upsert source may produce incorrect execution plan

2022-12-20 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-29849.
--
Resolution: Fixed

Fixed in master: eb44ac01c9969cb22ab832b6b2155b109f015b06

> Event time temporal join on an upsert source may produce incorrect execution 
> plan
> -
>
> Key: FLINK-29849
> URL: https://issues.apache.org/jira/browse/FLINK-29849
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In the current implementation, the execution plan is incorrect when doing an 
> event time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a 
> temporal join input, or it will damage the versions of the versioned table. For 
> versioned tables, we use a single-temporal mechanism which relies on sequential 
> records of the same key to ensure the valid period of each version, so if the 
> ChangelogNormalize is added, then a UB message will be produced based on the 
> previous UA or Insert message, with all columns exactly the same, including the 
> event time, e.g., 
> original upsert input
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1  [~, '2022-11-02 10:00:00')
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize's processing, will output:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> versions are incorrect:
> {code}
> v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed into an event time temporal join; 
> otherwise, the filter may also corrupt the versioned table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18356) flink-table-planner Exit code 137 returned from process

2022-12-13 Thread Godfrey He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646524#comment-17646524
 ] 

Godfrey He commented on FLINK-18356:


[~zhengyunhong97] will help to address this issue.

> flink-table-planner Exit code 137 returned from process
> ---
>
> Key: FLINK-18356
> URL: https://issues.apache.org/jira/browse/FLINK-18356
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Tests
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0
>Reporter: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: 1234.jpg, app-profiling_4.gif
>
>
> {noformat}
> = test session starts 
> ==
> platform linux -- Python 3.7.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1
> cachedir: .tox/py37-cython/.pytest_cache
> rootdir: /__w/3/s/flink-python
> collected 568 items
> pyflink/common/tests/test_configuration.py ..[  
> 1%]
> pyflink/common/tests/test_execution_config.py ...[  
> 5%]
> pyflink/dataset/tests/test_execution_environment.py .
> ##[error]Exit code 137 returned from process: file name '/bin/docker', 
> arguments 'exec -i -u 1002 
> 97fc4e22522d2ced1f4d23096b8929045d083dd0a99a4233a8b20d0489e9bddb 
> /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'.
> Finishing: Test - python
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3729=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join

2022-12-05 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He updated FLINK-28988:
---
Fix Version/s: 1.17.0

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op| id | ts                      | id0 | state   | ts0                     | row_num |
> |+I | 0  | 1970-01-01 08:00:00.000 | 0   | online  | 1970-01-01 08:00:00.000 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.005 | 0   | online  | 1970-01-01 08:00:00.000 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.010 | 0   | offline | 1970-01-01 08:00:00.010 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.015 | 0   | offline | 1970-01-01 08:00:00.010 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.020 | 0   | online  | 1970-01-01 08:00:00.020 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.025 | 0   | online  | 1970-01-01 

[jira] [Closed] (FLINK-28988) Incorrect result for filter after temporal join

2022-12-05 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-28988.
--
Resolution: Fixed

Fixed in 1.17.0:

b2203eaef68364306dfcc27fb34ac82baefda3d3

2851fac9c4c052876c80440b6b0b637603de06ea

> Incorrect result for filter after temporal join
> ---
>
> Key: FLINK-28988
> URL: https://issues.apache.org/jira/browse/FLINK-28988
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Shuiqiang Chen
>Priority: Major
>  Labels: pull-request-available
>
> The following code can reproduce the case
>  
> {code:java}
> public class TemporalJoinSQLExample1 {
> public static void main(String[] args) throws Exception {
> // set up the Java DataStream API
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // set up the Java Table API
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
> env.fromElements(
> new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
> new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
> new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
> final Table table =
> tableEnv.fromDataStream(
> ds,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", DataTypes.STRING())
> .column("f2", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f2", "f2 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "state", "ts");
> tableEnv.createTemporaryView("source_table", table);
> final Table dedupeTable =
> tableEnv.sqlQuery(
> "SELECT * FROM ("
> + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
> id ORDER BY ts DESC) AS row_num FROM source_table"
> + ") WHERE row_num = 1");
> tableEnv.createTemporaryView("versioned_table", dedupeTable);
> DataStreamSource<Tuple2<Integer, Instant>> event =
> env.fromElements(
> new Tuple2<>(0, Instant.ofEpochMilli(0)),
> new Tuple2<>(0, Instant.ofEpochMilli(5)),
> new Tuple2<>(0, Instant.ofEpochMilli(10)),
> new Tuple2<>(0, Instant.ofEpochMilli(15)),
> new Tuple2<>(0, Instant.ofEpochMilli(20)),
> new Tuple2<>(0, Instant.ofEpochMilli(25)));
> final Table eventTable =
> tableEnv.fromDataStream(
> event,
> Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .column("f1", 
> DataTypes.TIMESTAMP_LTZ(3))
> .watermark("f1", "f1 - INTERVAL '2' 
> SECONDS")
> .build())
> .as("id", "ts");
> tableEnv.createTemporaryView("event_table", eventTable);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM event_table"
> + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
> AS OF event_table.ts"
> + " ON event_table.id = versioned_table.id");
> result.execute().print();
> result.filter($("state").isEqual("online")).execute().print();
> }
> } {code}
>  
> The result of temporal join is the following:
> |op| id | ts                      | id0 | state   | ts0                     | row_num |
> |+I | 0  | 1970-01-01 08:00:00.000 | 0   | online  | 1970-01-01 08:00:00.000 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.005 | 0   | online  | 1970-01-01 08:00:00.000 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.010 | 0   | offline | 1970-01-01 08:00:00.010 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.015 | 0   | offline | 1970-01-01 08:00:00.010 | 1 |
> |+I | 0  | 1970-01-01 08:00:00.020 | 0   | online  | 1970-01-01 08:00:00.020 | 1 |
> |+I | 0  | 1970-01-01 

[jira] [Closed] (FLINK-29781) ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-29781.
--
Resolution: Fixed

> ChangelogNormalize uses wrong keys after transformation by 
> WatermarkAssignerChangelogNormalizeTransposeRule 
> 
>
> Key: FLINK-29781
> URL: https://issues.apache.org/jira/browse/FLINK-29781
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> Currently, WatermarkAssignerChangelogNormalizeTransposeRule doesn't remap the 
> unique-key indexes for its new input after the plan rewrite, which may produce 
> wrong results.
> A simple case:
> {code}
>   @Test
>   def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
> util.addTable("""
> |CREATE TABLE t1 (
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  b VARCHAR NOT NULL,
> |  WATERMARK FOR ingestion_time AS ingestion_time
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)'
> |)
>   """.stripMargin)
> util.addTable("""
> |CREATE TABLE t2 (
> |  k VARBINARY,
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  f BOOLEAN NOT NULL,
> |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
> |  PRIMARY KEY (`a`) NOT ENFORCED
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)',
> | 'changelog-mode' = 'I,UA,D'
> |)
>   """.stripMargin)
> val sql =
>   """
> |SELECT t1.a, t1.b, t2.f
> |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
> | ON t1.a = t2.a WHERE t2.f = true
> |""".stripMargin
> util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
>   }
> {code}
> the generated plan is incorrect for now:  
> {color:red}ChangelogNormalize(key=[ingestion_time]){color} uses wrong key 
> 'ingestion_time' (should be 'a')
> optimize result: 
> {code}
> Calc(select=[a, b, f])
> +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
> __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
> __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
> ingestion_time0, a0, f])
>:- Exchange(distribution=[hash[a]])
>:  +- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
> ingestion_time, a, b])
>:+- TableSourceScan(table=[[default_catalog, default_database, 
> t1]], fields=[a, b, ingestion_time])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[ingestion_time, a, f], where=[f])
>  +- ChangelogNormalize(key=[ingestion_time])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>   +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
> *ROWTIME*) AS ingestion_time, a, f])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
> ingestion_time])
> {code}

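The missing step is an index translation; an illustrative sketch (not the actual rule code; the method name and the inputToOutput mapping are assumptions for illustration) of how key indexes would be remapped through the projection of the new input:

{code:java}
// Illustrative only: `inputToOutput` maps each forwarded field's index in
// the old input to its index in the new (pushed-down) input. Key indexes
// must be translated through it instead of being copied verbatim, which is
// how ChangelogNormalize(key=[ingestion_time]) ends up with the wrong key.
static int[] remapKeys(int[] keys, java.util.Map<Integer, Integer> inputToOutput) {
    int[] remapped = new int[keys.length];
    for (int i = 0; i < keys.length; i++) {
        Integer newIndex = inputToOutput.get(keys[i]);
        if (newIndex == null) {
            throw new IllegalStateException(
                    "Key field " + keys[i] + " is not forwarded by the new input");
        }
        remapped[i] = newIndex;
    }
    return remapped;
}
{code}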


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29781) ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-14 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-29781:
---
Fix Version/s: 1.16.1

> ChangelogNormalize uses wrong keys after transformation by 
> WatermarkAssignerChangelogNormalizeTransposeRule 
> 
>
> Key: FLINK-29781
> URL: https://issues.apache.org/jira/browse/FLINK-29781
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> Currently, WatermarkAssignerChangelogNormalizeTransposeRule doesn't remap the 
> unique-key indexes for its new input after the plan rewrite, which may produce 
> wrong results.
> A simple case:
> {code}
>   @Test
>   def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
> util.addTable("""
> |CREATE TABLE t1 (
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  b VARCHAR NOT NULL,
> |  WATERMARK FOR ingestion_time AS ingestion_time
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)'
> |)
>   """.stripMargin)
> util.addTable("""
> |CREATE TABLE t2 (
> |  k VARBINARY,
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  f BOOLEAN NOT NULL,
> |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
> |  PRIMARY KEY (`a`) NOT ENFORCED
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)',
> | 'changelog-mode' = 'I,UA,D'
> |)
>   """.stripMargin)
> val sql =
>   """
> |SELECT t1.a, t1.b, t2.f
> |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
> | ON t1.a = t2.a WHERE t2.f = true
> |""".stripMargin
> util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
>   }
> {code}
> the generated plan is incorrect for now:  
> {color:red}ChangelogNormalize(key=[ingestion_time]){color} uses wrong key 
> 'ingestion_time' (should be 'a')
> optimize result: 
> {code}
> Calc(select=[a, b, f])
> +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
> __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
> __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
> ingestion_time0, a0, f])
>:- Exchange(distribution=[hash[a]])
>:  +- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
> ingestion_time, a, b])
>:+- TableSourceScan(table=[[default_catalog, default_database, 
> t1]], fields=[a, b, ingestion_time])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[ingestion_time, a, f], where=[f])
>  +- ChangelogNormalize(key=[ingestion_time])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>   +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
> *ROWTIME*) AS ingestion_time, a, f])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
> ingestion_time])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29781) ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-14 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17633625#comment-17633625
 ] 

godfrey he edited comment on FLINK-29781 at 11/14/22 9:50 AM:
--

Fixed in master: 5463f244ec69f623d75c15374b55bb8695e92b3e

in 1.16.1: 5466716b20d5c720bf29dea560909e7055870555


was (Author: godfreyhe):
Fixed in master: 5463f244ec69f623d75c15374b55bb8695e92b3e

> ChangelogNormalize uses wrong keys after transformation by 
> WatermarkAssignerChangelogNormalizeTransposeRule 
> 
>
> Key: FLINK-29781
> URL: https://issues.apache.org/jira/browse/FLINK-29781
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, WatermarkAssignerChangelogNormalizeTransposeRule doesn't remap the 
> unique-key indexes for its new input after the plan rewrite, which may produce 
> wrong results.
> A simple case:
> {code}
>   @Test
>   def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
> util.addTable("""
> |CREATE TABLE t1 (
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  b VARCHAR NOT NULL,
> |  WATERMARK FOR ingestion_time AS ingestion_time
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)'
> |)
>   """.stripMargin)
> util.addTable("""
> |CREATE TABLE t2 (
> |  k VARBINARY,
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  f BOOLEAN NOT NULL,
> |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
> |  PRIMARY KEY (`a`) NOT ENFORCED
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)',
> | 'changelog-mode' = 'I,UA,D'
> |)
>   """.stripMargin)
> val sql =
>   """
> |SELECT t1.a, t1.b, t2.f
> |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
> | ON t1.a = t2.a WHERE t2.f = true
> |""".stripMargin
> util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
>   }
> {code}
> the generated plan is incorrect for now:  
> {color:red}ChangelogNormalize(key=[ingestion_time]){color} uses wrong key 
> 'ingestion_time' (should be 'a')
> optimize result: 
> {code}
> Calc(select=[a, b, f])
> +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
> __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
> __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
> ingestion_time0, a0, f])
>:- Exchange(distribution=[hash[a]])
>:  +- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
> ingestion_time, a, b])
>:+- TableSourceScan(table=[[default_catalog, default_database, 
> t1]], fields=[a, b, ingestion_time])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[ingestion_time, a, f], where=[f])
>  +- ChangelogNormalize(key=[ingestion_time])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>   +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
> *ROWTIME*) AS ingestion_time, a, f])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
> ingestion_time])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29781) ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-14 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17633625#comment-17633625
 ] 

godfrey he commented on FLINK-29781:


Fixed in master: 5463f244ec69f623d75c15374b55bb8695e92b3e

> ChangelogNormalize uses wrong keys after transformation by 
> WatermarkAssignerChangelogNormalizeTransposeRule 
> 
>
> Key: FLINK-29781
> URL: https://issues.apache.org/jira/browse/FLINK-29781
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, WatermarkAssignerChangelogNormalizeTransposeRule doesn't remap the 
> unique-key indexes for its new input after the plan rewrite, which may produce 
> wrong results.
> A simple case:
> {code}
>   @Test
>   def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
> util.addTable("""
> |CREATE TABLE t1 (
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  b VARCHAR NOT NULL,
> |  WATERMARK FOR ingestion_time AS ingestion_time
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)'
> |)
>   """.stripMargin)
> util.addTable("""
> |CREATE TABLE t2 (
> |  k VARBINARY,
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  f BOOLEAN NOT NULL,
> |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
> |  PRIMARY KEY (`a`) NOT ENFORCED
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)',
> | 'changelog-mode' = 'I,UA,D'
> |)
>   """.stripMargin)
> val sql =
>   """
> |SELECT t1.a, t1.b, t2.f
> |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
> | ON t1.a = t2.a WHERE t2.f = true
> |""".stripMargin
> util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
>   }
> {code}
> the generated plan is incorrect for now:  
> {color:red}ChangelogNormalize(key=[ingestion_time]){color} uses wrong key 
> 'ingestion_time' (should be 'a')
> optimize result: 
> {code}
> Calc(select=[a, b, f])
> +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
> __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
> __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
> ingestion_time0, a0, f])
>:- Exchange(distribution=[hash[a]])
>:  +- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
> ingestion_time, a, b])
>:+- TableSourceScan(table=[[default_catalog, default_database, 
> t1]], fields=[a, b, ingestion_time])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[ingestion_time, a, f], where=[f])
>  +- ChangelogNormalize(key=[ingestion_time])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>   +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
> *ROWTIME*) AS ingestion_time, a, f])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
> ingestion_time])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29781) ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-14 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-29781:
--

Assignee: lincoln lee

> ChangelogNormalize uses wrong keys after transformation by 
> WatermarkAssignerChangelogNormalizeTransposeRule 
> 
>
> Key: FLINK-29781
> URL: https://issues.apache.org/jira/browse/FLINK-29781
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, WatermarkAssignerChangelogNormalizeTransposeRule doesn't remap the 
> unique-key indexes for its new input after the plan rewrite, which may produce 
> wrong results.
> A simple case:
> {code}
>   @Test
>   def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
> util.addTable("""
> |CREATE TABLE t1 (
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  b VARCHAR NOT NULL,
> |  WATERMARK FOR ingestion_time AS ingestion_time
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)'
> |)
>   """.stripMargin)
> util.addTable("""
> |CREATE TABLE t2 (
> |  k VARBINARY,
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  f BOOLEAN NOT NULL,
> |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
> |  PRIMARY KEY (`a`) NOT ENFORCED
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)',
> | 'changelog-mode' = 'I,UA,D'
> |)
>   """.stripMargin)
> val sql =
>   """
> |SELECT t1.a, t1.b, t2.f
> |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
> | ON t1.a = t2.a WHERE t2.f = true
> |""".stripMargin
> util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
>   }
> {code}
> the generated plan is incorrect for now:  
> {color:red}ChangelogNormalize(key=[ingestion_time]){color} uses wrong key 
> 'ingestion_time' (should be 'a')
> optimize result: 
> {code}
> Calc(select=[a, b, f])
> +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
> __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
> __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
> ingestion_time0, a0, f])
>:- Exchange(distribution=[hash[a]])
>:  +- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
> ingestion_time, a, b])
>:+- TableSourceScan(table=[[default_catalog, default_database, 
> t1]], fields=[a, b, ingestion_time])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[ingestion_time, a, f], where=[f])
>  +- ChangelogNormalize(key=[ingestion_time])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>   +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
> *ROWTIME*) AS ingestion_time, a, f])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
> ingestion_time])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29781) ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-14 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-29781:
---
Fix Version/s: 1.17.0

> ChangelogNormalize uses wrong keys after transformation by 
> WatermarkAssignerChangelogNormalizeTransposeRule 
> 
>
> Key: FLINK-29781
> URL: https://issues.apache.org/jira/browse/FLINK-29781
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, WatermarkAssignerChangelogNormalizeTransposeRule doesn't remap the 
> unique-key indexes for its new input after the plan rewrite, which may produce 
> wrong results.
> A simple case:
> {code}
>   @Test
>   def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
> util.addTable("""
> |CREATE TABLE t1 (
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  b VARCHAR NOT NULL,
> |  WATERMARK FOR ingestion_time AS ingestion_time
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)'
> |)
>   """.stripMargin)
> util.addTable("""
> |CREATE TABLE t2 (
> |  k VARBINARY,
> |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
> |  a VARCHAR NOT NULL,
> |  f BOOLEAN NOT NULL,
> |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
> |  PRIMARY KEY (`a`) NOT ENFORCED
> |) WITH (
> | 'connector' = 'values',
> | 'readable-metadata' = 'ts:TIMESTAMP(3)',
> | 'changelog-mode' = 'I,UA,D'
> |)
>   """.stripMargin)
> val sql =
>   """
> |SELECT t1.a, t1.b, t2.f
> |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
> | ON t1.a = t2.a WHERE t2.f = true
> |""".stripMargin
> util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
>   }
> {code}
> the generated plan is incorrect for now:  
> {color:red}ChangelogNormalize(key=[ingestion_time]){color} uses wrong key 
> 'ingestion_time' (should be 'a')
> optimize result: 
> {code}
> Calc(select=[a, b, f])
> +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
> __TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
> __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
> __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
> ingestion_time0, a0, f])
>:- Exchange(distribution=[hash[a]])
>:  +- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>: +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
> ingestion_time, a, b])
>:+- TableSourceScan(table=[[default_catalog, default_database, 
> t1]], fields=[a, b, ingestion_time])
>+- Exchange(distribution=[hash[a]])
>   +- Calc(select=[ingestion_time, a, f], where=[f])
>  +- ChangelogNormalize(key=[ingestion_time])
> +- Exchange(distribution=[hash[a]])
>+- WatermarkAssigner(rowtime=[ingestion_time], 
> watermark=[ingestion_time])
>   +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
> *ROWTIME*) AS ingestion_time, a, f])
>  +- TableSourceScan(table=[[default_catalog, 
> default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
> ingestion_time])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29981) Improve WatermarkAssignerChangelogNormalizeTransposeRule

2022-11-10 Thread godfrey he (Jira)
godfrey he created FLINK-29981:
--

 Summary: Improve WatermarkAssignerChangelogNormalizeTransposeRule
 Key: FLINK-29981
 URL: https://issues.apache.org/jira/browse/FLINK-29981
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he


 

WatermarkAssignerChangelogNormalizeTransposeRule is too complex to maintain. 
We should improve it, for example by splitting 
WatermarkAssignerChangelogNormalizeTransposeRule into two rules.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27998) Upgrade Calcite version to 1.30

2022-10-17 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17619204#comment-17619204
 ] 

godfrey he commented on FLINK-27998:


Can this ticket be closed since "Upgrade Calcite version to 1.32" has been 
created (FLINK-29319)?



> Upgrade Calcite version to 1.30
> ---
>
> Key: FLINK-27998
> URL: https://issues.apache.org/jira/browse/FLINK-27998
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Martijn Visser
>Priority: Major
>
> The latest available version of Calcite is currently 1.30. We still need to 
> execute the rework that was planned for the Calcite 1.27 upgrade (FLINK-20873) 
> and the Calcite 1.28 upgrade (FLINK-21239).
> When doing this upgrade, we should move to the latest available version, 
> which is needed to resolve multiple bugs.
> Additional note: Flink currently uses Calcite 1.26.0 which has this note on 
> the release page:
> {{Warning: Calcite 1.26.0 has severe issues with RexNode simplification 
> caused by SEARCH operator ( wrong data from query optimization like in 
> CALCITE-4325, CALCITE-4352, NullPointerException), so use 1.26.0 for 
> development only, and beware that Calcite 1.26.0 might corrupt your data.}}
> https://calcite.apache.org/news/2020/10/06/release-1.26.0/
> The following files should be removed from the Flink code base when upgrading 
> calcite to 1.30.0
> in `org.apache.calcite.rel.core`:
>  * Correlate
>  * Filter
>  * Intersect
>  * Minus
>  * SetOp
>  * Sort
>  * Union
>  * Values
>  * Window
> in `org.apache.calcite.rel.hint`:
>  * HintPredicates
>  * NodeTypeHintPredicate
> in `org.apache.calcite.rel.logical`:
>  * LogicalCorrelate
>  * LogicalFilter
>  * LogicalIntersect
>  * LogicalMinus
>  * LogicalSort
>  * LogicalUnion
>  * LogicalValues
>  * LogicalWindow



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-21239) Upgrade Calcite version to 1.28

2022-10-17 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-21239.
--
Resolution: Won't Fix

This ticket can be closed since "Upgrade Calcite version to 1.32" has been 
created (FLINK-29319)

> Upgrade Calcite version to 1.28
> ---
>
> Key: FLINK-21239
> URL: https://issues.apache.org/jira/browse/FLINK-21239
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> The following files should be removed from the Flink code base during an 
> upgrade:
> - org.apache.calcite.rex.RexLiteral



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-20873) Upgrade Calcite version to 1.27

2022-10-17 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-20873.
--
Resolution: Won't Fix

> Upgrade Calcite version to 1.27
> ---
>
> Key: FLINK-20873
> URL: https://issues.apache.org/jira/browse/FLINK-20873
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>
> The following files should be removed from the Flink code base during an 
> upgrade:
>  - org.apache.calcite.rex.RexSimplify
>  - org.apache.calcite.sql.SqlMatchRecognize
>  - org.apache.calcite.sql.SqlTableRef
>  - org.apache.calcite.sql2rel.RelDecorrelator
>  - org.apache.flink.table.planner.functions.sql.SqlJsonObjectFunction (added 
> in FLINK-16203)
>  - Adopt calcite's behaviour and add SQL tests once 
> [https://github.com/apache/calcite/pull/2555] is merged, (check FLINK-24576 )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20873) Upgrade Calcite version to 1.27

2022-10-17 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17619201#comment-17619201
 ] 

godfrey he commented on FLINK-20873:


This ticket can be closed since  "Upgrade Calcite version to 1.32" has been 
created (FLINK-29319)

> Upgrade Calcite version to 1.27
> ---
>
> Key: FLINK-20873
> URL: https://issues.apache.org/jira/browse/FLINK-20873
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>
> The following files should be removed from the Flink code base during an 
> upgrade:
>  - org.apache.calcite.rex.RexSimplify
>  - org.apache.calcite.sql.SqlMatchRecognize
>  - org.apache.calcite.sql.SqlTableRef
>  - org.apache.calcite.sql2rel.RelDecorrelator
>  - org.apache.flink.table.planner.functions.sql.SqlJsonObjectFunction (added 
> in FLINK-16203)
>  - Adopt calcite's behaviour and add SQL tests once 
> [https://github.com/apache/calcite/pull/2555] is merged, (check FLINK-24576 )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29625) Optimize changelog normalize for upsert source

2022-10-13 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617422#comment-17617422
 ] 

godfrey he commented on FLINK-29625:


+1 for the improvement

> Optimize changelog normalize for upsert source
> --
>
> Key: FLINK-29625
> URL: https://issues.apache.org/jira/browse/FLINK-29625
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.15.2
>Reporter: Jiabao Sun
>Priority: Major
>
> Currently, Flink will add an expensive _changelog normalize_ operator to a 
> source in upsert changelog mode in order to fill in the _update_before_ 
> values. Even inserting directly from an upsert-kafka source into an 
> upsert-kafka sink will still add this operator, plus an extra operator that 
> drops the _update_before_ messages again, which is obviously redundant.
> In CDC scenarios, some databases do not provide update-before images, such as 
> Cassandra, MongoDB, TiDB (when {_}Old Value{_} is not turned on) and Postgres 
> (when {_}REPLICA IDENTITY{_} is not set to {_}FULL{_}). Using Flink SQL to 
> process these changelogs incurs a lot of state overhead.
> I don't know much about why this operator is needed, so I take the liberty to 
> ask if we can get rid of changelog normalize completely or optimize it, adding 
> it only when a normalized changelog is required by a downstream operator.
> If this optimization is worthwhile, I'm happy to help with it.

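A minimal sketch of the pass-through case mentioned above (topic and server names are placeholders, the options are the standard upsert-kafka ones, and a TableEnvironment tableEnv is assumed to be in scope):

{code:java}
tableEnv.executeSql(
        "CREATE TABLE src (k STRING, v STRING, PRIMARY KEY (k) NOT ENFORCED) "
                + "WITH ('connector' = 'upsert-kafka', 'topic' = 'in', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'key.format' = 'json', 'value.format' = 'json')");
tableEnv.executeSql(
        "CREATE TABLE dst (k STRING, v STRING, PRIMARY KEY (k) NOT ENFORCED) "
                + "WITH ('connector' = 'upsert-kafka', 'topic' = 'out', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'key.format' = 'json', 'value.format' = 'json')");
// Even this direct key-preserving insert currently gets a ChangelogNormalize
// (plus an operator dropping update_before rows) in its physical plan.
tableEnv.executeSql("INSERT INTO dst SELECT k, v FROM src");
{code}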


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29126) Fix spliting file optimization doesn't work for orc format

2022-10-11 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17615497#comment-17615497
 ] 

godfrey he commented on FLINK-29126:


Fixed in master: cf70844a56a0994dfcd7fb1859408683f2b621a3
Fixed in 1.16: cbc9d462b295243a61ebc544d9cf9ff6fa2a8aa6

> Fix spliting file optimization doesn't work for orc format
> --
>
> Key: FLINK-29126
> URL: https://issues.apache.org/jira/browse/FLINK-29126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> FLINK-27338 tried to improve file splitting for the ORC format, but it doesn't 
> work because of a mistake in judging whether the table is stored in the ORC 
> format or not. We should fix it.

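An illustrative sketch of the kind of check the fix needs (not the actual Hive connector code; the method name and class-name matching below are assumptions for illustration):

{code:java}
// Decide "stored as ORC" from the table's storage descriptor (serde and
// input format class names) rather than from an unrelated property.
static boolean isOrcTable(String serdeLib, String inputFormatClass) {
    return (serdeLib != null && serdeLib.toLowerCase().contains("orc"))
            || (inputFormatClass != null
                    && inputFormatClass.toLowerCase().contains("orcinputformat"));
}
{code}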


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29126) Fix spliting file optimization doesn't work for orc format

2022-10-11 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-29126:
--

Assignee: luoyuxia

> Fix spliting file optimization doesn't work for orc format
> --
>
> Key: FLINK-29126
> URL: https://issues.apache.org/jira/browse/FLINK-29126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> FLINK-27338 tried to improve file splitting for the ORC format, but it doesn't 
> work because of a mistake in judging whether the table is stored in the ORC 
> format or not. We should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29126) Fix spliting file optimization doesn't work for orc format

2022-10-11 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-29126:
---
Fix Version/s: 1.16.0

> Fix spliting file optimization doesn't work for orc format
> --
>
> Key: FLINK-29126
> URL: https://issues.apache.org/jira/browse/FLINK-29126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> FLINK-27338 tried to improve file splitting for the ORC format, but it doesn't 
> work because of a mistake in judging whether the table is stored in the ORC 
> format or not. We should fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27530) FLIP-227: Support overdraft buffer

2022-09-19 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606516#comment-17606516
 ] 

godfrey he commented on FLINK-27530:


[~pnowojski] [~fanrui], could you update the release notes?

> FLIP-227: Support overdraft buffer
> --
>
> Key: FLINK-27530
> URL: https://issues.apache.org/jira/browse/FLINK-27530
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network
>Reporter: fanrui
>Assignee: fanrui
>Priority: Major
> Fix For: 1.16.0
>
>
> This is the umbrella issue for the overdraft buffer feature. Refer to 
> [FLIP-227|https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer]
> for more details.

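For the release notes, the user-facing knob introduced by FLIP-227 is the per-gate overdraft budget; a minimal flink-conf.yaml sketch (option name as listed in the FLIP, so please double-check it against the 1.16 documentation):

{code}
# Let each output gate temporarily borrow up to 10 extra network buffers
# while emitting a record that spans multiple buffers (default is 5).
taskmanager.network.memory.max-overdraft-buffers-per-gate: 10
{code}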


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29280) Join hint are not propagated in subquery

2022-09-18 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-29280:
---
Fix Version/s: 1.16.0
   1.17.0

> Join hint are not propagated in subquery
> 
>
> Key: FLINK-29280
> URL: https://issues.apache.org/jira/browse/FLINK-29280
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> Add the following code to JoinHintTestBase to reproduce this bug.
> {code:java}
> @Test
> public void testJoinHintWithJoinHintInSubQuery() {
> String sql =
> "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 from T2 
> join T3 on T2.a2 = T3.a3)";
> verifyRelPlanByCustom(String.format(sql, 
> buildCaseSensitiveStr(getTestSingleJoinHint())));
> } {code}
> This is because Calcite will not propagate the hint into the subquery, and 
> Flink also doesn't resolve it in FlinkSubQueryRemoveRule.

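A concrete instance of the failing shape, with the %s placeholder instantiated as BROADCAST (one example join hint; T1/T2/T3 as in the test above, and a TableEnvironment tableEnv assumed in scope):

{code:java}
// After FlinkSubQueryRemoveRule rewrites the IN predicate into a join,
// the hint attached inside the subquery should be carried over to the
// rewritten join; today it is silently dropped.
tableEnv.sqlQuery(
        "SELECT * FROM T1 WHERE a1 IN "
                + "(SELECT /*+ BROADCAST(T2) */ a2 FROM T2 JOIN T3 ON T2.a2 = T3.a3)");
{code}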


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29280) Join hint are not propagated in subquery

2022-09-18 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-29280.
--
Resolution: Fixed

Fixed in master: 22cb554008320e6684280b5205f93d7a6f685c6c
in 1.16.0: b37a8153f22b62982ca144604a34056246f6f36c

> Join hint are not propagated in subquery
> 
>
> Key: FLINK-29280
> URL: https://issues.apache.org/jira/browse/FLINK-29280
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Add the following code to JoinHintTestBase to reproduce this bug.
> {code:java}
> @Test
> public void testJoinHintWithJoinHintInSubQuery() {
> String sql =
> "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 from T2 
> join T3 on T2.a2 = T3.a3)";
> verifyRelPlanByCustom(String.format(sql, 
> buildCaseSensitiveStr(getTestSingleJoinHint())));
> } {code}
> This is because Calcite will not propagate the hint into the subquery, and 
> Flink also doesn't resolve it in FlinkSubQueryRemoveRule.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29280) Join hint are not propagated in subquery

2022-09-18 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-29280:
--

Assignee: xuyang

> Join hint are not propagated in subquery
> 
>
> Key: FLINK-29280
> URL: https://issues.apache.org/jira/browse/FLINK-29280
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Add the following code to JoinHintTestBase to reproduce this bug.
> {code:java}
> @Test
> public void testJoinHintWithJoinHintInSubQuery() {
> String sql =
> "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 from T2 
> join T3 on T2.a2 = T3.a3)";
> verifyRelPlanByCustom(String.format(sql, 
> buildCaseSensitiveStr(getTestSingleJoinHint())));
> } {code}
> This is because Calcite will not propagate the hint into the subquery, and 
> Flink also doesn't resolve it in FlinkSubQueryRemoveRule.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28738) [doc] Add a user doc about the correctness for non-deterministic updates

2022-09-18 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-28738.
--
Resolution: Fixed

Fixed in master: a02b2c232ea2fb1b22bd0e5c290e4c6f0217549b
in 1.16.0: be73c9695d01e3d3164b6c89342a1e41fa4ea450

> [doc] Add a user doc about the correctness for non-deterministic updates
> 
>
> Key: FLINK-28738
> URL: https://issues.apache.org/jira/browse/FLINK-28738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table SQL / Planner
>Reporter: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28569) SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced

2022-09-15 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he closed FLINK-28569.
--
Resolution: Fixed

Fixed in master: dbcd2d7b86fcb7fa7a26e181f1719ea4c6dad828
in 1.16.0: 390612320fa9be297d9eed1f4b75f8ba2ec83c40

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not 
> empty otherwise wrong result maybe produced
> --
>
> Key: FLINK-28569
> URL: https://issues.apache.org/jira/browse/FLINK-28569
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.5, 1.15.2
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently SinkUpsertMaterializer only updates rows by comparing the complete 
> row in any case, but this may cause wrong results if the input has an 
> upsertKey and also non-deterministic column values. See the following case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
> tEnv.createTemporaryFunction(
> "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
> String cdcDdl =
> "CREATE TABLE users (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " primary key (user_id) not enforced\n"
> + ") WITH (\n"
> + " 'connector' = 'values',\n"
> + " 'changelog-mode' = 'I,UA,UB,D'\n"
> + ")";
> String sinkTableDdl =
> "CREATE TABLE sink (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " PRIMARY KEY(email) NOT ENFORCED\n"
> + ") WITH(\n"
> + " 'connector' = 'values',\n"
> + " 'sink-insert-only' = 'false'\n"
> + ")";
> tEnv.executeSql(cdcDdl);
> tEnv.executeSql(sinkTableDdl);
> util.verifyJsonPlan(
> "insert into sink select user_id, ndFunc(user_name), email, balance from 
> users");
> }
> {code}
> for original cdc source records:
> {code}
> +I[user1, Tom, t...@gmail.com, 10.02],
> -D[user1, Tom, t...@gmail.com, 10.02],
> {code}
> the above query cannot correctly delete the previously inserted row because of 
> the non-deterministic column value 'ndFunc(user_name)'.
> This can be solved by letting the SinkUpsertMaterializer be aware of the input 
> upsertKey and update by it.

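An illustrative sketch of the proposed behaviour (not the actual SinkUpsertMaterializer code; the class name is invented and row/key types are simplified to strings):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Track rows by the input's upsert key so that a -D matches the row
// inserted for the same key even when a non-deterministic column such as
// ndFunc(user_name) produced a different value in the delete message.
final class UpsertKeyAwareBuffer {
    private final Map<String, String[]> rowsByKey = new HashMap<>();

    void addOrUpdate(String upsertKey, String[] row) {
        rowsByKey.put(upsertKey, row); // +I / +U: last version per key wins
    }

    void delete(String upsertKey) {
        rowsByKey.remove(upsertKey); // -D: match on the key, not the full row
    }
}
{code}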


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   3   4   5   6   7   8   9   10   >