Re: [PR] [FLINK-33236][config] Remove the unused high-availability.zookeeper.path.running-registry option [flink]
flinkbot commented on PR #23506: URL: https://github.com/apache/flink/pull/23506#issuecomment-1756844304

## CI report:

* 72b6b3a52f9efd6980941259a12281bb40a78976 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option
[ https://issues.apache.org/jira/browse/FLINK-33236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773900#comment-17773900 ] Zhanghao Chen commented on FLINK-33236: --- Hi [~mapohl], I've prepared a quick cleanup PR for it. Could you help review it when you are free? > Remove the unused high-availability.zookeeper.path.running-registry option > -- > > Key: FLINK-33236 > URL: https://issues.apache.org/jira/browse/FLINK-33236 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Configuration > Affects Versions: 1.18.0 > Reporter: Zhanghao Chen > Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > The running registry subcomponent of Flink HA was removed in > [FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the > "high-availability.zookeeper.path.running-registry" option has been of no use since > then. We should remove the option and regenerate the config doc to remove the > relevant descriptions and avoid users' confusion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option
[ https://issues.apache.org/jira/browse/FLINK-33236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33236: --- Labels: pull-request-available (was: )
[PR] [FLINK-33236][config] Remove the unused high-availability.zookeeper.path.running-registry option [flink]
X-czh opened a new pull request, #23506: URL: https://github.com/apache/flink/pull/23506

## What is the purpose of the change

The running registry subcomponent of Flink HA was removed in [FLINK-25430](https://issues.apache.org/jira/browse/FLINK-25430), and the "high-availability.zookeeper.path.running-registry" option has been of no use since then. We should remove the option and regenerate the config doc to remove the relevant descriptions and avoid users' confusion.

## Brief change log

Remove the unused high-availability.zookeeper.path.running-registry option and the relevant doc.

## Verifying this change

This change is a trivial code cleanup.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[jira] [Created] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option
Zhanghao Chen created FLINK-33236: - Summary: Remove the unused high-availability.zookeeper.path.running-registry option Key: FLINK-33236 URL: https://issues.apache.org/jira/browse/FLINK-33236 Project: Flink Issue Type: Technical Debt Components: Runtime / Configuration Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 The running registry subcomponent of Flink HA was removed in [FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the "high-availability.zookeeper.path.running-registry" option has been of no use since then. We should remove the option and regenerate the config doc to remove the relevant descriptions and avoid users' confusion.
[jira] [Assigned] (FLINK-33209) Translate Flink OLAP quick start guide to Chinese
[ https://issues.apache.org/jira/browse/FLINK-33209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo reassigned FLINK-33209: -- Assignee: xiangyu feng > Translate Flink OLAP quick start guide to Chinese > - > > Key: FLINK-33209 > URL: https://issues.apache.org/jira/browse/FLINK-33209 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation > Reporter: xiangyu feng > Assignee: xiangyu feng > Priority: Major > > Translate Flink OLAP quick start guide to Chinese
[jira] [Assigned] (FLINK-33235) Quickstart guide for Flink OLAP should support building from master branch
[ https://issues.apache.org/jira/browse/FLINK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo reassigned FLINK-33235: -- Assignee: xiangyu feng > Quickstart guide for Flink OLAP should support building from master branch > -- > > Key: FLINK-33235 > URL: https://issues.apache.org/jira/browse/FLINK-33235 > Project: Flink > Issue Type: Sub-task > Components: Documentation > Reporter: xiangyu feng > Assignee: xiangyu feng > Priority: Major > > Many features required by an OLAP session cluster are still on the master branch or > in progress and not yet released, for example FLIP-295, FLIP-362, and FLIP-374. > We need to address this in the document and show users how to quickly build an > OLAP session cluster from the master branch.
[jira] [Created] (FLINK-33235) Quickstart guide for Flink OLAP should support building from master branch
xiangyu feng created FLINK-33235: Summary: Quickstart guide for Flink OLAP should support building from master branch Key: FLINK-33235 URL: https://issues.apache.org/jira/browse/FLINK-33235 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: xiangyu feng Many features required by an OLAP session cluster are still on the master branch or in progress and not yet released, for example FLIP-295, FLIP-362, and FLIP-374. We need to address this in the document and show users how to quickly build an OLAP session cluster from the master branch.
[jira] [Commented] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773896#comment-17773896 ] luoyuxia commented on FLINK-33168: -- I believe the doc has been updated in FLINK-32982, and the release notes will also include it. > An error occurred when executing sql, java.lang.NoSuchFieldError: operands > -- > > Key: FLINK-33168 > URL: https://issues.apache.org/jira/browse/FLINK-33168 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client > Affects Versions: 1.18.0 > Reporter: macdoor615 > Assignee: Zheng yunhong > Priority: Major > > Environment: > > {code:java} > Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 > 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux > openjdk version "1.8.0_382" > OpenJDK Runtime Environment (build 1.8.0_382-b05) > OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode) > flink-1.18.0-RC1 , > https://github.com/apache/flink/releases/tag/release-1.18.0-rc1 > {code} > > I execute the following sql in sql-client.sh.
> > {code:java} > insert into svc1_paimon_prod.cq.b_customer_ecus > select > rcus.id id, > if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id, > if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) > newCus_id, > companyID, > customerProvinceNumber, > mobilePhone, > oprCode, > customerNum, > staffName, > location, > staffNumber, > extendInfo, > customerName, > case when companyID='000' then '名称1' > when companyID='002' then '名称2' > else '新名称' > end prov, > row ( > accessToken, > busType, > cutOffDay, > domain, > envFlag, > routeType, > routeValue, > sessionID, > sign, > signMethod, > org_timeStamp, > transIDO, > userPartyID, > version > ) raw_message, > named_struct( > 'id', cus.id, > 'name', cus.name, > 'code', cus.code, > 'customerlevel', cus.customerlevel, > 'prov', cus.prov, > 'container', cus.container, > 'crtime', cus.crtime, > 'updtime', cus.updtime > ) existing_cus, > cus_rownum, > to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp, > raw_rowtime, > localtimestamp as raw_rowtime1, > dt > from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq > /*+ OPTIONS('consumer-id' = '创建新客户id') */ > rcus > left join svc1_mysql_test.gem_svc1_vpn.bv_customer > FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code > {code} > There are the following jar files in the flink/lib directory. 
> {code:java} > commons-cli-1.5.0.jar > flink-cep-1.18.0.jar > flink-connector-files-1.18.0.jar > flink-connector-jdbc-3.1.1-1.17.jar > flink-csv-1.18.0.jar > flink-dist-1.18.0.jar > flink-json-1.18.0.jar > flink-orc-1.18.0.jar > flink-parquet-1.18.0.jar > flink-scala_2.12-1.18.0.jar > flink-sql-avro-1.18.0.jar > flink-sql-avro-confluent-registry-1.18.0.jar > flink-sql-connector-elasticsearch7-3.0.0-1.16.jar > flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar > flink-sql-connector-kafka-3.0.0-1.17.jar > flink-sql-orc-1.18.0.jar > flink-sql-parquet-1.18.0.jar > flink-table-api-java-uber-1.18.0.jar > flink-table-api-scala_2.12-1.18.0.jar > flink-table-api-scala-bridge_2.12-1.18.0.jar > flink-table-planner_2.12-1.18.0.jar > flink-table-runtime-1.18.0.jar > jline-reader-3.23.0.jar > jline-terminal-3.23.0.jar > kafka-clients-3.5.1.jar > log4j-1.2-api-2.17.1.jar > log4j-api-2.17.1.jar > log4j-core-2.17.1.jar > log4j-slf4j-impl-2.17.1.jar > mysql-connector-j-8.1.0.jar > paimon-flink-1.18-0.6-20230929.002044-11.jar{code} > Works correctly in version 1.17.1, but produces the following error in > 1.18.0-RC1 > > {code:java} > 2023-09-29 14:04:11,438 ERROR > org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed > to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da. 
> java.lang.NoSuchFieldError: operands > at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) > ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.(SimplifyFilterConditionRule.scala:36) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala:94) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala:35) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at >
Re: [PR] [FLINK-33171][table planner] Table SQL support Not Equal for TimePoint type and TimeString [flink]
lincoln-lil commented on code in PR #23478: URL: https://github.com/apache/flink/pull/23478#discussion_r1353989534

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:

```
@@ -488,6 +488,18 @@ object ScalarOperatorGens {
     else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
       generateComparison(ctx, "!=", left, right, resultType)
     }
+    // support date/time/timestamp not equalTo string.
+    else if (
+      (isTimePoint(left.resultType) && isCharacterString(right.resultType)) ||
```

Review Comment: Can we add optimizations for string literals similar to the handling of `generateEquals`?

## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala:

```
@@ -228,6 +228,15 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     testSqlApi("uuid() = cast(f22 as timestamp_ltz)", "NULL")
   }

+  @Test
+  def testTimePointTypeNotEqualsStringType(): Unit = {
+    testSqlApi("f22 = '1996-11-10 12:34:56'", "TRUE")
```

Review Comment: I see that the datetime family contains 5 datatypes (see `org.apache.flink.table.types.logical.LogicalTypeRoot`), so it is recommended to add more tests for complete coverage of these datatypes.
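For illustration, coverage of the remaining members of the datetime family could be exercised with SQL predicates along the following lines; the column names and types here are hypothetical stand-ins, not identifiers from the actual test base:

```sql
-- Hypothetical columns: d DATE, t TIME(0), ts TIMESTAMP(3), ts_ltz TIMESTAMP_LTZ(3)
SELECT
  d      <> '1996-11-10'          AS date_ne_string,
  t      <> '12:34:56'            AS time_ne_string,
  ts     <> '1996-11-10 12:34:56' AS ts_ne_string,
  ts_ltz <> '1996-11-10 12:34:56' AS ts_ltz_ne_string
FROM some_table;
```

Each predicate exercises the time-point-vs-string comparison path for a different `LogicalTypeRoot`, both for literal and non-literal string operands.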
[jira] [Closed] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] macdoor615 closed FLINK-33168.
[jira] [Commented] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773893#comment-17773893 ] macdoor615 commented on FLINK-33168: [~luoyuxia] table-planner-loader.jar works. Docs should be updated.
[jira] [Comment Edited] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773891#comment-17773891 ] luoyuxia edited comment on FLINK-33168 at 10/11/23 4:16 AM: I tried putting table-planner-loader.jar in /lib and it works. Since this behavior is expected in Flink 1.18 (see FLINK-31575) and we recommend not swapping table-planner-loader and table-planner, I would like to close this issue. Feel free to reopen it if you still hit problems. The reason is a little complex: we include {{org/apache/calcite/plan/RelOptRule.class}} in flink-sql-connector-hive 3.1.3; the reason can be seen [here|https://github.com/apache/flink/blob/5269631af525a01d944cfa9994a116fb27b80b1b/flink-connectors/flink-sql-connector-hive-3.1.3/pom.xml#L198]. The planner then loads the class {{RelOptRule}} from flink-sql-connector-hive 3.1.3, which is compiled with {{com.google}} shaded to {{org.apache.flink.hive.shaded.com.google}}. {{RelOptRule}} refers to {{com.google.common.collect.ImmutableList}}, so after compilation it refers to the field {{org/apache/calcite/plan/RelOptRuleOperandChildren.operands:Lorg/apache/flink/hive/shaded/com/google/common/collect/ImmutableList}}. But {{RelOptRuleOperandChildren}} is compiled in flink-table-planner, which shades {{com.google}} to {{org.apache.flink.calcite.shaded.com.google}}, so it only contains a field of type {{org/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList}}. This causes {{java.lang.NoSuchFieldError: operands}}. It is similar to FLINK-32286.
[jira] [Resolved] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-33168. -- Resolution: Invalid I tried putting table-planner-loader.jar in /lib and it works. Since this behavior is expected in Flink 1.18 (see FLINK-31575) and we recommend not swapping table-planner-loader and table-planner, I would like to close this issue. Feel free to reopen it if you still hit problems. The reason is a little complex: we include {{org/apache/calcite/plan/RelOptRule.class}} in flink-sql-connector-hive 3.1.3; the reason can be seen here. The planner then loads the class {{RelOptRule}} from flink-sql-connector-hive 3.1.3, which is compiled with {{com.google}} shaded to {{org.apache.flink.hive.shaded.com.google}}. {{RelOptRule}} refers to {{com.google.common.collect.ImmutableList}}, so after compilation it refers to the field {{org/apache/calcite/plan/RelOptRuleOperandChildren.operands:Lorg/apache/flink/hive/shaded/com/google/common/collect/ImmutableList}}. But {{RelOptRuleOperandChildren}} is compiled in flink-table-planner, which shades {{com.google}} to {{org.apache.flink.calcite.shaded.com.google}}, so it only contains a field of type {{org/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList}}. This causes {{java.lang.NoSuchFieldError: operands}}. It is similar to FLINK-32286.
Re: [PR] [FLINK-24024][table-planner] support session window tvf in plan [flink]
flinkbot commented on PR #23505: URL: https://github.com/apache/flink/pull/23505#issuecomment-1756729241 ## CI report: * 8302b0995123b2a536ae434f8e958dfa3d8c2302 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-24024][table-planner] support session window tvf in plan [flink]
xuyangzhong opened a new pull request, #23505: URL: https://github.com/apache/flink/pull/23505 ## What is the purpose of the change This pull request introduces support for the session window TVF in the planner. After this PR, users can use the session window TVF syntax in SQL. Note: this PR does not touch table-runtime. The session window TVF only has the basic features described in https://issues.apache.org/jira/browse/FLINK-23544 . ## Brief change log - Adapt to the Calcite requirements for PTFs: fill in the table characteristics of the session window TVF. - Adapt the rules involved in introducing the session window TVF. - Add unit and integration test cases. ## Verifying this change Unit and integration test cases are added to verify this PR. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[jira] [Closed] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices
[ https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei closed FLINK-32072. -- > Create and wire FileMergingSnapshotManager with TaskManagerServices > --- > > Key: FLINK-32072 > URL: https://issues.apache.org/jira/browse/FLINK-32072 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices
[ https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei resolved FLINK-32072. Fix Version/s: 1.19.0 (was: 1.18.0) Resolution: Fixed > Create and wire FileMergingSnapshotManager with TaskManagerServices > --- > > Key: FLINK-32072 > URL: https://issues.apache.org/jira/browse/FLINK-32072 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices
[ https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773884#comment-17773884 ] Yanfei Lei commented on FLINK-32072: Merged into master via 1112582fd136df47c6d356d6f6ad3946ad1e56d5 > Create and wire FileMergingSnapshotManager with TaskManagerServices > --- > > Key: FLINK-32072 > URL: https://issues.apache.org/jira/browse/FLINK-32072 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Zakelly Lan >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.18.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32443][docs-zh] Translate "State Processor API" page into Chinese [flink]
fredia commented on PR #23496: URL: https://github.com/apache/flink/pull/23496#issuecomment-1756665330 @flinkbot run azure
Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]
FangYongs commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1353844833 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ## @@ -65,9 +69,14 @@ public class ResourceManager implements Closeable { private static final String FILE_SCHEME = "file"; private final Path localResourceDir; +/** Resource infos for functions. */ +private final Map functionResourceInfos; + protected final Map resourceInfos; protected final MutableURLClassLoader userClassLoader; +private final List createdClassLoaderList; Review Comment: DONE
[jira] [Closed] (FLINK-33228) Fix the total current resource calculation when fulfilling requirements
[ https://issues.apache.org/jira/browse/FLINK-33228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo closed FLINK-33228. -- Resolution: Fixed > Fix the total current resource calculation when fulfilling requirements > --- > > Key: FLINK-33228 > URL: https://issues.apache.org/jira/browse/FLINK-33228 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0 >Reporter: xiangyu feng >Assignee: xiangyu feng >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-10-10-16-09-23-635.png > > > Currently, the `totalCurrentResources` calculation in > `DefaultResourceAllocationStrategy#tryFulfillRequirements` is wrong. > `ResourceProfile.merge` will not change the original `ResourceProfile`. > !image-2023-10-10-16-09-23-635.png|width=564,height=286! -- This message was sent by Atlassian Jira (v8.20.10#820010)
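The root cause described in FLINK-33228 is the classic immutable-merge pitfall: `ResourceProfile.merge` returns a new instance rather than mutating the receiver, so discarding the return value silently drops the accumulated resources. The sketch below is a hypothetical simplification — the `Profile` class stands in for Flink's actual `ResourceProfile` — showing the buggy and fixed accumulation side by side:

```java
// Minimal stand-in for an immutable resource profile; merge() returns a
// new instance instead of mutating the receiver, like ResourceProfile.merge.
final class Profile {
    final int cpu;

    Profile(int cpu) {
        this.cpu = cpu;
    }

    Profile merge(Profile other) {
        return new Profile(this.cpu + other.cpu);
    }
}

class MergeDemo {
    // Buggy accumulation: the merge result is discarded, so the total never grows.
    static int buggyTotal(int[] cpus) {
        Profile total = new Profile(0);
        for (int c : cpus) {
            total.merge(new Profile(c)); // return value ignored -- the bug
        }
        return total.cpu;
    }

    // Fixed accumulation: reassign the merged result back to the accumulator.
    static int fixedTotal(int[] cpus) {
        Profile total = new Profile(0);
        for (int c : cpus) {
            total = total.merge(new Profile(c));
        }
        return total.cpu;
    }
}
```

With inputs {1, 2, 3}, the buggy variant reports a total of 0 while the fixed one reports 6 — which is why the strategy under-counted the currently available resources when fulfilling requirements.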
Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]
FangYongs commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1353833344 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ## @@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri resourceUri) throws IOException { Collections.singletonList(resourceUri), ResourceType.FILE, false, -url -> {}); +url -> {}, +false); registerResources(stagingResources, false); return resourceInfos.get(new ArrayList<>(stagingResources.keySet()).get(0)).getPath(); } +/** + * Register a resource for function and add it to the function resource infos. If the file is + * remote, it will be copied to a local file. + * + * @param resourceUris the resource uri for function. + */ +public void registerFunctionResources(Set resourceUris) throws IOException { +prepareStagingResources( +resourceUris, +ResourceType.JAR, +true, +url -> { +try { +JarUtils.checkJarFile(url); +} catch (IOException e) { +throw new ValidationException( +String.format("Failed to register jar resource [%s]", url), e); +} +}, +true); +} + +/** + * Unregister the resource uri in function resources, when the reference count of the resource + * is 0, the resource will be removed from the function resources. + * + * @param resourceUris the uris to unregister in function resources. 
+ */ +public void unregisterFunctionResources(List resourceUris) { +if (!resourceUris.isEmpty()) { +resourceUris.forEach( +uri -> { +ResourceCounter counter = functionResourceInfos.get(uri); +if (counter != null && counter.decreaseCounter()) { +functionResourceInfos.remove(uri); +} +}); +} +} + public URLClassLoader getUserClassLoader() { return userClassLoader; } +public URLClassLoader createUserClassLoader(List resourceUris) { +if (resourceUris.isEmpty()) { +return userClassLoader; +} +MutableURLClassLoader classLoader = userClassLoader.copy(); +for (ResourceUri resourceUri : new HashSet<>(resourceUris)) { + classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url); +} +createdClassLoaderList.add(classLoader); + +return classLoader; +} + +public void closeUserClassLoader(URLClassLoader classLoader) { +if (createdClassLoaderList.remove(classLoader)) { Review Comment: Remove created class loader list in resourcemanager
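The unregister path in the diff above amounts to per-URI reference counting: each resource keeps a counter of how many functions use it, and the entry is evicted only when the last reference is released. The following self-contained sketch (the `RefCountedResources` class is hypothetical, not Flink's actual `ResourceManager`) shows that bookkeeping in isolation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the reference-counting scheme discussed in the
// review: resources are shared between functions, and an entry is removed
// only when the last user unregisters it.
class RefCountedResources {
    private final Map<String, Integer> counters = new HashMap<>();

    void register(String uri) {
        // Increment the reference count, starting at 1 for a new resource.
        counters.merge(uri, 1, Integer::sum);
    }

    void unregister(List<String> uris) {
        for (String uri : uris) {
            Integer count = counters.get(uri);
            if (count == null) {
                continue; // never registered; nothing to do
            }
            if (count <= 1) {
                counters.remove(uri); // last reference gone -> evict
            } else {
                counters.put(uri, count - 1);
            }
        }
    }

    boolean contains(String uri) {
        return counters.containsKey(uri);
    }
}
```

The design choice mirrors the review comment: dropping one temporary function must not invalidate a jar that another registered function still depends on, so eviction happens only at refcount zero.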
[jira] [Commented] (FLINK-33228) Fix the total current resource calculation when fulfilling requirements
[ https://issues.apache.org/jira/browse/FLINK-33228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773872#comment-17773872 ] Yangze Guo commented on FLINK-33228: master: 5269631af525a01d944cfa9994a116fb27b80b1b > Fix the total current resource calculation when fulfilling requirements > --- > > Key: FLINK-33228 > URL: https://issues.apache.org/jira/browse/FLINK-33228 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0 >Reporter: xiangyu feng >Assignee: xiangyu feng >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-10-10-16-09-23-635.png > > > Currently, the `totalCurrentResources` calculation in > `DefaultResourceAllocationStrategy#tryFulfillRequirements` is wrong. > `ResourceProfile.merge` will not change the original `ResourceProfile`. > !image-2023-10-10-16-09-23-635.png|width=564,height=286! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]
FangYongs commented on code in PR #22938: URL: https://github.com/apache/flink/pull/22938#discussion_r1353833112 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ## @@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, boolean ignoreIfNotExist "Could not drop temporary system function. A function named '%s' doesn't exist.", name)); } +unregisterFunctionJarResources(function); return function != null; } +private void unregisterFunctionJarResources(CatalogFunction function) { +if (function != null && function.getFunctionLanguage() == FunctionLanguage.JAVA) { Review Comment: Python UDFs are another story; we fix Java UDFs first and will consider Python UDFs in another PR.
Re: [PR] [FLINK-33228][flink-runtime] Fix the total current resource calculation when fulfilling requirement [flink]
KarmaGYZ merged PR #23502: URL: https://github.com/apache/flink/pull/23502
[jira] [Commented] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773871#comment-17773871 ] luoyuxia commented on FLINK-33168: -- [~macdoor615] Hi, could you please try using flink-table-planner-loader.jar to replace flink-table-planner.jar? > An error occurred when executing sql, java.lang.NoSuchFieldError: operands > -- > > Key: FLINK-33168 > URL: https://issues.apache.org/jira/browse/FLINK-33168 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.18.0 >Reporter: macdoor615 >Assignee: Zheng yunhong >Priority: Major > > Environment: > > {code:java} > Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 > 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux > openjdk version "1.8.0_382" > OpenJDK Runtime Environment (build 1.8.0_382-b05) > OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode) > flink-1.18.0-RC1 , > https://github.com/apache/flink/releases/tag/release-1.18.0-rc1 > {code} > > I execute the following sql in sql-client.sh.
> > {code:java} > insert into svc1_paimon_prod.cq.b_customer_ecus > select > rcus.id id, > if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id, > if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) > newCus_id, > companyID, > customerProvinceNumber, > mobilePhone, > oprCode, > customerNum, > staffName, > location, > staffNumber, > extendInfo, > customerName, > case when companyID='000' then '名称1' > when companyID='002' then '名称2' > else '新名称' > end prov, > row ( > accessToken, > busType, > cutOffDay, > domain, > envFlag, > routeType, > routeValue, > sessionID, > sign, > signMethod, > org_timeStamp, > transIDO, > userPartyID, > version > ) raw_message, > named_struct( > 'id', cus.id, > 'name', cus.name, > 'code', cus.code, > 'customerlevel', cus.customerlevel, > 'prov', cus.prov, > 'container', cus.container, > 'crtime', cus.crtime, > 'updtime', cus.updtime > ) existing_cus, > cus_rownum, > to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp, > raw_rowtime, > localtimestamp as raw_rowtime1, > dt > from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq > /*+ OPTIONS('consumer-id' = '创建新客户id') */ > rcus > left join svc1_mysql_test.gem_svc1_vpn.bv_customer > FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code > {code} > There are the following jar files in the flink/lib directory. 
> {code:java} > commons-cli-1.5.0.jar > flink-cep-1.18.0.jar > flink-connector-files-1.18.0.jar > flink-connector-jdbc-3.1.1-1.17.jar > flink-csv-1.18.0.jar > flink-dist-1.18.0.jar > flink-json-1.18.0.jar > flink-orc-1.18.0.jar > flink-parquet-1.18.0.jar > flink-scala_2.12-1.18.0.jar > flink-sql-avro-1.18.0.jar > flink-sql-avro-confluent-registry-1.18.0.jar > flink-sql-connector-elasticsearch7-3.0.0-1.16.jar > flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar > flink-sql-connector-kafka-3.0.0-1.17.jar > flink-sql-orc-1.18.0.jar > flink-sql-parquet-1.18.0.jar > flink-table-api-java-uber-1.18.0.jar > flink-table-api-scala_2.12-1.18.0.jar > flink-table-api-scala-bridge_2.12-1.18.0.jar > flink-table-planner_2.12-1.18.0.jar > flink-table-runtime-1.18.0.jar > jline-reader-3.23.0.jar > jline-terminal-3.23.0.jar > kafka-clients-3.5.1.jar > log4j-1.2-api-2.17.1.jar > log4j-api-2.17.1.jar > log4j-core-2.17.1.jar > log4j-slf4j-impl-2.17.1.jar > mysql-connector-j-8.1.0.jar > paimon-flink-1.18-0.6-20230929.002044-11.jar{code} > Works correctly in version 1.17.1, but produces the following error in > 1.18.0-RC1 > > {code:java} > 2023-09-29 14:04:11,438 ERROR > org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed > to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da. 
> java.lang.NoSuchFieldError: operands > at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) > ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.(SimplifyFilterConditionRule.scala:36) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala:94) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at > org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala:35) > ~[flink-table-planner_2.12-1.18.0.jar:1.18.0] > at >
[jira] [Assigned] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands
[ https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia reassigned FLINK-33168: Assignee: Zheng yunhong > An error occurred when executing sql, java.lang.NoSuchFieldError: operands > [...]
[jira] [Resolved] (FLINK-25593) A redundant scan could be skipped if it is an input of join and the other input is empty after partition prune
[ https://issues.apache.org/jira/browse/FLINK-25593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-25593. -- Resolution: Fixed master: 1b70d25b81daa7bd4a23c048cb3a79bc43a21d5f > A redundant scan could be skipped if it is an input of join and the other > input is empty after partition prune > -- > > Key: FLINK-25593 > URL: https://issues.apache.org/jira/browse/FLINK-25593 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jing Zhang >Assignee: Yunhong Zheng >Priority: Major > Labels: pull-request-available > > A redundant scan could be skipped if it is an input of a join and the other > input is empty after partition prune. > For example: > ltable has two partitions: pt=0 and pt=1; rtable has one partition pt1=0. > The schema of ltable is (lkey string, value int). > The schema of rtable is (rkey string, value int). > {code:sql} > SELECT * FROM ltable, rtable WHERE pt=2 and pt1=0 and `lkey`=rkey > {code} > The plan is as follows. > {code:java} > Calc(select=[lkey, value, CAST(2 AS INTEGER) AS pt, rkey, value1, CAST(0 AS > INTEGER) AS pt1]) > +- HashJoin(joinType=[InnerJoin], where=[=(lkey, rkey)], select=[lkey, value, > rkey, value1], build=[right]) >:- Exchange(distribution=[hash[lkey]]) >: +- TableSourceScan(table=[[hive, source_db, ltable, partitions=[], > project=[lkey, value]]], fields=[lkey, value]) >+- Exchange(distribution=[hash[rkey]]) > +- TableSourceScan(table=[[hive, source_db, rtable, > partitions=[{pt1=0}], project=[rkey, value1]]], fields=[rkey, value1]) > {code} > There is no need to scan the right side because the left input of the join has 0 > partitions after partition prune. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-25593][table-planner] Skip redundant scan while partition table push down contains none-existent partition [flink]
luoyuxia merged PR #23423: URL: https://github.com/apache/flink/pull/23423
[jira] [Assigned] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-33231: --- Assignee: Tzu-Li (Gordon) Tai > Memory leak in KafkaSourceReader if no data in consumed topic > - > > Key: FLINK-33231 > URL: https://issues.apache.org/jira/browse/FLINK-33231 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: Lauri Suurväli >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Labels: pull-request-available > Fix For: kafka-3.0.1, kafka-3.1.0 > > Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot > 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png > > > *Problem description* > Our Flink streaming job TaskManager heap gets full when the job has nothing > to consume and process. > It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. > When there are no messages in the source topic the TaskManager heap usage > starts increasing until the job exits after receiving a SIGTERM signal. We > are running the job on AWS EMR with YARN. > The problems with the TaskManager heap usage do not occur when there is data > to process. It's also worth noting that sending a single message to the > source topic of a streaming job that has been sitting idle and suffers from > the memory leak will cause the heap to be cleared. However it does not > resolve the problem since the heap usage will start increasing immediately > after processing the message. > !Screenshot 2023-10-10 at 12.49.37.png! > TaskManager heap used percentage is calculated by > > {code:java} > flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / > flink.taskmanager.Status.JVM.Memory.Heap.Max{code} > > > I was able to take heap dumps of the TaskManager processes during a high > heap usage percentage. Heap dump analysis detected 912,355 instances of > java.util.HashMap empty collections retaining >= 43,793,040 bytes. 
> !Screenshot 2023-10-09 at 14.13.43.png! > The retained heap seemed to be located at: > > {code:java} > org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code} > > !Screenshot 2023-10-09 at 13.02.34.png! > > *Possible hints:* > An empty HashMap is added during the snapshotState method to offsetsToCommit > map if it does not already exist for the given checkpoint. [KafkaSourceReader > line > 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107] > > {code:java} > Map offsetsMap = > offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); > {code} > > If the startingOffset for the given split is >= 0 then a new entry would be > added to the map from the previous step. [KafkaSourceReader line > 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113] > {code:java} > if (split.getStartingOffset() >= 0) { > offsetsMap.put( > split.getTopicPartition(), > new OffsetAndMetadata(split.getStartingOffset())); > }{code} > If the starting offset is smaller than 0 then this would leave the offsetMap > created in step 1 empty. We can see from the logs that the startingOffset is > -3 when the splits are added to the reader. 
> > {code:java} > Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, > StoppingOffset: -9223372036854775808], [Partition: source-events-44, > StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: > source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], > [Partition: source-events-36, StartingOffset: 1, StoppingOffset: > -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, > StoppingOffset: -9223372036854775808], [Partition: source-events-28, > StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code} > > > The offsetsToCommit map is cleaned from entries once they have been committed > to Kafka which happens during the callback function that is passed to the > KafkaSourceFetcherManager.commitOffsets method in > KafkaSourceReader.notifyCheckpointComplete method. > However if the committedPartitions is empty for the given checkpoint, then > the KafkaSourceFetcherManager.commitOffsets method returns. > [KafkaSourceFetcherManager line >
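The leak described in FLINK-33231 can be modeled in a few lines: `snapshotState` eagerly creates a per-checkpoint offsets map via `computeIfAbsent`, but the cleanup of `offsetsToCommit` only runs after a successful commit, which is skipped when the map is empty. The sketch below is a hypothetical simplification (the class name `OffsetsLeakDemo` and the `recordOffset` helper are invented; only `offsetsToCommit`, `snapshotState`, and `notifyCheckpointComplete` mirror names from `KafkaSourceReader`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal model of the leak described above: snapshotState() creates a
// (possibly empty) per-checkpoint offsets map, and the commit path returns
// early for empty maps, so entries for idle checkpoints are never removed.
class OffsetsLeakDemo {
    final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    // Invented helper: stands in for finished splits contributing offsets.
    void recordOffset(long checkpointId, String partition, long offset) {
        offsetsToCommit
                .computeIfAbsent(checkpointId, id -> new HashMap<>())
                .put(partition, offset);
    }

    void snapshotState(long checkpointId) {
        // Mirrors KafkaSourceReader.snapshotState: an empty map is created
        // even when there are no offsets to record for this checkpoint.
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> committed = offsetsToCommit.get(checkpointId);
        if (committed == null || committed.isEmpty()) {
            return; // early return: the empty entry stays in the map forever
        }
        // ...commit to Kafka succeeded; prune this checkpoint and older ones.
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }
}
```

Running the idle loop shows unbounded growth, and a single non-empty commit prunes the whole backlog via `headMap(...).clear()` — matching the observed behavior that one message clears the heap but the leak resumes immediately afterwards.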
Re: [PR] [FLINK-33191][Connector/Kafka] Remove dependency on Flink Shaded [flink-connector-kafka]
tzulitai commented on code in PR #57: URL: https://github.com/apache/flink-connector-kafka/pull/57#discussion_r1353655402 ## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java: ## @@ -82,16 +81,24 @@ public void testKafkaDeserializationSchemaWrapper() throws Exception { @Test public void testKafkaValueDeserializationSchemaWrapper() throws Exception { final ConsumerRecord consumerRecord = getConsumerRecord(); -KafkaRecordDeserializationSchema schema = -KafkaRecordDeserializationSchema.valueOnly( -new JsonDeserializationSchema<>(ObjectNode.class)); +KafkaRecordDeserializationSchema< + org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node Review Comment: I'm a bit lost here. Why does this test code still need to use the shaded version?
[jira] [Updated] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo
[ https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-33191: Affects Version/s: kafka-3.0.0 (was: 1.18.0) > Kafka Connector should directly depend on 3rd-party libs instead of > flink-shaded repo > - > > Key: FLINK-33191 > URL: https://issues.apache.org/jira/browse/FLINK-33191 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: kafka-3.0.0 >Reporter: Jing Ge >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo
[ https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-33191: Fix Version/s: kafka-3.0.1 kafka-3.1.0 > Kafka Connector should directly depend on 3rd-party libs instead of > flink-shaded repo > - > > Key: FLINK-33191 > URL: https://issues.apache.org/jira/browse/FLINK-33191 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: kafka-3.0.0 >Reporter: Jing Ge >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.0.1, kafka-3.1.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33020) OpensearchSinkTest.testAtLeastOnceSink timed out
[ https://issues.apache.org/jira/browse/FLINK-33020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-33020. - Fix Version/s: opensearch-1.1.0 Resolution: Fixed > OpensearchSinkTest.testAtLeastOnceSink timed out > > > Key: FLINK-33020 > URL: https://issues.apache.org/jira/browse/FLINK-33020 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.2 >Reporter: Martijn Visser >Assignee: Andriy Redko >Priority: Blocker > Labels: pull-request-available, stale-blocker > Fix For: opensearch-1.1.0 > > > https://github.com/apache/flink-connector-opensearch/actions/runs/6061205003/job/16446139552#step:13:1029 > {code:java} > Error: Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 9.837 > s <<< FAILURE! - in > org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest > Error: > org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest.testAtLeastOnceSink > Time elapsed: 5.022 s <<< ERROR! 
> java.util.concurrent.TimeoutException: testAtLeastOnceSink() timed out after > 5 seconds > at > org.junit.jupiter.engine.extension.TimeoutInvocation.createTimeoutException(TimeoutInvocation.java:70) > at > org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:59) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) > at > 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at >
[jira] [Commented] (FLINK-33020) OpensearchSinkTest.testAtLeastOnceSink timed out
[ https://issues.apache.org/jira/browse/FLINK-33020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773823#comment-17773823 ] Sergey Nuyanzin commented on FLINK-33020: - The timeout was increased in [87b23c46f6f3a26bdd645d623a34dee3d19dac9d|https://github.com/apache/flink-connector-opensearch/commit/87b23c46f6f3a26bdd645d623a34dee3d19dac9d] > OpensearchSinkTest.testAtLeastOnceSink timed out > > > Key: FLINK-33020 > URL: https://issues.apache.org/jira/browse/FLINK-33020 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.2 >Reporter: Martijn Visser >Assignee: Andriy Redko >Priority: Blocker > Labels: pull-request-available, stale-blocker >
[PR] [hotfix] Bump flink version to 1.17.1 [flink-connector-opensearch]
snuyanzin opened a new pull request, #33: URL: https://github.com/apache/flink-connector-opensearch/pull/33 (no comment)
[jira] [Assigned] (FLINK-33020) OpensearchSinkTest.testAtLeastOnceSink timed out
[ https://issues.apache.org/jira/browse/FLINK-33020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-33020: --- Assignee: Andriy Redko > OpensearchSinkTest.testAtLeastOnceSink timed out > > > Key: FLINK-33020 > URL: https://issues.apache.org/jira/browse/FLINK-33020 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.2 >Reporter: Martijn Visser >Assignee: Andriy Redko >Priority: Blocker > Labels: pull-request-available, stale-blocker >
Re: [PR] [FLINK-33020] OpensearchSinkTest.testAtLeastOnceSink timed out [flink-connector-opensearch]
snuyanzin merged PR #32: URL: https://github.com/apache/flink-connector-opensearch/pull/32
[jira] [Commented] (FLINK-33200) ItemAt Expression validation fail in Table API due to type mismatch
[ https://issues.apache.org/jira/browse/FLINK-33200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773794#comment-17773794 ] Andriy Onyshchuk commented on FLINK-33200: -- Seems like I have caught several more issues related to the `AT` expression and type resolution. # Accessing an array of primitives doesn't work if, at the data level, arrays are represented as ArrayData. What I got is: `Unsupported conversion from data type 'ARRAY' (conversion class: org.apache.flink.table.data.ArrayData) to type information. Only data types that originated from type information fully support a reverse conversion.` # Accessing a `Map` throws too: `map.at(0L)` fails with incompatible types for the sink column. {{Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_collect$3*' do not match.}} {{Cause: Incompatible types for sink column 'longData_at_0' at position 0.}} {{Query schema: [longData_at_0: STRING]}} {{Sink schema: [longData_at_0: RAW('org.apache.flink.table.data.StringData', ?)]}} All issues are reflected in `IssueDemo.java` (see attachments). 
> ItemAt Expression validation fail in Table API due to type mismatch > --- > > Key: FLINK-33200 > URL: https://issues.apache.org/jira/browse/FLINK-33200 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.0, 1.17.1 >Reporter: Zhenqiu Huang >Priority: Minor > Attachments: IssueDemo.java > > > The table schema is defined as below: > public static final DataType DATA_TYPE = DataTypes.ROW( > DataTypes.FIELD("id", DataTypes.STRING()), > DataTypes.FIELD("events", > DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))) > ); > public static final Schema SCHEMA = > Schema.newBuilder().fromRowDataType(DATA_TYPE).build(); > inTable.select(Expressions.$("events").at(1).at("eventType").as("firstEventType")) > The validation fails because "eventType" is inferred as > BasicTypeInfo.STRING_TYPE_INFO, while the table key internally is a > StringDataTypeInfo. The validation fails at > case mti: MapTypeInfo[_, _] => > if (key.resultType == mti.getKeyTypeInfo) { > ValidationSuccess > } else { > ValidationFailure( > s"Map entry access needs a valid key of type " + > s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.") > } -- This message was sent by Atlassian Jira (v8.20.10#820010)
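The failure mode quoted above boils down to an exact-equality check between two type descriptors that are logically both "string". A schematic plain-Java reduction of that check (plain strings stand in for the real TypeInformation objects; this is an illustration of the mechanism, not Flink's actual validation code):

```java
// Sketch of the MapTypeInfo validation quoted above: key access is validated
// by exact equality of type descriptors, so a BasicTypeInfo-style "String"
// key never matches an internal "StringData" key type, even though both are
// strings. Strings stand in for the real TypeInformation instances.
class ItemAtValidationSketch {
    static String validate(String keyResultType, String mapKeyTypeInfo) {
        if (keyResultType.equals(mapKeyTypeInfo)) {
            return "ValidationSuccess";
        }
        return "Map entry access needs a valid key of type '"
                + mapKeyTypeInfo + "', found '" + keyResultType + "'.";
    }
}
```

Under this reduction, `validate("String", "StringData")` produces the same kind of failure message reported in the issue, which is why a compatibility check (rather than strict equality) would be needed to fix it.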
[jira] [Updated] (FLINK-33200) ItemAt Expression validation fail in Table API due to type mismatch
[ https://issues.apache.org/jira/browse/FLINK-33200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andriy Onyshchuk updated FLINK-33200: - Attachment: IssueDemo.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33231: --- Labels: pull-request-available (was: ) > Memory leak in KafkaSourceReader if no data in consumed topic > - > > Key: FLINK-33231 > URL: https://issues.apache.org/jira/browse/FLINK-33231 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: Lauri Suurväli >Priority: Blocker > Labels: pull-request-available > Fix For: kafka-3.0.1, kafka-3.1.0 > > Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot > 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png > > > *Problem description* > Our Flink streaming job TaskManager heap gets full when the job has nothing > to consume and process. > It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. > When there are no messages in the source topic the TaskManager heap usage > starts increasing until the job exits after receiving a SIGTERM signal. We > are running the job on AWS EMR with YARN. > The problems with the TaskManager heap usage do not occur when there is data > to process. It's also worth noting that sending a single message to the > source topic of a streaming job that has been sitting idle and suffers from > the memory leak will cause the heap to be cleared. However it does not > resolve the problem since the heap usage will start increasing immediately > after processing the message. > !Screenshot 2023-10-10 at 12.49.37.png! > TaskManager heap used percentage is calculated by > > {code:java} > flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / > flink.taskmanager.Status.JVM.Memory.Heap.Max{code} > > > I was able to take heap dumps of the TaskManager processes during a high > heap usage percentage. Heap dump analysis detected 912,355 instances of > java.util.HashMap empty collections retaining >= 43,793,040 bytes. > !Screenshot 2023-10-09 at 14.13.43.png! 
> The retained heap seemed to be located at: > > {code:java} > org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code} > > !Screenshot 2023-10-09 at 13.02.34.png! > > *Possible hints:* > An empty HashMap is added during the snapshotState method to offsetsToCommit > map if it does not already exist for the given checkpoint. [KafkaSourceReader > line > 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107] > > {code:java} > Map<TopicPartition, OffsetAndMetadata> offsetsMap = > offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); > {code} > > If the startingOffset for the given split is >= 0 then a new entry would be > added to the map from the previous step. [KafkaSourceReader line > 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113] > {code:java} > if (split.getStartingOffset() >= 0) { > offsetsMap.put( > split.getTopicPartition(), > new OffsetAndMetadata(split.getStartingOffset())); > }{code} > If the starting offset is smaller than 0 then this would leave the offsetMap > created in step 1 empty. We can see from the logs that the startingOffset is > -3 when the splits are added to the reader. 
> > {code:java} > Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, > StoppingOffset: -9223372036854775808], [Partition: source-events-44, > StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: > source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], > [Partition: source-events-36, StartingOffset: 1, StoppingOffset: > -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, > StoppingOffset: -9223372036854775808], [Partition: source-events-28, > StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code} > > > The offsetsToCommit map is cleaned from entries once they have been committed > to Kafka which happens during the callback function that is passed to the > KafkaSourceFetcherManager.commitOffsets method in > KafkaSourceReader.notifyCheckpointComplete method. > However if the committedPartitions is empty for the given checkpoint, then > the KafkaSourceFetcherManager.commitOffsets method returns. > [KafkaSourceFetcherManager line >
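The accumulation mechanism described above can be sketched in plain Java (stand-in types only — `String` partition keys and `Long` offsets instead of the real `TopicPartition`/`OffsetAndMetadata` classes; this illustrates the reported behavior, not the connector's actual code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for KafkaSourceReader#offsetsToCommit: one entry per
// checkpoint, mapping partition -> offset.
class OffsetsToCommitLeakSketch {
    final TreeMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    // Mirrors snapshotState: always registers a (possibly empty) map for the
    // checkpoint; a split with startingOffset < 0 contributes no entry.
    void snapshotState(long checkpointId, long startingOffset) {
        Map<String, Long> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        if (startingOffset >= 0) {
            offsetsMap.put("topic-0", startingOffset);
        }
    }

    // Mirrors the pre-fix completion path: if there is nothing to commit, it
    // returns early and never evicts, so empty maps pile up per checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> committed = offsetsToCommit.get(checkpointId);
        if (committed == null || committed.isEmpty()) {
            return; // eviction is skipped here - this is the leak
        }
        // commit to Kafka ... then evict everything up to this checkpoint
        offsetsToCommit.headMap(checkpointId, true).clear();
    }
}
```

Running this with a negative starting offset (e.g. the -3 seen in the logs) leaves one empty `HashMap` behind per checkpoint, matching the heap-dump observation of hundreds of thousands of empty maps on an idle topic.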
[PR] [FLINK-33231] [source] Properly evict offsetsToCommit cache on checkpoint complete if no offsets exist [flink-connector-kafka]
tzulitai opened a new pull request, #58: URL: https://github.com/apache/flink-connector-kafka/pull/58 Prior to this fix, if the offsets to commit for a given checkpoint are empty, which can be the case if no starting offsets have been retrieved from Kafka yet, then on checkpoint completion the cache is not properly evicted up to the given checkpoint. This change fixes this such that in notifyOnCheckpointComplete, we shortcut the method execution so that we don't try to commit the offsets (since they are empty anyway), and always remember to evict the cache up to the completed checkpoint. ## Testing I've modified the existing `KafkaSourceReaderTest#testCommitEmptyOffsets()` test to fail if the cache eviction fix was not applied. With this PR, that test now passes.
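A minimal sketch of the eviction shortcut this PR describes, again with placeholder types rather than the actual connector classes (the eviction point is an assumption based on the PR description, not the literal patch):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Schematic of the fixed completion path: evict the cache up to the completed
// checkpoint even when there is nothing to commit.
class OffsetsToCommitFixSketch {
    final TreeMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    void snapshotState(long checkpointId) {
        // As before: a (possibly empty) map is registered per checkpoint.
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> committed = offsetsToCommit.get(checkpointId);
        if (committed == null || committed.isEmpty()) {
            // Shortcut: nothing to commit, but still drop this checkpoint's
            // entry and everything older so the cache cannot grow unbounded.
            offsetsToCommit.headMap(checkpointId, true).clear();
            return;
        }
        // Otherwise commit the offsets, then evict in the commit callback.
        offsetsToCommit.headMap(checkpointId, true).clear();
    }
}
```

With this shortcut, an idle source that completes checkpoints with empty offset maps keeps the cache bounded instead of accumulating one empty map per checkpoint.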
[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773790#comment-17773790 ] Tzu-Li (Gordon) Tai commented on FLINK-33231: - [~lauri.suurvali] I think that would work, but the issue is that in the callback, on success we log that a commit was successful, and the source reader metrics are also bumped, which can be confusing if no offsets were actually committed. Moreover, with that approach we would be relying on internal details of the Kafka client that are hard to cover with tests (i.e. things might silently change such that a remote request is issued even if the provided offsets are empty, which is not ideal). So, I think we can be a bit cleaner by short-cutting the {{notifyCheckpointComplete}} method such that if the offsets for a checkpoint are empty, we don't even attempt to use the fetcher manager to commit offsets. > Memory leak in KafkaSourceReader if no data in consumed topic > - > > Key: FLINK-33231 > URL: https://issues.apache.org/jira/browse/FLINK-33231 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: Lauri Suurväli >Priority: Blocker > Fix For: kafka-3.0.1, kafka-3.1.0 > > Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot > 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png > > > *Problem description* > Our Flink streaming job TaskManager heap gets full when the job has nothing > to consume and process. > It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. > When there are no messages in the source topic the TaskManager heap usage > starts increasing until the job exits after receiving a SIGTERM signal. We > are running the job on AWS EMR with YARN. > The problems with the TaskManager heap usage do not occur when there is data > to process. 
[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773784#comment-17773784 ] Lauri Suurväli commented on FLINK-33231: [~tzulitai] thank you for the comment! Would removing the code that you linked, which returns in case of an empty offsetsToCommit, be an option to solve this issue? The remaining code would end up in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#sendOffsetCommitRequest which would return a successful response locally in case of an empty offsets map. [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java at master · a0x8o/kafka (github.com)|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1267-L1269] Since the request is handled locally then perhaps this is a good way to ensure that the callback function isn't discarded. Would this sort of an approach bring any additional overhead that we would like to avoid or perhaps I am missing something? > Memory leak in KafkaSourceReader if no data in consumed topic > - > > Key: FLINK-33231 > URL: https://issues.apache.org/jira/browse/FLINK-33231 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: Lauri Suurväli >Priority: Blocker > Fix For: kafka-3.0.1, kafka-3.1.0 > > Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot > 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png > > > *Problem description* > Our Flink streaming job TaskManager heap gets full when the job has nothing > to consume and process. > It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. > When there are no messages in the source topic the TaskManager heap usage > starts increasing until the job exits after receiving a SIGTERM signal. We > are running the job on AWS EMR with YARN. 
> The problems with the TaskManager heap usage do not occur when there is data > to process. It's also worth noting that sending a single message to the > source topic of a streaming job that has been sitting idle and suffers from > the memory leak will cause the heap to be cleared. However it does not > resolve the problem since the heap usage will start increasing immediately > after processing the message. > !Screenshot 2023-10-10 at 12.49.37.png! > TaskManager heap used percentage is calculated by > > {code:java} > flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / > flink.taskmanager.Status.JVM.Memory.Heap.Max{code} > > > I was able to take heap dumps of the TaskManager processes during a high > heap usage percentage. Heap dump analysis detected 912,355 instances of > java.util.HashMap empty collections retaining >= 43,793,040 bytes. > !Screenshot 2023-10-09 at 14.13.43.png! > The retained heap seemed to be located at: > > {code:java} > org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code} > > !Screenshot 2023-10-09 at 13.02.34.png! > > *Possible hints:* > An empty HashMap is added during the snapshotState method to offsetsToCommit > map if it does not already exist for the given checkpoint. [KafkaSourceReader > line > 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107] > > {code:java} > Map offsetsMap = > offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); > {code} > > If the startingOffset for the given split is >= 0 then a new entry would be > added to the map from the previous step. 
[KafkaSourceReader line > 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113] > {code:java} > if (split.getStartingOffset() >= 0) { > offsetsMap.put( > split.getTopicPartition(), > new OffsetAndMetadata(split.getStartingOffset())); > }{code} > If the starting offset is smaller than 0 then this would leave the offsetMap > created in step 1 empty. We can see from the logs that the startingOffset is > -3 when the splits are added to the reader. > > {code:java} > Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, > StoppingOffset: -9223372036854775808], [Partition: source-events-44, > StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: > source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], > [Partition: source-events-36, StartingOffset: 1, StoppingOffset: >
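The accumulation described in this report can be reproduced in isolation. The following is a simplified, self-contained sketch (plain Java, no Flink or Kafka dependencies; the class and field names only mirror the ones discussed above and are not the actual Flink classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Standalone sketch of the reported leak: an empty per-checkpoint map is
// created in snapshotState, but the commit path returns early for empty maps,
// so the cleanup that normally truncates the bookkeeping never runs.
public class OffsetsLeakSketch {
    // mirrors KafkaSourceReader#offsetsToCommit: checkpointId -> (partition -> offset)
    final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    void snapshotState(long checkpointId, long startingOffset) {
        Map<String, Long> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        // with a negative marker offset (e.g. -3) nothing is ever added,
        // leaving the freshly created map empty
        if (startingOffset >= 0) {
            offsetsMap.put("topic-0", startingOffset);
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> committed = offsetsToCommit.get(checkpointId);
        if (committed == null || committed.isEmpty()) {
            return; // early return: the truncation below is skipped forever
        }
        // stand-in for the commit callback that truncates entries up to this checkpoint
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }
}
```

Running this with only negative starting offsets grows `offsetsToCommit` by one empty `HashMap` per checkpoint, matching the many empty `HashMap` instances seen in the heap dump.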
Re: [PR] [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data [flink-connector-kafka]
tzulitai commented on PR #52: URL: https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1755995458 @Tan-JiaLiang ok - let's try to reach consensus here. It does seem that my approach can lead to other problems, e.g. bombarding Kafka brokers with multiple end-offset lookups across multiple TMs. This might turn out worse than doing just one extra query on the JM. And since JMs are already assumed to have access to Kafka brokers anyway, we shouldn't be breaking anything for users. So overall - let's proceed with this PR's approach. Sorry for the back and forth @Tan-JiaLiang, just wanted to make sure that we're fixing this properly. Regarding the restore issue you mentioned: yes, please address that, and then I think we can merge this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data [flink-connector-kafka]
tzulitai commented on PR #52: URL: https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1755980007 @Tan-JiaLiang you are right, that's why it's taking me a while to open the new PR. The problem with my approach is that it can potentially read more records when restoring. Essentially, what we need to do is, at _some_ point before the first snapshot is taken, actually look up the offsets for empty partitions and replace any kind of marker. Your PR does this very early on the JMs, and I'm trying to figure out if there's any way to postpone this as much as possible to the TMs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33033][olap][haservice] Add haservice micro benchmark for olap [flink-benchmarks]
XComp commented on code in PR #78: URL: https://github.com/apache/flink-benchmarks/pull/78#discussion_r1353047609 ## src/main/java/org/apache/flink/olap/benchmark/HighAvailabilityServiceBenchmark.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.olap.benchmark; + +import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.JobID; +import org.apache.flink.benchmark.BenchmarkBase; +import org.apache.flink.benchmark.FlinkEnvironmentContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.FileUtils; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.UUID; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.openjdk.jmh.annotations.Scope.Thread; + +/** The benchmark for submitting short-lived jobs with and without high availability service. 
*/ +@OutputTimeUnit(SECONDS) +public class HighAvailabilityServiceBenchmark extends BenchmarkBase { + public static void main(String[] args) throws RunnerException { + Options options = + new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + HighAvailabilityServiceBenchmark.class.getCanonicalName() + ".*") + .build(); + + new Runner(options).run(); + } + + @Benchmark + public void submitJobThroughput(HighAvailabilityContext context) throws Exception { + context.miniCluster.executeJobBlocking(buildNoOpJob()); + } + + private JobGraph buildNoOpJob() { + JobGraph jobGraph = new JobGraph(JobID.generate(), UUID.randomUUID().toString()); + jobGraph.addVertex(createNoOpVertex()); + return jobGraph; + } + + private JobVertex createNoOpVertex() { Review Comment: ```suggestion private static JobVertex createNoOpVertex() { ``` nit: the create methods could be static ## pom.xml: ## @@ -63,6 +63,7 @@ under the License. 0.7.6 0.13.0 3.21.7 + 5.4.0 Review Comment: I guess we have to keep the curator version here aligned with the Flink dependency. Do we have to set it explicitly or is there a way to use Flink's dependency transitively? :thinking: We might want to add a comment to the corresponding line in [apache/flink:pom.xml:144](https://github.com/apache/flink/blob/ec6ebe2d22d15883f7236895387a45a533cfefe0/pom.xml#L144) to point out that updating the curator version in Flink should result in updating the corresponding version in Flink's benchmark tests as well. ## src/main/java/org/apache/flink/olap/benchmark/HighAvailabilityServiceBenchmark.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by
[jira] [Commented] (FLINK-32806) EmbeddedJobResultStore keeps the non-dirty job entries forever
[ https://issues.apache.org/jira/browse/FLINK-32806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773758#comment-17773758 ] Samrat Deb commented on FLINK-32806: [~hk__lrzy] are you working on the patch ? > EmbeddedJobResultStore keeps the non-dirty job entries forever > -- > > Key: FLINK-32806 > URL: https://issues.apache.org/jira/browse/FLINK-32806 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1, 1.19.0 >Reporter: Matthias Pohl >Assignee: hk__lrzy >Priority: Major > Labels: stale-assigned, starter > > The {{EmbeddedJobResultStore}} keeps the entries of cleaned-up jobs in-memory > forever. We might want to add a TTL to have those entries be removed after a > certain amount of time to allow maintaining the memory footprint of the > {{EmbeddedJobResultStore}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Backport Jetty's fix for java version parsing [flink-connector-hbase]
snuyanzin commented on PR #25: URL: https://github.com/apache/flink-connector-hbase/pull/25#issuecomment-1755768586 @MartijnVisser this should fix the failing builds; could you please have a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773739#comment-17773739 ] Tzu-Li (Gordon) Tai commented on FLINK-33231: - Making this a blocker for the upcoming Kafka connector releases. > Memory leak in KafkaSourceReader if no data in consumed topic > -
[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-33231: Fix Version/s: kafka-3.0.1 kafka-3.1.0 > Memory leak in KafkaSourceReader if no data in consumed topic > -
[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-33231: Priority: Blocker (was: Major) > Memory leak in KafkaSourceReader if no data in consumed topic > -
[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773738#comment-17773738 ] Tzu-Li (Gordon) Tai commented on FLINK-33231: - [~lauri.suurvali] great debugging! I think the fix is basically: in KafkaSourceFetcherManager#commitOffsets, if the provided offsetsToCommitMap is empty, the callback (where the logic for truncating the map lives) should be invoked as well. Currently, it just returns without calling the callback at all. Code link: https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78-L80 > Memory leak in KafkaSourceReader if no data in consumed topic > -
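The callback-on-empty fix suggested in the comment above can be sketched as follows. These are illustrative names, not Flink's actual API; the only point is that the empty-map branch fires the same success callback the non-empty path would, so reader-side cleanup still runs:

```java
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch of the proposed fix in commitOffsets: invoke the
// success callback even when there is nothing to commit, instead of a bare
// return that skips the callback and leaks the per-checkpoint entry.
public class CommitOffsetsSketch {
    public static void commitOffsets(
            Map<String, Long> offsets, Consumer<Map<String, Long>> successCallback) {
        if (offsets.isEmpty()) {
            // fix idea: previously this branch was a plain `return`
            successCallback.accept(offsets);
            return;
        }
        // a real implementation would enqueue an async Kafka commit here and
        // invoke successCallback from the commit's completion handler
        successCallback.accept(offsets);
    }
}
```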
[PR] Backport Jetty's fix for java version parsing [flink-connector-hbase]
snuyanzin opened a new pull request, #25: URL: https://github.com/apache/flink-connector-hbase/pull/25 Merges the fix for Jetty from https://github.com/eclipse/jetty.project/pull/2331, since the fix is only in Jetty 9.6 while hbase depends on 9.3 and lower -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33052) codespeed and benchmark server is down
[ https://issues.apache.org/jira/browse/FLINK-33052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773726#comment-17773726 ] Martijn Visser commented on FLINK-33052: [~Zakelly] Is there any update on this ticket? > codespeed and benchmark server is down > -- > > Key: FLINK-33052 > URL: https://issues.apache.org/jira/browse/FLINK-33052 > Project: Flink > Issue Type: Bug > Components: Benchmarks, Test Infrastructure >Affects Versions: 1.18.0 >Reporter: Jing Ge >Assignee: Zakelly Lan >Priority: Blocker > > No update in #flink-dev-benchmarks slack channel since 25th August. > It was a EC2 running in a legacy aws account. Currently on one knows which > account it is. > > https://apache-flink.slack.com/archives/C0471S0DFJ9/p1693932155128359 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32038) OffsetCommitMode.Kafka_periodic with checkpointing enabled
[ https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773719#comment-17773719 ] Martijn Visser commented on FLINK-32038: [~pritam.agarwala] [~tzulitai] What is the conclusion of this ticket? > OffsetCommitMode.Kafka_periodic with checkpointing enabled > --- > > Key: FLINK-32038 > URL: https://issues.apache.org/jira/browse/FLINK-32038 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Runtime / Checkpointing >Affects Versions: 1.14.6 >Reporter: Pritam Agarwala >Priority: Major > > I need to get kafka-lag to prepare a graph and its dependent on kafka > committed offset. Flink is updating the offsets only after checkpointing to > make it consistent. > Default Behaviour as per doc : > If checkpoint is enabled, but {{consumer.setCommitOffsetsOnCheckpoints}} set > to false, then offset will not be committed at all even if the > {{enable.auto.commit}} is set to true. > So, when {{consumer.setCommitOffsetsOnCheckpoints}} set to false, *shouldn't > it fall back on the {{enable.auto.commit}} to do offset commit regularly > since* *in any case flink doesn't use consumer committed offsets for > recovery.* > > OffsetCommitModes class : > > {code:java} > public class OffsetCommitModes { > /** > * Determine the offset commit mode using several configuration values. > * > * @param enableAutoCommit whether or not auto committing is enabled in > the provided Kafka > * properties. > * @param enableCommitOnCheckpoint whether or not committing on > checkpoints is enabled. > * @param enableCheckpointing whether or not checkpoint is enabled for > the consumer. > * @return the offset commit mode to use, based on the configuration > values. 
> */ > public static OffsetCommitMode fromConfiguration( > boolean enableAutoCommit, > boolean enableCommitOnCheckpoint, > boolean enableCheckpointing) { > if (enableCheckpointing) { > // if checkpointing is enabled, the mode depends only on whether > committing on > // checkpoints is enabled > return (enableCommitOnCheckpoint) > ? OffsetCommitMode.ON_CHECKPOINTS > : OffsetCommitMode.DISABLED; > } else { > // else, the mode depends only on whether auto committing is > enabled in the provided > // Kafka properties > return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : > OffsetCommitMode.DISABLED; > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
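Since the question hinges on which mode wins when checkpointing is enabled, the quoted decision logic can be restated compactly and checked directly. This mirrors the snippet quoted above but is not the Flink class itself (enum and class names here are illustrative):

```java
// Compact restatement of the quoted OffsetCommitModes#fromConfiguration logic,
// so the relevant flag combinations can be verified directly.
public class OffsetCommitModeSketch {
    public enum Mode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    public static Mode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // checkpointing enabled: enable.auto.commit is ignored entirely,
            // which is exactly the behavior the ticket questions
            return enableCommitOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }
}
```

With checkpointing on and commit-on-checkpoint off, the result is DISABLED even when enable.auto.commit is true; the ticket proposes falling back to KAFKA_PERIODIC in that case.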
[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors
[ https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773708#comment-17773708 ] Martijn Visser commented on FLINK-30400: [~chesnay] How would this work for SQL FAT jars? IIUC they actually need connector-base > Stop bundling connector-base in externalized connectors > --- > > Key: FLINK-30400 > URL: https://issues.apache.org/jira/browse/FLINK-30400 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Chesnay Schepler >Assignee: Hang Ruan >Priority: Major > Labels: pull-request-available > > Check that none of the externalized connectors bundle connector-base; if so > remove the bundling and schedule a new minor release. > Bundling this module is highly problematic w.r.t. binary compatibility, since > bundled classes may rely on internal APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33234) Bump used Guava version in Kafka E2E tests
Martijn Visser created FLINK-33234: -- Summary: Bump used Guava version in Kafka E2E tests Key: FLINK-33234 URL: https://issues.apache.org/jira/browse/FLINK-33234 Project: Flink Issue Type: Technical Debt Components: Connectors / Kafka Reporter: Martijn Visser Assignee: Martijn Visser To resolve existing Dependabot PRs: https://github.com/apache/flink-connector-kafka/security/dependabot?q=package%3Acom.google.guava%3Aguava+manifest%3Aflink-connector-kafka-e2e-tests%2Fflink-end-to-end-tests-common-kafka%2Fpom.xml+has%3Apatch -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-33182: Fix Version/s: 1.19.0 > Allow metadata columns in NduAnalyzer with ChangelogNormalize > - > > Key: FLINK-33182 > URL: https://issues.apache.org/jira/browse/FLINK-33182 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: lincoln lee >Priority: Major > Fix For: 1.19.0 > > > Currently, the NduAnalyzer is very strict about metadata columns in updating > sources. However, for upsert sources (like Kafka) that contain an incomplete > changelog, the planner always adds a ChangelogNormalize node. > ChangelogNormalize will make sure that metadata columns can be considered > deterministic. So the NduAnalyzer should be satisfied in this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773700#comment-17773700 ] lincoln lee commented on FLINK-33182: - [~twalthr] Of course, I'll put it on my worklist for the next release. > Allow metadata columns in NduAnalyzer with ChangelogNormalize > - > > Key: FLINK-33182 > URL: https://issues.apache.org/jira/browse/FLINK-33182 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Timo Walther >Priority: Major > > Currently, the NduAnalyzer is very strict about metadata columns in updating > sources. However, for upsert sources (like Kafka) that contain an incomplete > changelog, the planner always adds a ChangelogNormalize node. > ChangelogNormalize will make sure that metadata columns can be considered > deterministic. So the NduAnalyzer should be satisfied in this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize
[ https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-33182: --- Assignee: lincoln lee > Allow metadata columns in NduAnalyzer with ChangelogNormalize > - > > Key: FLINK-33182 > URL: https://issues.apache.org/jira/browse/FLINK-33182 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: lincoln lee >Priority: Major > > Currently, the NduAnalyzer is very strict about metadata columns in updating > sources. However, for upsert sources (like Kafka) that contain an incomplete > changelog, the planner always adds a ChangelogNormalize node. > ChangelogNormalize will make sure that metadata columns can be considered > deterministic. So the NduAnalyzer should be satisfied in this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33233][hive] Fix NPE when non-native udf used in join condition in hive-parser [flink]
flinkbot commented on PR #23504: URL: https://github.com/apache/flink/pull/23504#issuecomment-1755484549 ## CI report: * 49df8d49f9a7d30f7f3d26dc2f86f95a2c77d98e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33233) Null point exception when non-native udf used in join condition
[ https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33233: --- Labels: pull-request-available (was: ) > Null point exception when non-native udf used in join condition > --- > > Key: FLINK-33233 > URL: https://issues.apache.org/jira/browse/FLINK-33233 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.17.0 >Reporter: yunfan >Priority: Major > Labels: pull-request-available > > Any non-native udf used in hive-parser join condition. > It will caused NullPointException. > It can reproduced by follow code by adding this test to > {code:java} > org.apache.flink.connectors.hive.HiveDialectQueryITCase{code} > > {code:java} > // Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase > @Test > public void testUdfInJoinCondition() throws Exception { > List result = CollectionUtil.iteratorToList(tableEnv.executeSql( > "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I > where bar.I > 1").collect()); > assertThat(result.toString()) > .isEqualTo("[+I[2, 2]]"); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33233) Null point exception when non-native udf used in join condition
[ https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yunfan updated FLINK-33233: --- Description: Any non-native UDF used in a hive-parser join condition causes a NullPointerException. It can be reproduced by adding the following test to
{code:java}
org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}

{code:java}
// Add the following test to org.apache.flink.connectors.hive.HiveDialectQueryITCase
@Test
public void testUdfInJoinCondition() throws Exception {
    List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
            "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I where bar.I > 1").collect());
    assertThat(result.toString())
            .isEqualTo("[+I[2, 2]]");
} {code}

was: Any non-native UDF used in a hive-parser join condition causes a NullPointerException.

> Null point exception when non-native udf used in join condition
> ---------------------------------------------------------------
>
> Key: FLINK-33233
> URL: https://issues.apache.org/jira/browse/FLINK-33233
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Hive
> Affects Versions: 1.17.0
> Reporter: yunfan
> Priority: Major
>
> Any non-native UDF used in a hive-parser join condition causes a
> NullPointerException. It can be reproduced by adding the following test to
> {code:java}
> org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
>
> {code:java}
> @Test
> public void testUdfInJoinCondition() throws Exception {
>     List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
>             "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I where bar.I > 1").collect());
>     assertThat(result.toString())
>             .isEqualTo("[+I[2, 2]]");
> } {code}
[PR] [FLINK-33233][hive] Fix NPE when non-native udf used in join condition in hive-parser [flink]
yunfan123 opened a new pull request, #23504: URL: https://github.com/apache/flink/pull/23504 ## What is the purpose of the change Fix NPE when non-native udf used in join condition in hive-parser ## Brief change log This problem caused by not set UnparseTranslator when init HiveParserJoinTypeCheckCtx. ## Verifying this change This change is verified by added test ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33233) Null point exception when non-native udf used in join condition
[ https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yunfan updated FLINK-33233: --- Environment: (was: It can reproduced by follow code by adding this test to {code:java} org.apache.flink.connectors.hive.HiveDialectQueryITCase{code} {code:java} // Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase @Test public void testUdfInJoinCondition() throws Exception { List result = CollectionUtil.iteratorToList(tableEnv.executeSql( "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I where bar.I > 1").collect()); assertThat(result.toString()) .isEqualTo("[+I[2, 2]]"); } {code}) > Null point exception when non-native udf used in join condition > --- > > Key: FLINK-33233 > URL: https://issues.apache.org/jira/browse/FLINK-33233 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.17.0 >Reporter: yunfan >Priority: Major > > Any non-native udf used in hive-parser join condition. > It will caused NullPointException. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33233) Null point exception when non-native udf used in join condition
yunfan created FLINK-33233: -- Summary: Null point exception when non-native udf used in join condition Key: FLINK-33233 URL: https://issues.apache.org/jira/browse/FLINK-33233 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.17.0 Environment: It can reproduced by follow code by adding this test to {code:java} org.apache.flink.connectors.hive.HiveDialectQueryITCase{code} {code:java} // Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase @Test public void testUdfInJoinCondition() throws Exception { List result = CollectionUtil.iteratorToList(tableEnv.executeSql( "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I where bar.I > 1").collect()); assertThat(result.toString()) .isEqualTo("[+I[2, 2]]"); } {code} Reporter: yunfan Any non-native udf used in hive-parser join condition. It will caused NullPointException. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33232) Kubernetive Operator Not Able to Take Other Python paramters While Submitting Job Deployment
[ https://issues.apache.org/jira/browse/FLINK-33232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amarjeet Singh updated FLINK-33232: --- Description: Flink Operator Is not Able to Read the Python Cmd like -pyFiles. While apply using Kubernetive on a Flink Session Cluster. The PyFiles are mounted using EFS, Not able to Read the EFS files and apply it > Kubernetive Operator Not Able to Take Other Python paramters While Submitting > Job Deployment > > > Key: FLINK-33232 > URL: https://issues.apache.org/jira/browse/FLINK-33232 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0 >Reporter: Amarjeet Singh >Priority: Major > Fix For: 1.17.1 > > > Flink Operator Is not Able to Read the Python Cmd like -pyFiles. > While apply using Kubernetive on a Flink Session Cluster. The PyFiles are > mounted using EFS, Not able to Read the EFS files and apply it -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33232) Kubernetive Operator Not Able to Take Other Python paramters While Submitting Job Deployment
Amarjeet Singh created FLINK-33232: -- Summary: Kubernetive Operator Not Able to Take Other Python paramters While Submitting Job Deployment Key: FLINK-33232 URL: https://issues.apache.org/jira/browse/FLINK-33232 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0 Reporter: Amarjeet Singh Fix For: 1.17.1
[jira] [Created] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
Lauri Suurväli created FLINK-33231: -- Summary: Memory leak in KafkaSourceReader if no data in consumed topic Key: FLINK-33231 URL: https://issues.apache.org/jira/browse/FLINK-33231 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.1 Reporter: Lauri Suurväli Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png *Problem description* Our Flink streaming job TaskManager heap gets full when the job has nothing to consume and process. It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. When there are no messages in the source topic the TaskManager heap usage starts increasing until the job exits after receiving a SIGTERM signal. We are running the job on AWS EMR with YARN. The problems with the TaskManager heap usage do not occur when there is data to process. It's also worth noting that sending a single message to the source topic of a streaming job that has been sitting idle and suffers from the memory leak will cause the heap to be cleared. However it does not resolve the problem since the heap usage will start increasing immediately after processing the message. !Screenshot 2023-10-10 at 12.49.37.png! TaskManager heap used percentage is calculated by {code:java} flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / flink.taskmanager.Status.JVM.Memory.Heap.Max{code} I was able to take heap dumps of the TaskManager processes during a high heap usage percentage. Heap dump analysis detected 912,355 instances of java.util.HashMap empty collections retaining >= 43,793,040 bytes. !Screenshot 2023-10-09 at 14.13.43.png! The retained heap seemed to be located at: {code:java} org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code} !Screenshot 2023-10-09 at 13.02.34.png! 
*Possible hints:* An empty HashMap is added during the snapshotState method to the offsetsToCommit map if one does not already exist for the given checkpoint. [KafkaSourceReader line 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
{code:java}
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
{code}
If the startingOffset for the given split is >= 0, then a new entry is added to the map from the previous step. [KafkaSourceReader line 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
{code:java}
if (split.getStartingOffset() >= 0) {
    offsetsMap.put(
            split.getTopicPartition(),
            new OffsetAndMetadata(split.getStartingOffset()));
}{code}
If the starting offset is smaller than 0, this leaves the offsetsMap created in step 1 empty. We can see from the logs that the startingOffset is -3 when the splits are added to the reader. 
{code:java} Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-44, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-36, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-28, StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code} The offsetsToCommit map is cleaned from entries once they have been committed to Kafka which happens during the callback function that is passed to the KafkaSourceFetcherManager.commitOffsets method in KafkaSourceReader.notifyCheckpointComplete method. However if the committedPartitions is empty for the given checkpoint, then the KafkaSourceFetcherManager.commitOffsets method returns. [KafkaSourceFetcherManager line 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78] {code:java} if (offsetsToCommit.isEmpty()) { return; } {code} We can observe from the logs that indeed an empty map is encountered at this step: {code:java} Committing offsets {}{code} *Conclusion* It seems that an empty map gets added per each checkpoint to offsetsToCommit map. Since the startingOffset in our case is -3 then the empty map never gets filled. During the offset commit phase the offsets for these checkpoints are ignored, since there is nothing to
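The interaction described in this report can be modelled in isolation. The following sketch uses hypothetical names and simplified value types (it is not the actual KafkaSourceReader code) to show how one empty map per checkpoint accumulates when the starting offset is negative and the commit path skips, and therefore never removes, empty entries:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy model of the leak: an empty HashMap is created per checkpoint in
// snapshotState, and notifyCheckpointComplete returns early on empty maps,
// so the cleanup that normally happens after a successful commit never runs.
public class OffsetsToCommitLeakSketch {
    // keyed by checkpoint id, like KafkaSourceReader#offsetsToCommit
    public static final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    public static void snapshotState(long checkpointId, long startingOffset) {
        Map<String, Long> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        if (startingOffset >= 0) { // with startingOffset == -3 this branch never runs
            offsetsMap.put("source-events-44", startingOffset);
        }
    }

    public static void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> toCommit = offsetsToCommit.get(checkpointId);
        if (toCommit == null || toCommit.isEmpty()) {
            return; // commit skipped ("Committing offsets {}"), so no cleanup callback fires
        }
        // commit to Kafka, then remove the committed checkpoint's entry...
        offsetsToCommit.remove(checkpointId);
    }

    public static void main(String[] args) {
        for (long cp = 1; cp <= 1000; cp++) {
            snapshotState(cp, -3); // idle topic: startingOffset < 0
            notifyCheckpointComplete(cp);
        }
        // one retained empty map per checkpoint
        System.out.println(offsetsToCommit.size()); // 1000
    }
}
```

Under these assumptions, the retained-map count grows by one per checkpoint interval, matching the steadily climbing heap usage observed while the job is idle.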
Re: [PR] Bump guava from 30.1.1-jre to 32.0.0-jre in /flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka [flink-connector-kafka]
MartijnVisser commented on PR #33: URL: https://github.com/apache/flink-connector-kafka/pull/33#issuecomment-1755427837 @dependabot rebase
[jira] [Updated] (FLINK-33149) Bump snappy-java to 1.1.10.4
[ https://issues.apache.org/jira/browse/FLINK-33149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-33149: --- Fix Version/s: kafka-3.1.0 > Bump snappy-java to 1.1.10.4 > > > Key: FLINK-33149 > URL: https://issues.apache.org/jira/browse/FLINK-33149 > Project: Flink > Issue Type: Bug > Components: API / Core, Connectors / AWS, Connectors / HBase, > Connectors / Kafka, Stateful Functions >Affects Versions: 1.18.0, 1.16.3, 1.17.2 >Reporter: Ryan Skraba >Assignee: Ryan Skraba >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, kafka-3.1.0, 1.16.3, 1.17.2, 1.19.0 > > > Xerial published a security alert for a Denial of Service attack that [exists > on > 1.1.10.1|https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]. > This is included in flink-dist, but also in flink-statefun, and several > connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33149) Bump snappy-java to 1.1.10.4
[ https://issues.apache.org/jira/browse/FLINK-33149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773686#comment-17773686 ] Martijn Visser commented on FLINK-33149: Merged in: apache/flink-connector-kafka:main 73f761fa73d4200d18f628eef7c79cf91dd1a0bc > Bump snappy-java to 1.1.10.4 > > > Key: FLINK-33149 > URL: https://issues.apache.org/jira/browse/FLINK-33149 > Project: Flink > Issue Type: Bug > Components: API / Core, Connectors / AWS, Connectors / HBase, > Connectors / Kafka, Stateful Functions >Affects Versions: 1.18.0, 1.16.3, 1.17.2 >Reporter: Ryan Skraba >Assignee: Ryan Skraba >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, kafka-3.1.0, 1.16.3, 1.17.2, 1.19.0 > > > Xerial published a security alert for a Denial of Service attack that [exists > on > 1.1.10.1|https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]. > This is included in flink-dist, but also in flink-statefun, and several > connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33149] Bump snappy-java from 1.1.8.3 to 1.1.10.5 [flink-connector-kafka]
MartijnVisser merged PR #34: URL: https://github.com/apache/flink-connector-kafka/pull/34
Re: [PR] [FLINK-33228][flink-runtime] Fix the total current resource calculation when fulfilling requirement [flink]
KarmaGYZ commented on PR #23502: URL: https://github.com/apache/flink/pull/23502#issuecomment-1755330741 @flinkbot run azure
Re: [PR] [FLINK-32563] Add disable-archunit-tests profile [flink-connector-shared-utils]
echauchot commented on PR #21: URL: https://github.com/apache/flink-connector-shared-utils/pull/21#issuecomment-1755288912 I submitted this against main instead of the parent-pom branch; closing.
Re: [PR] [FLINK-32563] Add disable-archunit-tests profile [flink-connector-shared-utils]
echauchot closed pull request #21: [FLINK-32563] Add disable-archunit-tests profile URL: https://github.com/apache/flink-connector-shared-utils/pull/21
[jira] [Updated] (FLINK-32563) execute sanity checks only with Flink version that connectors were built against
[ https://issues.apache.org/jira/browse/FLINK-32563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32563: --- Labels: pull-request-available (was: ) > execute sanity checks only with Flink version that connectors were built > against > > > Key: FLINK-32563 > URL: https://issues.apache.org/jira/browse/FLINK-32563 > Project: Flink > Issue Type: Technical Debt > Components: Build System / CI >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > > As part of [this > discussion|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3] > , the need for connectors to specify the main flink version that a connector > supports has arisen. > This CI variable will allow to configure the build and tests differently > depending on this version. This parameter would be optional. > The first use case is to run archunit tests only on the main supported > version as discussed in the above thread. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352333625 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler< protected final EventRecorder eventRecorder; protected final StatusRecorder statusRecorder; -protected final JobAutoScaler resourceScaler; +protected final JobAutoScaler resourceScaler; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33229][table-planner] Moving Java class FlinkRecomputeStatisticsProgram from scala package to java package [flink]
lincoln-lil merged PR #23503: URL: https://github.com/apache/flink/pull/23503
Re: [PR] [FLINK-33229][table-planner] Moving Java class FlinkRecomputeStatisticsProgram from scala package to java package [flink]
lincoln-lil commented on PR #23503: URL: https://github.com/apache/flink/pull/23503#issuecomment-1755222956 It's a reasonable move. +1
[jira] [Assigned] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI
[ https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-33230: Assignee: Yu Chen

> Support Expanding ExecutionGraph to StreamGraph in Web UI
> ---------------------------------------------------------
>
> Key: FLINK-33230
> URL: https://issues.apache.org/jira/browse/FLINK-33230
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Web Frontend
> Affects Versions: 1.19.0
> Reporter: Yu Chen
> Assignee: Yu Chen
> Priority: Major
> Attachments: image-2023-10-10-18-52-38-252.png
>
> Flink Web shows users the ExecutionGraph (i.e., chained operators), but in
> some cases, we would like to know the structure of the chained operators as
> well as the necessary metrics such as the inputs and outputs of data, etc.
>
> Thus, we propose to show the stream graphs and some related metrics such as
> numberRecordIn and numberRecordOut on the Flink Web (as shown in the figure).
>
> !image-2023-10-10-18-52-38-252.png|width=750,height=263!
Re: [PR] [hotfix][connectors/mongodb] Fix typo and optimize log [flink-connector-mongodb]
Jiabao-Sun commented on PR #16: URL: https://github.com/apache/flink-connector-mongodb/pull/16#issuecomment-1755190195 Hi @hlteoh37, could you help review this?
Re: [PR] [FLINK-33228][flink-runtime] Fix the total current resource calculation when fulfilling requirement [flink]
KarmaGYZ commented on PR #23502: URL: https://github.com/apache/flink/pull/23502#issuecomment-1755184939 @flinkbot run azure
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352290023 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } +private void scaling(FlinkResourceContext ctx) throws Exception { +KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext(); + +if (autoscalerDisabled(ctx)) { +autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false); +resourceScaler.scale(autoScalerContext); +return; +} +if (waitingForRunning(ctx.getResource().getStatus())) { +LOG.info("Autoscaler is waiting for stable, running state"); +resourceScaler.cleanup(autoScalerContext.getJobKey()); +return; Review Comment: Preferably, I would like any logic related to applying parallelism inside the autoscaler implementation. This shouldn't change when the autoscaler is waiting for the running state. In fact, the job state checks should also be performed by the autoscaler, not by the reconciler. The current code mixes control over the parallelism overrides between the reconciler and the autoscaler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
mxm commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352282612 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler< protected final EventRecorder eventRecorder; protected final StatusRecorder statusRecorder; -protected final JobAutoScaler resourceScaler; +protected final JobAutoScaler resourceScaler; Review Comment: Can we rename this? ```suggestion protected final JobAutoScaler autoscaler; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1351975279

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:

## @@ -15,149 +15,180 @@
  * limitations under the License.
  */

-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;

+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;

 import java.time.Duration;
 import java.util.List;

-import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {

+    public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
+    public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+    private static String deprecatedOperatorConfigKey(String key) {
+        return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+    }
+
+    private static String autoScalerConfigKey(String key) {
+        return AUTOSCALER_CONF_PREFIX + key;
+    }
+
     private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-        return operatorConfig("job.autoscaler." + key);
+        return ConfigOptions.key(autoScalerConfigKey(key));
     }

     public static final ConfigOption AUTOSCALER_ENABLED =
         autoScalerConfig("enabled")
             .booleanType()
             .defaultValue(false)
+            .withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment: Added: > Note: The option prefix `kubernetes.operator.` was removed in FLIP-334, because the autoscaler module was decoupled from flink-kubernetes-operator.
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.realizer.ScalingRealizer; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; +import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; + +/** The default implementation of {@link JobAutoScaler}. */ +public class JobAutoScalerImpl> +implements JobAutoScaler { + +private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class); + +@VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError"; Review Comment: It's used at `BacklogBasedScalingTest`. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with +
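The deprecated-key mechanism discussed in the `AutoScalerOptions` diff above can be illustrated with a small self-contained sketch. This is not the actual Flink `ConfigOptions` implementation — just a simplified model of the assumed lookup order: a value set under the old `kubernetes.operator.`-prefixed key is still resolved after the FLIP-334 rename, while the new `job.autoscaler.` key takes precedence when both are present.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Minimal sketch of deprecated-key fallback for autoscaler options.
 * Lookup order: new "job.autoscaler." key first, then the deprecated
 * "kubernetes.operator."-prefixed key, then the declared default.
 */
public class DeprecatedKeyLookup {
    static final String AUTOSCALER_PREFIX = "job.autoscaler.";
    static final String DEPRECATED_PREFIX = "kubernetes.operator.";

    static boolean getBoolean(Map<String, String> conf, String key, boolean defaultValue) {
        String currentKey = AUTOSCALER_PREFIX + key;
        String deprecatedKey = DEPRECATED_PREFIX + AUTOSCALER_PREFIX + key;
        if (conf.containsKey(currentKey)) {
            return Boolean.parseBoolean(conf.get(currentKey));
        }
        if (conf.containsKey(deprecatedKey)) {
            return Boolean.parseBoolean(conf.get(deprecatedKey));
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Old-style key from before the rename is still honored.
        conf.put("kubernetes.operator.job.autoscaler.enabled", "true");
        System.out.println(getBoolean(conf, "enabled", false)); // true

        // The new key wins when both are present.
        conf.put("job.autoscaler.enabled", "false");
        System.out.println(getBoolean(conf, "enabled", false)); // false
    }
}
```

In Flink itself this fallback is declared per option via `ConfigOptions`' `withDeprecatedKeys`, as shown in the diff, rather than implemented by hand.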
[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI
[ https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Chen updated FLINK-33230: Attachment: (was: image-2023-10-10-18-45-24-486.png) > Support Expanding ExecutionGraph to StreamGraph in Web UI > - > > Key: FLINK-33230 > URL: https://issues.apache.org/jira/browse/FLINK-33230 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.19.0 >Reporter: Yu Chen >Priority: Major > Attachments: image-2023-10-10-18-52-38-252.png > > > Flink Web shows users the ExecutionGraph (i.e., chained operators), but in > some cases, we would like to know the structure of the chained operators as > well as the necessary metrics such as the inputs and outputs of data, etc. > > Thus, we propose to show the stream graphs and some related metrics such as > numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). > > !image-2023-10-10-18-52-38-252.png|width=750,height=263! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI
[ https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Chen updated FLINK-33230: Description: Flink Web shows users the ExecutionGraph (i.e., chained operators), but in some cases, we would like to know the structure of the chained operators as well as the necessary metrics such as the inputs and outputs of data, etc. Thus, we propose to show the stream graphs and some related metrics such as numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). !image-2023-10-10-18-52-38-252.png|width=750,height=263! was: Flink Web shows users the ExecutionGraph (i.e., chained operators), but in some cases, we would like to know the structure of the chained operators as well as the necessary metrics such as the inputs and outputs of data, etc. Thus, we propose to show the stream graphs and some related metrics such as numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). !image-2023-10-10-18-45-42-991.png|width=508,height=178! > Support Expanding ExecutionGraph to StreamGraph in Web UI > - > > Key: FLINK-33230 > URL: https://issues.apache.org/jira/browse/FLINK-33230 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.19.0 >Reporter: Yu Chen >Priority: Major > Attachments: image-2023-10-10-18-45-24-486.png, > image-2023-10-10-18-52-38-252.png > > > Flink Web shows users the ExecutionGraph (i.e., chained operators), but in > some cases, we would like to know the structure of the chained operators as > well as the necessary metrics such as the inputs and outputs of data, etc. > > Thus, we propose to show the stream graphs and some related metrics such as > numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). > > !image-2023-10-10-18-52-38-252.png|width=750,height=263!
[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI
[ https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Chen updated FLINK-33230: Attachment: image-2023-10-10-18-52-38-252.png > Support Expanding ExecutionGraph to StreamGraph in Web UI > - > > Key: FLINK-33230 > URL: https://issues.apache.org/jira/browse/FLINK-33230 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.19.0 >Reporter: Yu Chen >Priority: Major > Attachments: image-2023-10-10-18-45-24-486.png, > image-2023-10-10-18-52-38-252.png > > > Flink Web shows users the ExecutionGraph (i.e., chained operators), but in > some cases, we would like to know the structure of the chained operators as > well as the necessary metrics such as the inputs and outputs of data, etc. > > Thus, we propose to show the stream graphs and some related metrics such as > numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). > > !image-2023-10-10-18-45-42-991.png|width=508,height=178!
[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI
[ https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Chen updated FLINK-33230: Attachment: image-2023-10-10-18-45-24-486.png Component/s: Runtime / Web Frontend Affects Version/s: 1.19.0 Description: Flink Web shows users the ExecutionGraph (i.e., chained operators), but in some cases, we would like to know the structure of the chained operators as well as the necessary metrics such as the inputs and outputs of data, etc. Thus, we propose to show the stream graphs and some related metrics such as numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). !image-2023-10-10-18-45-42-991.png|width=508,height=178! Summary: Support Expanding ExecutionGraph to StreamGraph in Web UI (was: Support Expanding ExecutionGraph to StreamGraph in Flink) > Support Expanding ExecutionGraph to StreamGraph in Web UI > - > > Key: FLINK-33230 > URL: https://issues.apache.org/jira/browse/FLINK-33230 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.19.0 >Reporter: Yu Chen >Priority: Major > Attachments: image-2023-10-10-18-45-24-486.png, > image-2023-10-10-18-52-38-252.png > > > Flink Web shows users the ExecutionGraph (i.e., chained operators), but in > some cases, we would like to know the structure of the chained operators as > well as the necessary metrics such as the inputs and outputs of data, etc. > > Thus, we propose to show the stream graphs and some related metrics such as > numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure). > > !image-2023-10-10-18-45-42-991.png|width=508,height=178!
[jira] [Resolved] (FLINK-15462) Introduce TrinoSqlDialect
[ https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-15462. - Fix Version/s: jdbc-3.2.0 Resolution: Fixed > Introduce TrinoSqlDialect > - > > Key: FLINK-15462 > URL: https://issues.apache.org/jira/browse/FLINK-15462 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: li yu >Assignee: João Boto >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Fix For: jdbc-3.2.0 > > > flink-jdbc support > Derby, Mysql, Postgre > [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java] > Could we add support for prestosql ? > Link to prestosql jdbc [https://prestosql.io/download.html] > Advantage is presto support a variety of data source (i.e we could > ingest/load data to or from those data source just through presto jdbc)
[jira] [Commented] (FLINK-15462) Introduce TrinoSqlDialect
[ https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773641#comment-17773641 ] Sergey Nuyanzin commented on FLINK-15462: - Merged as [8e0496a5a9727087b38c2fc412a397a232ee0f5f|https://github.com/apache/flink-connector-jdbc/commit/8e0496a5a9727087b38c2fc412a397a232ee0f5f] > Introduce TrinoSqlDialect > - > > Key: FLINK-15462 > URL: https://issues.apache.org/jira/browse/FLINK-15462 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: li yu >Assignee: João Boto >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > flink-jdbc support > Derby, Mysql, Postgre > [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java] > Could we add support for prestosql ? > Link to prestosql jdbc [https://prestosql.io/download.html] > Advantage is presto support a variety of data source (i.e we could > ingest/load data to or from those data source just through presto jdbc)
Re: [PR] [FLINK-15462][connectors] Add Trino dialect [flink-connector-jdbc]
snuyanzin merged PR #3: URL: https://github.com/apache/flink-connector-jdbc/pull/3
[jira] [Assigned] (FLINK-15462) Introduce TrinoSqlDialect
[ https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-15462: --- Assignee: Joan Schipper > Introduce TrinoSqlDialect > - > > Key: FLINK-15462 > URL: https://issues.apache.org/jira/browse/FLINK-15462 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: li yu >Assignee: Joan Schipper >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > flink-jdbc support > Derby, Mysql, Postgre > [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java] > Could we add support for prestosql ? > Link to prestosql jdbc [https://prestosql.io/download.html] > Advantage is presto support a variety of data source (i.e we could > ingest/load data to or from those data source just through presto jdbc)
[jira] [Updated] (FLINK-15462) Introduce TrinoSqlDialect
[ https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-15462: Summary: Introduce TrinoSqlDialect (was: Introduce PrestoSqlDialect) > Introduce TrinoSqlDialect > - > > Key: FLINK-15462 > URL: https://issues.apache.org/jira/browse/FLINK-15462 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: li yu >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > flink-jdbc support > Derby, Mysql, Postgre > [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java] > Could we add support for prestosql ? > Link to prestosql jdbc [https://prestosql.io/download.html] > Advantage is presto support a variety of data source (i.e we could > ingest/load data to or from those data source just through presto jdbc)
[jira] [Assigned] (FLINK-15462) Introduce TrinoSqlDialect
[ https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-15462: --- Assignee: João Boto (was: Joan Schipper) > Introduce TrinoSqlDialect > - > > Key: FLINK-15462 > URL: https://issues.apache.org/jira/browse/FLINK-15462 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: li yu >Assignee: João Boto >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > flink-jdbc support > Derby, Mysql, Postgre > [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java] > Could we add support for prestosql ? > Link to prestosql jdbc [https://prestosql.io/download.html] > Advantage is presto support a variety of data source (i.e we could > ingest/load data to or from those data source just through presto jdbc)
[jira] [Created] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Flink
Yu Chen created FLINK-33230: --- Summary: Support Expanding ExecutionGraph to StreamGraph in Flink Key: FLINK-33230 URL: https://issues.apache.org/jira/browse/FLINK-33230 Project: Flink Issue Type: Improvement Reporter: Yu Chen
Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]
mxm commented on PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1754958456 Hey Mason! Do you want a review for this?