Re: [PR] [FLINK-33236][config] Remove the unused high-availability.zookeeper.pth.running-registry option [flink]

2023-10-10 Thread via GitHub


flinkbot commented on PR #23506:
URL: https://github.com/apache/flink/pull/23506#issuecomment-1756844304

   
   ## CI report:
   
   * 72b6b3a52f9efd6980941259a12281bb40a78976 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option

2023-10-10 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773900#comment-17773900
 ] 

Zhanghao Chen commented on FLINK-33236:
---

Hi [~mapohl], I've prepared a quick cleanup PR for it. Could you help review it when you are free?

> Remove the unused high-availability.zookeeper.path.running-registry option
> --
>
> Key: FLINK-33236
> URL: https://issues.apache.org/jira/browse/FLINK-33236
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Configuration
>Affects Versions: 1.18.0
>Reporter: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The running registry subcomponent of Flink HA was removed in 
> [FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the 
> "high-availability.zookeeper.path.running-registry" option has been of no use 
> since then. We should remove the option and regenerate the config doc to drop 
> the relevant descriptions and avoid confusing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option

2023-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33236:
---
Labels: pull-request-available  (was: )

> Remove the unused high-availability.zookeeper.path.running-registry option
> --
>
> Key: FLINK-33236
> URL: https://issues.apache.org/jira/browse/FLINK-33236
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Configuration
>Affects Versions: 1.18.0
>Reporter: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The running registry subcomponent of Flink HA was removed in 
> [FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the 
> "high-availability.zookeeper.path.running-registry" option has been of no use 
> since then. We should remove the option and regenerate the config doc to drop 
> the relevant descriptions and avoid confusing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33236][config] Remove the unused high-availability.zookeeper.pth.running-registry option [flink]

2023-10-10 Thread via GitHub


X-czh opened a new pull request, #23506:
URL: https://github.com/apache/flink/pull/23506

   
   
   ## What is the purpose of the change
   
   The running registry subcomponent of Flink HA was removed in 
[FLINK-25430](https://issues.apache.org/jira/browse/FLINK-25430), and the 
"high-availability.zookeeper.path.running-registry" option has been of no use since 
then. We should remove the option and regenerate the config doc to drop the relevant 
descriptions and avoid confusing users.
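   For illustration, a hedged sketch of what such an obsolete option declaration looks like with Flink's `ConfigOptions` builder; only the key is taken from the issue, while the class name, field name, default value, and description below are assumptions, not the actual code being deleted:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class ZooKeeperHaOptionsSketch {

    // Hypothetical stand-in for the option removed by this PR.
    public static final ConfigOption<String> ZOOKEEPER_RUNNING_REGISTRY_PATH =
            ConfigOptions.key("high-availability.zookeeper.path.running-registry")
                    .stringType()
                    .defaultValue("/running_job_registry/")
                    .withDescription(
                            "ZooKeeper root path for the running-job registry"
                                    + " (obsolete since FLINK-25430).");

    private ZooKeeperHaOptionsSketch() {}
}
```

   Deleting the constant and regenerating the configuration documentation removes the option from both the code and the docs.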
   
   ## Brief change log
   
   Remove the unused high-availability.zookeeper.path.running-registry option and the relevant docs.
   
   ## Verifying this change
   
   This change is a trivial code cleanup.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option

2023-10-10 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33236:
-

 Summary: Remove the unused 
high-availability.zookeeper.path.running-registry option
 Key: FLINK-33236
 URL: https://issues.apache.org/jira/browse/FLINK-33236
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0


The running registry subcomponent of Flink HA was removed in 
[FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the 
"high-availability.zookeeper.path.running-registry" option has been of no use since 
then. We should remove the option and regenerate the config doc to drop the relevant 
descriptions and avoid confusing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33209) Translate Flink OLAP quick start guide to Chinese

2023-10-10 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-33209:
--

Assignee: xiangyu feng

> Translate Flink OLAP quick start guide to Chinese
> -
>
> Key: FLINK-33209
> URL: https://issues.apache.org/jira/browse/FLINK-33209
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>
> Translate Flink OLAP quick start guide to Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33235) Quickstart guide for Flink OLAP should support building from master branch

2023-10-10 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo reassigned FLINK-33235:
--

Assignee: xiangyu feng

> Quickstart guide for Flink OLAP should support building from master branch
> --
>
> Key: FLINK-33235
> URL: https://issues.apache.org/jira/browse/FLINK-33235
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>
> Many features required by an OLAP session cluster are still on the master branch 
> or in progress and not released yet, for example FLIP-295, FLIP-362, and FLIP-374. 
> We need to address this in the documentation and show users how to quickly build 
> an OLAP session cluster from the master branch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33235) Quickstart guide for Flink OLAP should support building from master branch

2023-10-10 Thread xiangyu feng (Jira)
xiangyu feng created FLINK-33235:


 Summary: Quickstart guide for Flink OLAP should support building 
from master branch
 Key: FLINK-33235
 URL: https://issues.apache.org/jira/browse/FLINK-33235
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: xiangyu feng


Many features required by an OLAP session cluster are still on the master branch or 
in progress and not released yet, for example FLIP-295, FLIP-362, and FLIP-374. We 
need to address this in the documentation and show users how to quickly build an 
OLAP session cluster from the master branch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773896#comment-17773896
 ] 

luoyuxia commented on FLINK-33168:
--

I believe the docs have been updated in FLINK-32982, and the release notes will also cover it.

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', cus.prov,
>     'container', cus.container,
>     'crtime', cus.crtime,
>     'updtime', cus.updtime
>   ) existing_cus,
>   cus_rownum,
>   to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp,
>   raw_rowtime,
>   localtimestamp as raw_rowtime1,
>   dt
> from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq
>   /*+ OPTIONS('consumer-id' = '创建新客户id') */
>   rcus
> left join svc1_mysql_test.gem_svc1_vpn.bv_customer
> FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code
> {code}
> There are the following jar files in the flink/lib directory.
> {code:java}
> commons-cli-1.5.0.jar
> flink-cep-1.18.0.jar
> flink-connector-files-1.18.0.jar
> flink-connector-jdbc-3.1.1-1.17.jar
> flink-csv-1.18.0.jar
> flink-dist-1.18.0.jar
> flink-json-1.18.0.jar
> flink-orc-1.18.0.jar
> flink-parquet-1.18.0.jar
> flink-scala_2.12-1.18.0.jar
> flink-sql-avro-1.18.0.jar
> flink-sql-avro-confluent-registry-1.18.0.jar
> flink-sql-connector-elasticsearch7-3.0.0-1.16.jar
> flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar
> flink-sql-connector-kafka-3.0.0-1.17.jar
> flink-sql-orc-1.18.0.jar
> flink-sql-parquet-1.18.0.jar
> flink-table-api-java-uber-1.18.0.jar
> flink-table-api-scala_2.12-1.18.0.jar
> flink-table-api-scala-bridge_2.12-1.18.0.jar
> flink-table-planner_2.12-1.18.0.jar
> flink-table-runtime-1.18.0.jar
> jline-reader-3.23.0.jar
> jline-terminal-3.23.0.jar
> kafka-clients-3.5.1.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> mysql-connector-j-8.1.0.jar
> paimon-flink-1.18-0.6-20230929.002044-11.jar{code}
> Works correctly in version 1.17.1, but produces the following error in 
> 1.18.0-RC1
>  
> {code:java}
> 2023-09-29 14:04:11,438 ERROR 
> org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed 
> to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da.
> java.lang.NoSuchFieldError: operands
> at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) 
> ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.(SimplifyFilterConditionRule.scala:36)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala:94)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala:35)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> 

Re: [PR] [FLINK-33171][table planner] Table SQL support Not Equal for TimePoint type and TimeString [flink]

2023-10-10 Thread via GitHub


lincoln-lil commented on code in PR #23478:
URL: https://github.com/apache/flink/pull/23478#discussion_r1353989534


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:
##
@@ -488,6 +488,18 @@ object ScalarOperatorGens {
 else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
   generateComparison(ctx, "!=", left, right, resultType)
 }
+// support date/time/timestamp not equalTo string.
+else if (
+  (isTimePoint(left.resultType) && isCharacterString(right.resultType)) ||

Review Comment:
   Can we add optimizations for string literals similar to the handling of 
`generateEquals`?



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala:
##
@@ -228,6 +228,15 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 testSqlApi("uuid() = cast(f22 as timestamp_ltz)", "NULL")
   }
 
+  @Test
+  def testTimePointTypeNotEqualsStringType(): Unit = {
+testSqlApi("f22 = '1996-11-10 12:34:56'", "TRUE")

Review Comment:
   I see that the datetime family contains 5 data types (see 
`org.apache.flink.table.types.logical.LogicalTypeRoot`), so it is recommended 
to add more tests for complete coverage of those data types.
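   For reference, a rough sketch that lists those five datetime type roots; it assumes `LogicalTypeRoot#getFamilies` from flink-table-common, and the expected output in the comment is only what the remark above implies:

```java
import java.util.EnumSet;

import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public class DatetimeFamilySketch {
    public static void main(String[] args) {
        // Enumerate the members of the DATETIME family that the tests should cover.
        EnumSet.allOf(LogicalTypeRoot.class).stream()
                .filter(root -> root.getFamilies().contains(LogicalTypeFamily.DATETIME))
                .forEach(System.out::println);
        // Expected: DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE,
        //           TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
    }
}
```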



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread macdoor615 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

macdoor615 closed FLINK-33168.
--

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', cus.prov,
>     'container', cus.container,
>     'crtime', cus.crtime,
>     'updtime', cus.updtime
>   ) existing_cus,
>   cus_rownum,
>   to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp,
>   raw_rowtime,
>   localtimestamp as raw_rowtime1,
>   dt
> from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq
>   /*+ OPTIONS('consumer-id' = '创建新客户id') */
>   rcus
> left join svc1_mysql_test.gem_svc1_vpn.bv_customer
> FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code
> {code}
> There are the following jar files in the flink/lib directory.
> {code:java}
> commons-cli-1.5.0.jar
> flink-cep-1.18.0.jar
> flink-connector-files-1.18.0.jar
> flink-connector-jdbc-3.1.1-1.17.jar
> flink-csv-1.18.0.jar
> flink-dist-1.18.0.jar
> flink-json-1.18.0.jar
> flink-orc-1.18.0.jar
> flink-parquet-1.18.0.jar
> flink-scala_2.12-1.18.0.jar
> flink-sql-avro-1.18.0.jar
> flink-sql-avro-confluent-registry-1.18.0.jar
> flink-sql-connector-elasticsearch7-3.0.0-1.16.jar
> flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar
> flink-sql-connector-kafka-3.0.0-1.17.jar
> flink-sql-orc-1.18.0.jar
> flink-sql-parquet-1.18.0.jar
> flink-table-api-java-uber-1.18.0.jar
> flink-table-api-scala_2.12-1.18.0.jar
> flink-table-api-scala-bridge_2.12-1.18.0.jar
> flink-table-planner_2.12-1.18.0.jar
> flink-table-runtime-1.18.0.jar
> jline-reader-3.23.0.jar
> jline-terminal-3.23.0.jar
> kafka-clients-3.5.1.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> mysql-connector-j-8.1.0.jar
> paimon-flink-1.18-0.6-20230929.002044-11.jar{code}
> Works correctly in version 1.17.1, but produces the following error in 
> 1.18.0-RC1
>  
> {code:java}
> 2023-09-29 14:04:11,438 ERROR 
> org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed 
> to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da.
> java.lang.NoSuchFieldError: operands
> at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) 
> ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.(SimplifyFilterConditionRule.scala:36)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala:94)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala:35)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> 

[jira] [Commented] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread macdoor615 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773893#comment-17773893
 ] 

macdoor615 commented on FLINK-33168:


[~luoyuxia] table-planner-loader.jar works. The docs should be updated.

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', cus.prov,
>     'container', cus.container,
>     'crtime', cus.crtime,
>     'updtime', cus.updtime
>   ) existing_cus,
>   cus_rownum,
>   to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp,
>   raw_rowtime,
>   localtimestamp as raw_rowtime1,
>   dt
> from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq
>   /*+ OPTIONS('consumer-id' = '创建新客户id') */
>   rcus
> left join svc1_mysql_test.gem_svc1_vpn.bv_customer
> FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code
> {code}
> There are the following jar files in the flink/lib directory.
> {code:java}
> commons-cli-1.5.0.jar
> flink-cep-1.18.0.jar
> flink-connector-files-1.18.0.jar
> flink-connector-jdbc-3.1.1-1.17.jar
> flink-csv-1.18.0.jar
> flink-dist-1.18.0.jar
> flink-json-1.18.0.jar
> flink-orc-1.18.0.jar
> flink-parquet-1.18.0.jar
> flink-scala_2.12-1.18.0.jar
> flink-sql-avro-1.18.0.jar
> flink-sql-avro-confluent-registry-1.18.0.jar
> flink-sql-connector-elasticsearch7-3.0.0-1.16.jar
> flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar
> flink-sql-connector-kafka-3.0.0-1.17.jar
> flink-sql-orc-1.18.0.jar
> flink-sql-parquet-1.18.0.jar
> flink-table-api-java-uber-1.18.0.jar
> flink-table-api-scala_2.12-1.18.0.jar
> flink-table-api-scala-bridge_2.12-1.18.0.jar
> flink-table-planner_2.12-1.18.0.jar
> flink-table-runtime-1.18.0.jar
> jline-reader-3.23.0.jar
> jline-terminal-3.23.0.jar
> kafka-clients-3.5.1.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> mysql-connector-j-8.1.0.jar
> paimon-flink-1.18-0.6-20230929.002044-11.jar{code}
> Works correctly in version 1.17.1, but produces the following error in 
> 1.18.0-RC1
>  
> {code:java}
> 2023-09-29 14:04:11,438 ERROR 
> org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed 
> to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da.
> java.lang.NoSuchFieldError: operands
> at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) 
> ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.(SimplifyFilterConditionRule.scala:36)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala:94)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala:35)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> 

[jira] [Comment Edited] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773891#comment-17773891
 ] 

luoyuxia edited comment on FLINK-33168 at 10/11/23 4:16 AM:


I tried putting table-planner-loader.jar in /lib and it works. Since this behavior is 
expected in Flink 1.18 (see FLINK-31575) and we recommend not swapping 
table-planner-loader for table-planner, I would like to close this issue. Feel free 
to reopen it if you still run into problems.

The reason is a little complex: we include 
{{org/apache/calcite/plan/RelOptRule.class}} in flink-sql-connector-hive 3.1.3; the 
reason can be seen 
[here|https://github.com/apache/flink/blob/5269631af525a01d944cfa9994a116fb27b80b1b/flink-connectors/flink-sql-connector-hive-3.1.3/pom.xml#L198].

The planner will then load the class {{RelOptRule}} from flink-sql-connector-hive 
3.1.3, but that copy of {{RelOptRule}} was compiled in flink-sql-connector-hive, 
which shades {{com.google}} to {{org.apache.flink.hive.shaded.com.google}}.
Since {{RelOptRule}} refers to {{com.google.common.collect.ImmutableList}}, after 
compilation it refers to the field 
{{org/apache/calcite/plan/RelOptRuleOperandChildren.operands:Lorg/apache/flink/hive/shaded/com/google/common/collect/ImmutableList}}.

But {{RelOptRuleOperandChildren}} is compiled in flink-table-planner, which shades 
{{com.google}} to {{org.apache.flink.calcite.shaded.com.google}}, so it only 
contains a field of type 
{{org/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList}}.
This causes {{java.lang.NoSuchFieldError: operands}}.

It's similar to FLINK-32286.
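
For illustration only, here is a minimal, self-contained sketch of why such a field 
reference fails at link time. Every class name in it is a hypothetical stand-in for 
the shaded types, not Flink or Calcite code:

{code:java}
import java.lang.reflect.Field;

public class ShadedFieldMismatchSketch {

    // Hypothetical stand-ins for the two differently relocated copies of Guava's
    // ImmutableList (calcite-shaded in the planner vs. hive-shaded in the connector).
    static final class PlannerShadedImmutableList {}
    static final class HiveShadedImmutableList {}

    // Stand-in for RelOptRuleOperandChildren as packaged in flink-table-planner:
    // its 'operands' field uses the planner-shaded list type.
    static final class OperandChildren {
        final PlannerShadedImmutableList operands = new PlannerShadedImmutableList();
    }

    public static void main(String[] args) throws Exception {
        // The RelOptRule copy inside flink-sql-connector-hive was compiled expecting
        // 'operands' to have the hive-shaded descriptor. The JVM resolves field
        // references by name AND type, so the mismatch surfaces at link time as
        // java.lang.NoSuchFieldError: operands. We emulate the descriptor check here.
        Field operands = OperandChildren.class.getDeclaredField("operands");
        boolean descriptorMatches = operands.getType() == HiveShadedImmutableList.class;
        System.out.println(
                "field 'operands' found with type " + operands.getType().getSimpleName()
                        + "; matches the hive-shaded expectation? " + descriptorMatches);
    }
}
{code}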


was (Author: luoyuxia):
I tried with puting table-planner-loader.jar in /lib and it works. Since it's 
expected in Flink 1.18 as you can see in FLINK-31575, we recommend to not to 
swap table-planner-loader and table-planner, I would like to close it. Feel 
free to open it again when you still meet problems. 


The reason may be a a little of complex, we include 
\{{org/apache/calcite/plan/RelOptRule.class}} in flink-sql-connector-hive 
3.1.3, the reason can be seen in here. 
Then, the planner will load the class \{{RelOptRule}} in 
flink-sql-connector-hive 3.1.3, but the class RelOptRule in complied in 
flink-sql-connector-hive which shade \{{com.google}} to 
\{{org.apache.flink.hive.shaded.com.google}}.
But \{{RelOptRule}} will refer to \{{com.google.common.collect.ImmutableList}}, 
afte complie, it will then become 
to refer to Field 
\{{org/apache/calcite/plan/RelOptRuleOperandChildren.operands:Lorg/apache/flink/hive/shaded/com/google/common/collect/ImmutableList}}.

But RelOptRuleOperandChildren is compiled in flink-table-planner which shade 
\{{com.google}} to \{{org.apache.flink.calcite.shaded.com.google}}, so it will 
only contain a field 
\{{org/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList}}.
Then it'll casue \{{java.lang.NoSuchFieldError: operands}}.

It's simliar to the issue FLINK-32286.

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', 

[jira] [Resolved] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-33168.
--
Resolution: Invalid

I tried putting table-planner-loader.jar in /lib and it works. Since this behavior is 
expected in Flink 1.18 (see FLINK-31575) and we recommend not swapping 
table-planner-loader for table-planner, I would like to close this issue. Feel free 
to reopen it if you still run into problems.

The reason is a little complex: we include 
{{org/apache/calcite/plan/RelOptRule.class}} in flink-sql-connector-hive 3.1.3 (the 
reason can be seen in the flink-sql-connector-hive-3.1.3 pom).

The planner will then load the class {{RelOptRule}} from flink-sql-connector-hive 
3.1.3, but that copy of {{RelOptRule}} was compiled in flink-sql-connector-hive, 
which shades {{com.google}} to {{org.apache.flink.hive.shaded.com.google}}.
Since {{RelOptRule}} refers to {{com.google.common.collect.ImmutableList}}, after 
compilation it refers to the field 
{{org/apache/calcite/plan/RelOptRuleOperandChildren.operands:Lorg/apache/flink/hive/shaded/com/google/common/collect/ImmutableList}}.

But {{RelOptRuleOperandChildren}} is compiled in flink-table-planner, which shades 
{{com.google}} to {{org.apache.flink.calcite.shaded.com.google}}, so it only 
contains a field of type 
{{org/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList}}.
This causes {{java.lang.NoSuchFieldError: operands}}.

It's similar to FLINK-32286.

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', cus.prov,
>     'container', cus.container,
>     'crtime', cus.crtime,
>     'updtime', cus.updtime
>   ) existing_cus,
>   cus_rownum,
>   to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp,
>   raw_rowtime,
>   localtimestamp as raw_rowtime1,
>   dt
> from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq
>   /*+ OPTIONS('consumer-id' = '创建新客户id') */
>   rcus
> left join svc1_mysql_test.gem_svc1_vpn.bv_customer
> FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code
> {code}
> There are the following jar files in the flink/lib directory.
> {code:java}
> commons-cli-1.5.0.jar
> flink-cep-1.18.0.jar
> flink-connector-files-1.18.0.jar
> flink-connector-jdbc-3.1.1-1.17.jar
> flink-csv-1.18.0.jar
> flink-dist-1.18.0.jar
> flink-json-1.18.0.jar
> flink-orc-1.18.0.jar
> flink-parquet-1.18.0.jar
> flink-scala_2.12-1.18.0.jar
> flink-sql-avro-1.18.0.jar
> flink-sql-avro-confluent-registry-1.18.0.jar
> flink-sql-connector-elasticsearch7-3.0.0-1.16.jar
> flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar
> flink-sql-connector-kafka-3.0.0-1.17.jar
> flink-sql-orc-1.18.0.jar
> flink-sql-parquet-1.18.0.jar
> flink-table-api-java-uber-1.18.0.jar
> flink-table-api-scala_2.12-1.18.0.jar
> flink-table-api-scala-bridge_2.12-1.18.0.jar
> flink-table-planner_2.12-1.18.0.jar
> flink-table-runtime-1.18.0.jar
> jline-reader-3.23.0.jar
> jline-terminal-3.23.0.jar
> kafka-clients-3.5.1.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> mysql-connector-j-8.1.0.jar
> 

Re: [PR] [FLINK-24024][table-planner] support session window tvf in plan [flink]

2023-10-10 Thread via GitHub


flinkbot commented on PR #23505:
URL: https://github.com/apache/flink/pull/23505#issuecomment-1756729241

   
   ## CI report:
   
   * 8302b0995123b2a536ae434f8e958dfa3d8c2302 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-24024][table-planner] support session window tvf in plan [flink]

2023-10-10 Thread via GitHub


xuyangzhong opened a new pull request, #23505:
URL: https://github.com/apache/flink/pull/23505

   ## What is the purpose of the change
   
   This pull request introduces support for the session window TVF in the planner. 
After this PR, users can use the session window TVF syntax in SQL.
   
   Note: this PR does not touch table-runtime.
   
   The session window TVF only has the basic features, like those described in 
https://issues.apache.org/jira/browse/FLINK-23544 .
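
   A rough sketch of the kind of query this enables, run through the Table API; the syntax below follows the description in FLINK-23544, and the table definition is only an assumed example, so the exact form supported by this PR may differ:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionWindowTvfSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumed source table with an event-time attribute; datagen is just a stand-in.
        tEnv.executeSql(
                "CREATE TABLE Bid ("
                        + "  item STRING,"
                        + "  price DOUBLE,"
                        + "  bidtime TIMESTAMP(3),"
                        + "  WATERMARK FOR bidtime AS bidtime - INTERVAL '1' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // Session window TVF: rows with the same item that arrive within a
        // 5-minute gap of each other fall into the same window.
        tEnv.executeSql(
                        "SELECT item, window_start, window_end, SUM(price) AS total_price "
                                + "FROM TABLE("
                                + "  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) "
                                + "GROUP BY item, window_start, window_end")
                .print();
    }
}
```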
   
   ## Brief change log
   
 - Adapt to Calcite's requirements for PTFs: fill in the table characteristics of 
the session window TVF.
 - Adapt the relevant rules to introduce the session window TVF.
 - Add unit tests and integration tests for it.
   
   
   ## Verifying this change
   
   Some unit tests and integration tests are added to verify this PR.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-10-10 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei closed FLINK-32072.
--

> Create and wire FileMergingSnapshotManager with TaskManagerServices
> ---
>
> Key: FLINK-32072
> URL: https://issues.apache.org/jira/browse/FLINK-32072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-10-10 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-32072.

Fix Version/s: 1.19.0
   (was: 1.18.0)
   Resolution: Fixed

> Create and wire FileMergingSnapshotManager with TaskManagerServices
> ---
>
> Key: FLINK-32072
> URL: https://issues.apache.org/jira/browse/FLINK-32072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32072) Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-10-10 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773884#comment-17773884
 ] 

Yanfei Lei commented on FLINK-32072:


Merged into master via 1112582fd136df47c6d356d6f6ad3946ad1e56d5

> Create and wire FileMergingSnapshotManager with TaskManagerServices
> ---
>
> Key: FLINK-32072
> URL: https://issues.apache.org/jira/browse/FLINK-32072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32443][docs-zh] Translate "State Processor API" page into Chinese [flink]

2023-10-10 Thread via GitHub


fredia commented on PR #23496:
URL: https://github.com/apache/flink/pull/23496#issuecomment-1756665330

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-10 Thread via GitHub


FangYongs commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1353844833


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -65,9 +69,14 @@ public class ResourceManager implements Closeable {
 private static final String FILE_SCHEME = "file";
 
 private final Path localResourceDir;
+/** Resource infos for functions. */
+private final Map functionResourceInfos;
+
 protected final Map resourceInfos;
 protected final MutableURLClassLoader userClassLoader;
 
+private final List createdClassLoaderList;

Review Comment:
   DONE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33228) Fix the total current resource calculation when fulfilling requirements

2023-10-10 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo closed FLINK-33228.
--
Resolution: Fixed

> Fix the total current resource calculation when fulfilling requirements
> ---
>
> Key: FLINK-33228
> URL: https://issues.apache.org/jira/browse/FLINK-33228
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-10-16-09-23-635.png
>
>
> Currently, the `totalCurrentResources` calculation in 
> `DefaultResourceAllocationStrategy#tryFulfillRequirements` is wrong.
> `ResourceProfile.merge` will not change the original `ResourceProfile`.
> !image-2023-10-10-16-09-23-635.png|width=564,height=286!
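
For illustration, a minimal sketch of the pitfall described above; the 
ResourceProfile.fromResources call below is only an assumed example value, and the 
actual fix lives in DefaultResourceAllocationStrategy#tryFulfillRequirements:

{code:java}
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

public class ResourceProfileMergeSketch {
    public static void main(String[] args) {
        ResourceProfile total = ResourceProfile.ZERO;
        // Assumed example profile: 1.0 CPU core and 1024 MB of task heap memory.
        ResourceProfile pending = ResourceProfile.fromResources(1.0, 1024);

        // Buggy pattern: merge() returns a new ResourceProfile and leaves the
        // receiver untouched, so 'total' silently stays ZERO here.
        total.merge(pending);

        // Correct pattern: keep the instance returned by merge().
        total = total.merge(pending);
        System.out.println(total);
    }
}
{code}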



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-10 Thread via GitHub


FangYongs commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1353833344


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri 
resourceUri) throws IOException {
 Collections.singletonList(resourceUri),
 ResourceType.FILE,
 false,
-url -> {});
+url -> {},
+false);
 registerResources(stagingResources, false);
 return resourceInfos.get(new 
ArrayList<>(stagingResources.keySet()).get(0)).getPath();
 }
 
+/**
+ * Register a resource for function and add it to the function resource 
infos. If the file is
+ * remote, it will be copied to a local file.
+ *
+ * @param resourceUris the resource uri for function.
+ */
+public void registerFunctionResources(Set resourceUris) 
throws IOException {
+prepareStagingResources(
+resourceUris,
+ResourceType.JAR,
+true,
+url -> {
+try {
+JarUtils.checkJarFile(url);
+} catch (IOException e) {
+throw new ValidationException(
+String.format("Failed to register jar resource 
[%s]", url), e);
+}
+},
+true);
+}
+
+/**
+ * Unregister the resource uri in function resources, when the reference 
count of the resource
+ * is 0, the resource will be removed from the function resources.
+ *
+ * @param resourceUris the uris to unregister in function resources.
+ */
+public void unregisterFunctionResources(List resourceUris) {
+if (!resourceUris.isEmpty()) {
+resourceUris.forEach(
+uri -> {
+ResourceCounter counter = 
functionResourceInfos.get(uri);
+if (counter != null && counter.decreaseCounter()) {
+functionResourceInfos.remove(uri);
+}
+});
+}
+}
+
 public URLClassLoader getUserClassLoader() {
 return userClassLoader;
 }
 
+public URLClassLoader createUserClassLoader(List 
resourceUris) {
+if (resourceUris.isEmpty()) {
+return userClassLoader;
+}
+MutableURLClassLoader classLoader = userClassLoader.copy();
+for (ResourceUri resourceUri : new HashSet<>(resourceUris)) {
+
classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url);
+}
+createdClassLoaderList.add(classLoader);
+
+return classLoader;
+}
+
+public void closeUserClassLoader(URLClassLoader classLoader) {
+if (createdClassLoaderList.remove(classLoader)) {

Review Comment:
   Remove created class loader list in resourcemanager



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33228) Fix the total current resource calculation when fulfilling requirements

2023-10-10 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773872#comment-17773872
 ] 

Yangze Guo commented on FLINK-33228:


master: 5269631af525a01d944cfa9994a116fb27b80b1b

> Fix the total current resource calculation when fulfilling requirements
> ---
>
> Key: FLINK-33228
> URL: https://issues.apache.org/jira/browse/FLINK-33228
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-10-10-16-09-23-635.png
>
>
> Currently, the `totalCurrentResources` calculation in 
> `DefaultResourceAllocationStrategy#tryFulfillRequirements` is wrong.
> `ResourceProfile.merge` will not change the original `ResourceProfile`.
> !image-2023-10-10-16-09-23-635.png|width=564,height=286!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32512] Don't register resource to user resource manager when creating temporary function [flink]

2023-10-10 Thread via GitHub


FangYongs commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1353833112


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, 
boolean ignoreIfNotExist
 "Could not drop temporary system function. A 
function named '%s' doesn't exist.",
 name));
 }
+unregisterFunctionJarResources(function);
 
 return function != null;
 }
 
+private void unregisterFunctionJarResources(CatalogFunction function) {
+if (function != null && function.getFunctionLanguage() == 
FunctionLanguage.JAVA) {

Review Comment:
   Python UDFs are another story; we fix Java UDFs first and will consider Python 
UDFs in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33228][flink-runtime] Fix the total current resource calculation when fulfilling requirement [flink]

2023-10-10 Thread via GitHub


KarmaGYZ merged PR #23502:
URL: https://github.com/apache/flink/pull/23502


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773871#comment-17773871
 ] 

luoyuxia commented on FLINK-33168:
--

[~macdoor615] Hi, could you please try using flink-table-planner-loader.jar in place 
of flink-table-planner.jar?

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', cus.prov,
>     'container', cus.container,
>     'crtime', cus.crtime,
>     'updtime', cus.updtime
>   ) existing_cus,
>   cus_rownum,
>   to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp,
>   raw_rowtime,
>   localtimestamp as raw_rowtime1,
>   dt
> from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq
>   /*+ OPTIONS('consumer-id' = '创建新客户id') */
>   rcus
> left join svc1_mysql_test.gem_svc1_vpn.bv_customer
> FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code
> {code}
> There are the following jar files in the flink/lib directory.
> {code:java}
> commons-cli-1.5.0.jar
> flink-cep-1.18.0.jar
> flink-connector-files-1.18.0.jar
> flink-connector-jdbc-3.1.1-1.17.jar
> flink-csv-1.18.0.jar
> flink-dist-1.18.0.jar
> flink-json-1.18.0.jar
> flink-orc-1.18.0.jar
> flink-parquet-1.18.0.jar
> flink-scala_2.12-1.18.0.jar
> flink-sql-avro-1.18.0.jar
> flink-sql-avro-confluent-registry-1.18.0.jar
> flink-sql-connector-elasticsearch7-3.0.0-1.16.jar
> flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar
> flink-sql-connector-kafka-3.0.0-1.17.jar
> flink-sql-orc-1.18.0.jar
> flink-sql-parquet-1.18.0.jar
> flink-table-api-java-uber-1.18.0.jar
> flink-table-api-scala_2.12-1.18.0.jar
> flink-table-api-scala-bridge_2.12-1.18.0.jar
> flink-table-planner_2.12-1.18.0.jar
> flink-table-runtime-1.18.0.jar
> jline-reader-3.23.0.jar
> jline-terminal-3.23.0.jar
> kafka-clients-3.5.1.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> mysql-connector-j-8.1.0.jar
> paimon-flink-1.18-0.6-20230929.002044-11.jar{code}
> Works correctly in version 1.17.1, but produces the following error in 
> 1.18.0-RC1
>  
> {code:java}
> 2023-09-29 14:04:11,438 ERROR 
> org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed 
> to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da.
> java.lang.NoSuchFieldError: operands
> at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) 
> ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.(SimplifyFilterConditionRule.scala:36)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala:94)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.(SimplifyFilterConditionRule.scala)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.(FlinkStreamRuleSets.scala:35)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> 

[jira] [Assigned] (FLINK-33168) An error occurred when executing sql, java.lang.NoSuchFieldError: operands

2023-10-10 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-33168:


Assignee: Zheng yunhong

> An error occurred when executing sql, java.lang.NoSuchFieldError: operands
> --
>
> Key: FLINK-33168
> URL: https://issues.apache.org/jira/browse/FLINK-33168
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.0
>Reporter: macdoor615
>Assignee: Zheng yunhong
>Priority: Major
>
> Environment:
>  
> {code:java}
> Linux hb3-prod-hadoop-006 4.18.0-477.27.1.el8_8.x86_64 #1 SMP Thu Sep 21 
> 06:49:25 EDT 2023 x86_64 x86_64 x86_64 GNU/Linux
> openjdk version "1.8.0_382"
> OpenJDK Runtime Environment (build 1.8.0_382-b05)
> OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
> flink-1.18.0-RC1 , 
> https://github.com/apache/flink/releases/tag/release-1.18.0-rc1
> {code}
>  
> I execute the following sql in sql-client.sh.
>  
> {code:java}
> insert into svc1_paimon_prod.cq.b_customer_ecus
> select
>   rcus.id id,
>   if(cus.id is not null, cus.id, try_cast(NULL as string)) cus_id,    
>   if(cus.id is null and cus_rownum = 1, rcus.id, try_cast(NULL as string)) 
> newCus_id,
>   companyID,
>   customerProvinceNumber,
>   mobilePhone,
>   oprCode,
>   customerNum,
>   staffName,
>   location,
>   staffNumber,
>   extendInfo,
>   customerName,
>   case when companyID='000' then '名称1'
>        when companyID='002' then '名称2'
>        else '新名称'
>        end prov,
>   row (
>     accessToken,
>     busType,
>     cutOffDay,
>     domain,
>     envFlag,
>     routeType,
>     routeValue,
>     sessionID,
>     sign,
>     signMethod,
>     org_timeStamp,
>     transIDO,
>     userPartyID,
>     version
>   ) raw_message,
>   named_struct(
>     'id', cus.id,
>     'name', cus.name,
>     'code', cus.code,
>     'customerlevel', cus.customerlevel,
>     'prov', cus.prov,
>     'container', cus.container,
>     'crtime', cus.crtime,
>     'updtime', cus.updtime
>   ) existing_cus,
>   cus_rownum,
>   to_timestamp(org_timeStamp, 'MMddHHmmss') as org_timeStamp,
>   raw_rowtime,
>   localtimestamp as raw_rowtime1,
>   dt
> from svc1_paimon_prod.raw_data.abscustinfoserv_content_append_cq
>   /*+ OPTIONS('consumer-id' = '创建新客户id') */
>   rcus
> left join svc1_mysql_test.gem_svc1_vpn.bv_customer
> FOR SYSTEM_TIME AS OF rcus.proctime AS cus on rcus.customerNum=cus.code
> {code}
> There are the following jar files in the flink/lib directory.
> {code:java}
> commons-cli-1.5.0.jar
> flink-cep-1.18.0.jar
> flink-connector-files-1.18.0.jar
> flink-connector-jdbc-3.1.1-1.17.jar
> flink-csv-1.18.0.jar
> flink-dist-1.18.0.jar
> flink-json-1.18.0.jar
> flink-orc-1.18.0.jar
> flink-parquet-1.18.0.jar
> flink-scala_2.12-1.18.0.jar
> flink-sql-avro-1.18.0.jar
> flink-sql-avro-confluent-registry-1.18.0.jar
> flink-sql-connector-elasticsearch7-3.0.0-1.16.jar
> flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar
> flink-sql-connector-kafka-3.0.0-1.17.jar
> flink-sql-orc-1.18.0.jar
> flink-sql-parquet-1.18.0.jar
> flink-table-api-java-uber-1.18.0.jar
> flink-table-api-scala_2.12-1.18.0.jar
> flink-table-api-scala-bridge_2.12-1.18.0.jar
> flink-table-planner_2.12-1.18.0.jar
> flink-table-runtime-1.18.0.jar
> jline-reader-3.23.0.jar
> jline-terminal-3.23.0.jar
> kafka-clients-3.5.1.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> mysql-connector-j-8.1.0.jar
> paimon-flink-1.18-0.6-20230929.002044-11.jar{code}
> It works correctly in version 1.17.1, but produces the following error in 
> 1.18.0-RC1:
>  
> {code:java}
> 2023-09-29 14:04:11,438 ERROR 
> org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed 
> to execute the operation fe1b0a58-b822-49c0-b1ae-ce73d16f92da.
> java.lang.NoSuchFieldError: operands
> at org.apache.calcite.plan.RelOptRule.operand(RelOptRule.java:129) 
> ~[flink-sql-connector-hive-3.1.3_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule.<init>(SimplifyFilterConditionRule.scala:36)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.<init>(SimplifyFilterConditionRule.scala:94)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule$.<clinit>(SimplifyFilterConditionRule.scala)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.<init>(FlinkStreamRuleSets.scala:35)
>  ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
> at 
> org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets$.<clinit>(FlinkStreamRuleSets.scala)
>  

[jira] [Resolved] (FLINK-25593) A redundant scan could be skipped if it is an input of join and the other input is empty after partition prune

2023-10-10 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-25593.
--
Resolution: Fixed

master: 1b70d25b81daa7bd4a23c048cb3a79bc43a21d5f

> A redundant scan could be skipped if it is an input of join and the other 
> input is empty after partition prune
> --
>
> Key: FLINK-25593
> URL: https://issues.apache.org/jira/browse/FLINK-25593
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jing Zhang
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
>
> A redundant scan could be skipped if it is an input of join and the other 
> input is empty after partition prune.
> For example:
> ltable has two partitions: pt=0 and pt=1; rtable has one partition: pt1=0.
> The schema of ltable is (lkey string, value int).
> The schema of rtable is (rkey string, value int).
> {code:sql}
> SELECT * FROM ltable, rtable WHERE pt=2 and pt1=0 and `lkey`=rkey
> {code}
> The plan is as following.
> {code:java}
> Calc(select=[lkey, value, CAST(2 AS INTEGER) AS pt, rkey, value1, CAST(0 AS 
> INTEGER) AS pt1])
> +- HashJoin(joinType=[InnerJoin], where=[=(lkey, rkey)], select=[lkey, value, 
> rkey, value1], build=[right])
>:- Exchange(distribution=[hash[lkey]])
>:  +- TableSourceScan(table=[[hive, source_db, ltable, partitions=[], 
> project=[lkey, value]]], fields=[lkey, value])
>+- Exchange(distribution=[hash[rkey]])
>   +- TableSourceScan(table=[[hive, source_db, rtable, 
> partitions=[{pt1=0}], project=[rkey, value1]]], fields=[rkey, value1])
> {code}
> There is no need to scan the right side because the left input of the join has 0 
> partitions after partition pruning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25593][table-planner] Skip redundant scan while partition table push down contains none-existent partition [flink]

2023-10-10 Thread via GitHub


luoyuxia merged PR #23423:
URL: https://github.com/apache/flink/pull/23423


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-33231:
---

Assignee: Tzu-Li (Gordon) Tai

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 

Re: [PR] [FLINK-33191][Connector/Kafka] Remove dependency on Flink Shaded [flink-connector-kafka]

2023-10-10 Thread via GitHub


tzulitai commented on code in PR #57:
URL: 
https://github.com/apache/flink-connector-kafka/pull/57#discussion_r1353655402


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java:
##
@@ -82,16 +81,24 @@ public void testKafkaDeserializationSchemaWrapper() throws 
Exception {
 @Test
 public void testKafkaValueDeserializationSchemaWrapper() throws Exception {
 final ConsumerRecord consumerRecord = 
getConsumerRecord();
-KafkaRecordDeserializationSchema schema =
-KafkaRecordDeserializationSchema.valueOnly(
-new JsonDeserializationSchema<>(ObjectNode.class));
+KafkaRecordDeserializationSchema<
+
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node

Review Comment:
   I'm a bit lost here. Why does this test code still need to use the shaded 
version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33191:

Affects Version/s: kafka-3.0.0
   (was: 1.18.0)

> Kafka Connector should directly depend on 3rd-party libs instead of 
> flink-shaded repo
> -
>
> Key: FLINK-33191
> URL: https://issues.apache.org/jira/browse/FLINK-33191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Jing Ge
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33191:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Kafka Connector should directly depend on 3rd-party libs instead of 
> flink-shaded repo
> -
>
> Key: FLINK-33191
> URL: https://issues.apache.org/jira/browse/FLINK-33191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.0.0
>Reporter: Jing Ge
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33020) OpensearchSinkTest.testAtLeastOnceSink timed out

2023-10-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-33020.
-
Fix Version/s: opensearch-1.1.0
   Resolution: Fixed

> OpensearchSinkTest.testAtLeastOnceSink timed out
> 
>
> Key: FLINK-33020
> URL: https://issues.apache.org/jira/browse/FLINK-33020
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.2
>Reporter: Martijn Visser
>Assignee: Andriy Redko
>Priority: Blocker
>  Labels: pull-request-available, stale-blocker
> Fix For: opensearch-1.1.0
>
>
> https://github.com/apache/flink-connector-opensearch/actions/runs/6061205003/job/16446139552#step:13:1029
> {code:java}
> Error:  Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 9.837 
> s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest
> Error:  
> org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest.testAtLeastOnceSink
>   Time elapsed: 5.022 s  <<< ERROR!
> java.util.concurrent.TimeoutException: testAtLeastOnceSink() timed out after 
> 5 seconds
>   at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.createTimeoutException(TimeoutInvocation.java:70)
>   at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:59)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>   at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> 

[jira] [Commented] (FLINK-33020) OpensearchSinkTest.testAtLeastOnceSink timed out

2023-10-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773823#comment-17773823
 ] 

Sergey Nuyanzin commented on FLINK-33020:
-

The timeout was increased in 
[87b23c46f6f3a26bdd645d623a34dee3d19dac9d|https://github.com/apache/flink-connector-opensearch/commit/87b23c46f6f3a26bdd645d623a34dee3d19dac9d]
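
(Illustration only, not that commit's diff: with JUnit 5, raising such a per-test 
timeout is typically just a change to the {{@Timeout}} annotation; the test name 
and body below are hypothetical.)

{code:java}
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class TimeoutSketchTest {

    // Hypothetical test; only the @Timeout mechanics matter here.
    @Test
    @Timeout(value = 30, unit = TimeUnit.SECONDS) // raised from a tight 5 s budget
    void slowOperationCompletesWithinBudget() throws Exception {
        Thread.sleep(100); // stand-in for the real sink round-trip
    }
}
{code}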

> OpensearchSinkTest.testAtLeastOnceSink timed out
> 
>
> Key: FLINK-33020
> URL: https://issues.apache.org/jira/browse/FLINK-33020
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.2
>Reporter: Martijn Visser
>Assignee: Andriy Redko
>Priority: Blocker
>  Labels: pull-request-available, stale-blocker
>
> https://github.com/apache/flink-connector-opensearch/actions/runs/6061205003/job/16446139552#step:13:1029
> {code:java}
> Error:  Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 9.837 
> s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest
> Error:  
> org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest.testAtLeastOnceSink
>   Time elapsed: 5.022 s  <<< ERROR!
> java.util.concurrent.TimeoutException: testAtLeastOnceSink() timed out after 
> 5 seconds
>   at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.createTimeoutException(TimeoutInvocation.java:70)
>   at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:59)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>   at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 

[PR] [hotfix] Bump flink version to 1.17.1 [flink-connector-opensearch]

2023-10-10 Thread via GitHub


snuyanzin opened a new pull request, #33:
URL: https://github.com/apache/flink-connector-opensearch/pull/33

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33020) OpensearchSinkTest.testAtLeastOnceSink timed out

2023-10-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-33020:
---

Assignee: Andriy Redko

> OpensearchSinkTest.testAtLeastOnceSink timed out
> 
>
> Key: FLINK-33020
> URL: https://issues.apache.org/jira/browse/FLINK-33020
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.2
>Reporter: Martijn Visser
>Assignee: Andriy Redko
>Priority: Blocker
>  Labels: pull-request-available, stale-blocker
>
> https://github.com/apache/flink-connector-opensearch/actions/runs/6061205003/job/16446139552#step:13:1029
> {code:java}
> Error:  Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 9.837 
> s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest
> Error:  
> org.apache.flink.streaming.connectors.opensearch.OpensearchSinkTest.testAtLeastOnceSink
>   Time elapsed: 5.022 s  <<< ERROR!
> java.util.concurrent.TimeoutException: testAtLeastOnceSink() timed out after 
> 5 seconds
>   at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.createTimeoutException(TimeoutInvocation.java:70)
>   at 
> org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:59)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>   at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>   at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> 

Re: [PR] [FLINK-33020] OpensearchSinkTest.testAtLeastOnceSink timed out [flink-connector-opensearch]

2023-10-10 Thread via GitHub


snuyanzin merged PR #32:
URL: https://github.com/apache/flink-connector-opensearch/pull/32


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33200) ItemAt Expression validation fail in Table API due to type mismatch

2023-10-10 Thread Andriy Onyshchuk (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773794#comment-17773794
 ] 

Andriy Onyshchuk commented on FLINK-33200:
--

It seems I have caught several more issues related to the `AT` expression and 
type resolution.
 # Accessing an array of primitives doesn't work if, at the data level, arrays are 
represented as ArrayData. What I got is: `Unsupported conversion from data type 
'ARRAY' (conversion class: org.apache.flink.table.data.ArrayData) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.`
 # Accessing a `Map` throws too. `map.at(0L)` fails with "Incompatible types for 
sink column":
{{Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Column types of query result and sink for '*anonymous_collect$3*' do not 
match.}}
{{Cause: Incompatible types for sink column 'longData_at_0' at position 0.}}
{{Query schema: [longData_at_0: STRING]}}
{{Sink schema: [longData_at_0: RAW('org.apache.flink.table.data.StringData', 
?)]}}

All issues are reflected in `IssueDemo.java` (see attachments).
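
For completeness, here is a minimal, self-contained sketch of the second issue. 
This is not the attached IssueDemo.java; the table name, column names and the 
datagen connector are only illustrative assumptions, but the failing call shape 
(`map.at(0L)` aliased to `longData_at_0`) matches the error above.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class ItemAtRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Any source with a MAP<BIGINT, STRING> column should do; datagen is
        // used here only to keep the example self-contained.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE src ("
                        + " id STRING,"
                        + " longData MAP<BIGINT, STRING>"
                        + ") WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'number-of-rows' = '5'"
                        + ")");

        Table in = tEnv.from("src");

        // Expected: a STRING result column named longData_at_0; observed: the
        // "Incompatible types for sink column" validation error quoted above.
        in.select($("longData").at(0L).as("longData_at_0")).execute().print();
    }
}
{code}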

> ItemAt Expression validation fail in Table API due to type mismatch
> ---
>
> Key: FLINK-33200
> URL: https://issues.apache.org/jira/browse/FLINK-33200
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Zhenqiu Huang
>Priority: Minor
> Attachments: IssueDemo.java
>
>
> The table schema is defined as below:
> public static final DataType DATA_TYPE = DataTypes.ROW(
> DataTypes.FIELD("id", DataTypes.STRING()),
> DataTypes.FIELD("events", 
> DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
> );
> public static final Schema SCHEMA = 
> Schema.newBuilder().fromRowDataType(DATA_TYPE).build();
> inTable.select(Expressions.$("events").at(1).at("eventType").as("firstEventType"))
> The validation fails as "eventType" is inferred as 
> BasicTypeInfo.STRING_TYPE_INFO, while the table key internally is a 
> StringDataTypeInfo. The validation fails at 
> case mti: MapTypeInfo[_, _] =>
> if (key.resultType == mti.getKeyTypeInfo) {
>   ValidationSuccess
> } else {
>   ValidationFailure(
> s"Map entry access needs a valid key of type " +
>   s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
> }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33200) ItemAt Expression validation fail in Table API due to type mismatch

2023-10-10 Thread Andriy Onyshchuk (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andriy Onyshchuk updated FLINK-33200:
-
Attachment: IssueDemo.java

> ItemAt Expression validation fail in Table API due to type mismatch
> ---
>
> Key: FLINK-33200
> URL: https://issues.apache.org/jira/browse/FLINK-33200
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Zhenqiu Huang
>Priority: Minor
> Attachments: IssueDemo.java
>
>
> The table schema is defined as below:
> public static final DataType DATA_TYPE = DataTypes.ROW(
> DataTypes.FIELD("id", DataTypes.STRING()),
> DataTypes.FIELD("events", 
> DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())))
> );
> public static final Schema SCHEMA = 
> Schema.newBuilder().fromRowDataType(DATA_TYPE).build();
> inTable.select(Expressions.$("events").at(1).at("eventType").as("firstEventType"))
> The validation fails as "eventType" is inferred as 
> BasicTypeInfo.STRING_TYPE_INFO, while the table key internally is a 
> StringDataTypeInfo. The validation fails at 
> case mti: MapTypeInfo[_, _] =>
> if (key.resultType == mti.getKeyTypeInfo) {
>   ValidationSuccess
> } else {
>   ValidationFailure(
> s"Map entry access needs a valid key of type " +
>   s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
> }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33231:
---
Labels: pull-request-available  (was: )

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 

[PR] [FLINK-33231] [source] Properly evict offsetsToCommit cache on checkpoint complete if no offsets exist [flink-connector-kafka]

2023-10-10 Thread via GitHub


tzulitai opened a new pull request, #58:
URL: https://github.com/apache/flink-connector-kafka/pull/58

   Prior to this fix, if the offsets to commit for a given checkpoint are empty, 
which can be the case if no starting offsets were retrieved from Kafka yet, 
then on checkpoint completion the cache is not properly evicted up to the given 
checkpoint.
   
   This change fixes this such that in notifyCheckpointComplete we short-circuit 
the method execution and do not try to commit the offsets (since they are empty 
anyway), and always remember to evict the cache up to the completed checkpoint.
   
   ## Testing
   
   I've modified the existing `KafkaSourceReaderTest#testCommitEmptyOffsets()` 
test to fail if the cache eviction fix was not applied. With this PR, that test 
now passes.
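   
   For readers following along, here is a minimal, self-contained sketch of the 
eviction behavior described above. It is an illustration, not the actual 
KafkaSourceReader patch: String/Long stand in for TopicPartition/OffsetAndMetadata, 
the commit to Kafka is omitted, and the class and method bodies are made up.
   
   ```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class OffsetsToCommitEvictionSketch {

    // Stand-in for KafkaSourceReader#offsetsToCommit: checkpointId -> offsets.
    private final TreeMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

    void snapshotState(long checkpointId) {
        // As in snapshotState: an empty map is registered even when there is
        // nothing to commit yet.
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> toCommit = offsetsToCommit.get(checkpointId);
        if (toCommit == null) {
            return; // nothing snapshotted for this checkpoint
        }
        if (toCommit.isEmpty()) {
            // The shortcut: no commit attempt, but still evict everything up
            // to and including this checkpoint so empty maps cannot pile up.
            offsetsToCommit.headMap(checkpointId, true).clear();
            return;
        }
        // Non-empty case: the real reader commits to Kafka and evicts the
        // cache in the commit callback; the eviction itself is the same.
        offsetsToCommit.headMap(checkpointId, true).clear();
    }

    public static void main(String[] args) {
        OffsetsToCommitEvictionSketch reader = new OffsetsToCommitEvictionSketch();
        for (long cp = 1; cp <= 3; cp++) {
            reader.snapshotState(cp);
            reader.notifyCheckpointComplete(cp);
        }
        // Without the empty-map shortcut this would keep growing; with it: 0.
        System.out.println("cached checkpoint entries: " + reader.offsetsToCommit.size());
    }
}
   ```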


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773790#comment-17773790
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
-

[~lauri.suurvali] I think that would work, but the issue is that in the 
callback, on success we log that a commit was successful, and the source 
reader metrics are also bumped, which can be confusing if no offsets were actually 
committed. Moreover, with that approach we would be relying on internal 
details of the Kafka client that are hard to cover with tests (i.e. things might 
silently change such that a remote request is issued even if the provided offsets 
are empty, which is not ideal).

So, I think we can be a bit cleaner by short-circuiting the 
{{notifyCheckpointComplete}} method such that if the offsets for a checkpoint 
are empty, we don't even attempt to use the fetcher manager to try to commit 
offsets.

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: 

[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773784#comment-17773784
 ] 

Lauri Suurväli commented on FLINK-33231:


[~tzulitai] thank you for the comment!
Would removing the code that you linked, which returns in case of an empty 
offsetsToCommit, be an option to solve this issue? The remaining code would end 
up in 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#sendOffsetCommitRequest
 which would return a successful response locally in case of an empty offsets 
map. 
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 at master · a0x8o/kafka 
(github.com)|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1267-L1269]

Since the request is handled locally, perhaps this is a good way to ensure 
that the callback function isn't discarded. Would this sort of approach 
bring any additional overhead that we would like to avoid, or am I perhaps 
missing something?

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> 

Re: [PR] [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data [flink-connector-kafka]

2023-10-10 Thread via GitHub


tzulitai commented on PR #52:
URL: 
https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1755995458

   @Tan-JiaLiang ok - let's try to reach consensus here.
   
   It does seem like my approach can lead to other problems, e.g. 
bombarding Kafka brokers with multiple end-offset lookups across multiple TMs. 
This might turn out worse than doing just one extra query on the JM.
   
   And since JMs are already assumed to have access to Kafka brokers 
anyway, we shouldn't be breaking anything for users.
   
   So overall - let's proceed with this PR's approach. Sorry for the back and 
forth @Tan-JiaLiang, just wanted to make sure that we're fixing this properly.
   
   Regarding the restore issue you mentioned: yes, please address that, and 
then I think we can merge this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data [flink-connector-kafka]

2023-10-10 Thread via GitHub


tzulitai commented on PR #52:
URL: 
https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1755980007

   @Tan-JiaLiang you are right, that's why it's taking me a while to open the 
new PR. The problem with my approach is that it can potentially read more 
records when restoring.
   
   Essentially, at _some_ point before the first 
snapshot is taken we need to actually look up the offsets for empty partitions 
and replace any kind of marker. Your PR does this very early on the JMs, and 
I'm trying to figure out if there's any way to postpone this as much as 
possible to the TMs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33033][olap][haservice] Add haservice micro benchmark for olap [flink-benchmarks]

2023-10-10 Thread via GitHub


XComp commented on code in PR #78:
URL: https://github.com/apache/flink-benchmarks/pull/78#discussion_r1353047609


##
src/main/java/org/apache/flink/olap/benchmark/HighAvailabilityServiceBenchmark.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.olap.benchmark;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.benchmark.BenchmarkBase;
+import org.apache.flink.benchmark.FlinkEnvironmentContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FileUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.openjdk.jmh.annotations.Scope.Thread;
+
+/** The benchmark for submitting short-lived jobs with and without high 
availability service. */
+@OutputTimeUnit(SECONDS)
+public class HighAvailabilityServiceBenchmark extends BenchmarkBase {
+   public static void main(String[] args) throws RunnerException {
+   Options options =
+   new OptionsBuilder()
+   .verbosity(VerboseMode.NORMAL)
+   .include(".*" + 
HighAvailabilityServiceBenchmark.class.getCanonicalName() + ".*")
+   .build();
+
+   new Runner(options).run();
+   }
+
+   @Benchmark
+   public void submitJobThroughput(HighAvailabilityContext context) throws 
Exception {
+   context.miniCluster.executeJobBlocking(buildNoOpJob());
+   }
+
+   private JobGraph buildNoOpJob() {
+   JobGraph jobGraph = new JobGraph(JobID.generate(), 
UUID.randomUUID().toString());
+   jobGraph.addVertex(createNoOpVertex());
+   return jobGraph;
+   }
+
+   private JobVertex createNoOpVertex() {

Review Comment:
   ```suggestion
private static JobVertex createNoOpVertex() {
   ```
   nit: the create methods could be static



##
pom.xml:
##
@@ -63,6 +63,7 @@ under the License.
0.7.6
0.13.0
3.21.7
+   5.4.0

Review Comment:
   I guess we have to keep the curator version here aligned with the Flink 
dependency. Do we have to set it explicitly or is there a way to use Flink's 
dependency transitively? :thinking: We might want to add a comment to the 
corresponding line in 
[apache/flink:pom.xml:144](https://github.com/apache/flink/blob/ec6ebe2d22d15883f7236895387a45a533cfefe0/pom.xml#L144)
 to point out that updating the curator version in Flink should result in 
updating the corresponding version in Flink's benchmark tests as well.



##
src/main/java/org/apache/flink/olap/benchmark/HighAvailabilityServiceBenchmark.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by 

[jira] [Commented] (FLINK-32806) EmbeddedJobResultStore keeps the non-dirty job entries forever

2023-10-10 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773758#comment-17773758
 ] 

Samrat Deb commented on FLINK-32806:


[~hk__lrzy] are you working on the patch? 

> EmbeddedJobResultStore keeps the non-dirty job entries forever
> --
>
> Key: FLINK-32806
> URL: https://issues.apache.org/jira/browse/FLINK-32806
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1, 1.19.0
>Reporter: Matthias Pohl
>Assignee: hk__lrzy
>Priority: Major
>  Labels: stale-assigned, starter
>
> The {{EmbeddedJobResultStore}} keeps the entries of cleaned-up jobs in memory 
> forever. We might want to add a TTL so that those entries are removed after a 
> certain amount of time, keeping the memory footprint of the 
> {{EmbeddedJobResultStore}} bounded.
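
(A minimal sketch of one possible TTL approach, purely for illustration: the class 
and method names below are assumptions, not the actual {{EmbeddedJobResultStore}} 
design. Clean entries carry an expiry timestamp and are evicted lazily on access.)

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a tiny TTL map for "clean" job result entries.
public class TtlCleanResultStoreSketch {

    private static final class Entry {
        final String result;
        final Instant expiresAt;

        Entry(String result, Instant expiresAt) {
            this.result = result;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> cleanEntries = new ConcurrentHashMap<>();
    private final Duration ttl;

    public TtlCleanResultStoreSketch(Duration ttl) {
        this.ttl = ttl;
    }

    public void markResultAsClean(String jobId, String result) {
        cleanEntries.put(jobId, new Entry(result, Instant.now().plus(ttl)));
    }

    public boolean hasCleanJobResultEntry(String jobId) {
        evictExpired();
        return cleanEntries.containsKey(jobId);
    }

    private void evictExpired() {
        Instant now = Instant.now();
        cleanEntries.values().removeIf(e -> e.expiresAt.isBefore(now));
    }
}
{code}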



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Backport Jetty's fix for java version parsing [flink-connector-hbase]

2023-10-10 Thread via GitHub


snuyanzin commented on PR #25:
URL: 
https://github.com/apache/flink-connector-hbase/pull/25#issuecomment-1755768586

   @MartijnVisser this should fix failing builds 
   could you please have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773739#comment-17773739
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
-

Making this a blocker for the upcoming Kafka connector releases.

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 

[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33231:

Fix Version/s: kafka-3.0.1
   kafka-3.1.0

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Fix For: kafka-3.0.1, kafka-3.1.0
>
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
> 

[jira] [Updated] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-33231:

Priority: Blocker  (was: Major)

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Blocker
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> KafkaSourceReader.notifyCheckpointComplete method.
> However if the committedPartitions is empty for the given checkpoint, then 
> the KafkaSourceFetcherManager.commitOffsets method returns.  
> [KafkaSourceFetcherManager line 
> 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
> {code:java}
> if (offsetsToCommit.isEmpty()) {
> return;
> } {code}
> We can 

[jira] [Commented] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773738#comment-17773738
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
-

[~lauri.suurvali] great debugging!

I think the fix is basically, in KafkaSourceFetcherManager#commitOffsets, if 
the provided offsetsToCommitMap is empty, the callback (where the logic for 
truncating the map lives) should be invoked as well. Currently, it just returns without 
calling the callback at all. Code link: 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78-L80
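
For illustration only, here is a minimal sketch of that direction, assuming the method shape shown at the link above and the Kafka client's OffsetCommitCallback (both assumptions, this is not the committed patch): the callback is still invoked for an empty map, so the reader can prune the empty per-checkpoint entry.
{code:java}
// Hypothetical sketch, not the actual fix.
public void commitOffsets(
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
        OffsetCommitCallback callback) {
    if (offsetsToCommit.isEmpty()) {
        // Still notify the caller so KafkaSourceReader can drop the empty
        // entry for this checkpoint from its offsetsToCommit map.
        callback.onComplete(offsetsToCommit, null);
        return;
    }
    // ... existing commit path ...
}
{code}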

> Memory leak in KafkaSourceReader if no data in consumed topic
> -
>
> Key: FLINK-33231
> URL: https://issues.apache.org/jira/browse/FLINK-33231
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: Lauri Suurväli
>Priority: Major
> Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job TaskManager heap gets full when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffers from 
> the memory leak will cause the heap to be cleared. However it does not 
> resolve the problem since the heap usage will start increasing immediately 
> after processing the message.
> !Screenshot 2023-10-10 at 12.49.37.png!
> TaskManager heap used percentage is calculated by 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a high 
> heap usage percentage. Heap dump analysis detected 912,355 instances of 
> java.util.HashMap empty collections retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added during the snapshotState method to offsetsToCommit 
> map if it does not already exist for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
> offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0 then a new entry would be 
> added to the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
> offsetsMap.put(
> split.getTopicPartition(),
> new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0 then this would leave the offsetMap 
> created in step 1 empty. We can see from the logs that the startingOffset is 
> -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
>  
>  
> The offsetsToCommit map is cleaned from entries once they have been committed 
> to Kafka which happens during the callback function that is passed to the 
> KafkaSourceFetcherManager.commitOffsets method in 
> 

[PR] Backport Jetty's fix for java version parsing [flink-connector-hbase]

2023-10-10 Thread via GitHub


snuyanzin opened a new pull request, #25:
URL: https://github.com/apache/flink-connector-hbase/pull/25

   Merges the fix for Jetty: https://github.com/eclipse/jetty.project/pull/2331
   It is needed because the fix is only available in Jetty 9.6, while HBase depends on 9.3 and lower.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33052) codespeed and benchmark server is down

2023-10-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773726#comment-17773726
 ] 

Martijn Visser commented on FLINK-33052:


[~Zakelly] Is there any update on this ticket?

> codespeed and benchmark server is down
> --
>
> Key: FLINK-33052
> URL: https://issues.apache.org/jira/browse/FLINK-33052
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks, Test Infrastructure
>Affects Versions: 1.18.0
>Reporter: Jing Ge
>Assignee: Zakelly Lan
>Priority: Blocker
>
> No update in #flink-dev-benchmarks slack channel since 25th August.
> It was a EC2 running in a legacy aws account. Currently on one knows which 
> account it is. 
>  
> https://apache-flink.slack.com/archives/C0471S0DFJ9/p1693932155128359



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32038) OffsetCommitMode.Kafka_periodic with checkpointing enabled

2023-10-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773719#comment-17773719
 ] 

Martijn Visser commented on FLINK-32038:


[~pritam.agarwala] [~tzulitai] What is the conclusion of this ticket?

> OffsetCommitMode.Kafka_periodic with checkpointing enabled 
> ---
>
> Key: FLINK-32038
> URL: https://issues.apache.org/jira/browse/FLINK-32038
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Runtime / Checkpointing
>Affects Versions: 1.14.6
>Reporter: Pritam Agarwala
>Priority: Major
>
> I need to get kafka-lag to prepare a graph and its dependent on kafka 
> committed offset. Flink is updating the offsets only after checkpointing to 
> make it consistent.
> Default Behaviour as per doc :
> If checkpoint is enabled, but {{consumer.setCommitOffsetsOnCheckpoints}} set 
> to false, then offset will not be committed at all even if the 
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false, *shouldn't 
> it fall back on {{enable.auto.commit}} to commit offsets regularly, 
> since* *in any case Flink doesn't use consumer-committed offsets for 
> recovery?*
>  
> OffsetCommitModes class :
>   
> {code:java}
> public class OffsetCommitModes {
> /**
>  * Determine the offset commit mode using several configuration values.
>  *
>  * @param enableAutoCommit whether or not auto committing is enabled in 
> the provided Kafka
>  * properties.
>  * @param enableCommitOnCheckpoint whether or not committing on 
> checkpoints is enabled.
>  * @param enableCheckpointing whether or not checkpoint is enabled for 
> the consumer.
>  * @return the offset commit mode to use, based on the configuration 
> values.
>  */
> public static OffsetCommitMode fromConfiguration(
> boolean enableAutoCommit,
> boolean enableCommitOnCheckpoint,
> boolean enableCheckpointing) {
> if (enableCheckpointing) {
> // if checkpointing is enabled, the mode depends only on whether  
>  committing on
> // checkpoints is enabled
> return (enableCommitOnCheckpoint)
> ? OffsetCommitMode.ON_CHECKPOINTS
> : OffsetCommitMode.DISABLED;
> } else {
> // else, the mode depends only on whether auto committing is 
> enabled in the provided
> // Kafka properties
> return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : 
> OffsetCommitMode.DISABLED;
> }
> }
> }
>  {code}
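
For illustration, this is how the mode resolution quoted above plays out for the combination in question (checkpointing enabled, commit-on-checkpoint disabled, enable.auto.commit set to true); the literal values below are only an example.
{code:java}
// With checkpointing enabled, only enableCommitOnCheckpoint is consulted;
// enable.auto.commit is ignored and the resulting mode is DISABLED.
OffsetCommitMode mode =
        OffsetCommitModes.fromConfiguration(
                /* enableAutoCommit = */ true,
                /* enableCommitOnCheckpoint = */ false,
                /* enableCheckpointing = */ true);
// mode == OffsetCommitMode.DISABLED
{code}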



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors

2023-10-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773708#comment-17773708
 ] 

Martijn Visser commented on FLINK-30400:


[~chesnay] How would this work for SQL FAT jars? IIUC they actually need 
connector-base

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
>
> Check that none of the externalized connectors bundle connector-base; if so 
> remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33234) Bump used Guava version in Kafka E2E tests

2023-10-10 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-33234:
--

 Summary: Bump used Guava version in Kafka E2E tests
 Key: FLINK-33234
 URL: https://issues.apache.org/jira/browse/FLINK-33234
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Reporter: Martijn Visser
Assignee: Martijn Visser


To resolve existing Dependabot PRs: 
https://github.com/apache/flink-connector-kafka/security/dependabot?q=package%3Acom.google.guava%3Aguava+manifest%3Aflink-connector-kafka-e2e-tests%2Fflink-end-to-end-tests-common-kafka%2Fpom.xml+has%3Apatch



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-10 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-33182:

Fix Version/s: 1.19.0

> Allow metadata columns in NduAnalyzer with ChangelogNormalize
> -
>
> Key: FLINK-33182
> URL: https://issues.apache.org/jira/browse/FLINK-33182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: lincoln lee
>Priority: Major
> Fix For: 1.19.0
>
>
> Currently, the NduAnalyzer is very strict about metadata columns in updating 
> sources. However, for upsert sources (like Kafka) that contain an incomplete 
> changelog, the planner always adds a ChangelogNormalize node. 
> ChangelogNormalize will make sure that metadata columns can be considered 
> deterministic. So the NduAnalyzer should be satisfied in this case. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-10 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773700#comment-17773700
 ] 

lincoln lee commented on FLINK-33182:
-

[~twalthr] Of course, I'll put it on my worklist for the next release.

> Allow metadata columns in NduAnalyzer with ChangelogNormalize
> -
>
> Key: FLINK-33182
> URL: https://issues.apache.org/jira/browse/FLINK-33182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> Currently, the NduAnalyzer is very strict about metadata columns in updating 
> sources. However, for upsert sources (like Kafka) that contain an incomplete 
> changelog, the planner always adds a ChangelogNormalize node. 
> ChangelogNormalize will make sure that metadata columns can be considered 
> deterministic. So the NduAnalyzer should be satisfied in this case. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-10 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-33182:
---

Assignee: lincoln lee

> Allow metadata columns in NduAnalyzer with ChangelogNormalize
> -
>
> Key: FLINK-33182
> URL: https://issues.apache.org/jira/browse/FLINK-33182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: lincoln lee
>Priority: Major
>
> Currently, the NduAnalyzer is very strict about metadata columns in updating 
> sources. However, for upsert sources (like Kafka) that contain an incomplete 
> changelog, the planner always adds a ChangelogNormalize node. 
> ChangelogNormalize will make sure that metadata columns can be considered 
> deterministic. So the NduAnalyzer should be satisfied in this case. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33233][hive] Fix NPE when non-native udf used in join condition in hive-parser [flink]

2023-10-10 Thread via GitHub


flinkbot commented on PR #23504:
URL: https://github.com/apache/flink/pull/23504#issuecomment-1755484549

   
   ## CI report:
   
   * 49df8d49f9a7d30f7f3d26dc2f86f95a2c77d98e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33233) Null point exception when non-native udf used in join condition

2023-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33233:
---
Labels: pull-request-available  (was: )

> Null point exception when non-native udf used in join condition
> ---
>
> Key: FLINK-33233
> URL: https://issues.apache.org/jira/browse/FLINK-33233
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: yunfan
>Priority: Major
>  Labels: pull-request-available
>
> Any non-native UDF used in a hive-parser join condition 
> will cause a NullPointerException.
> It can be reproduced by adding the following test to 
> {code:java}
> org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
>  
> {code:java}
> // Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase
> @Test
> public void testUdfInJoinCondition() throws Exception {
> List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
> "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I 
> where bar.I > 1").collect());
> assertThat(result.toString())
> .isEqualTo("[+I[2, 2]]");
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33233) Null point exception when non-native udf used in join condition

2023-10-10 Thread yunfan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yunfan updated FLINK-33233:
---
Description: 
Any non-native UDF used in a hive-parser join condition
will cause a NullPointerException.

It can be reproduced by adding the following test to 
{code:java}
org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
 
{code:java}
// Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase
@Test
public void testUdfInJoinCondition() throws Exception {
List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
"select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I 
where bar.I > 1").collect());
assertThat(result.toString())
.isEqualTo("[+I[2, 2]]");
} {code}

  was:
Any non-native udf used in hive-parser join condition. 

It will caused NullPointException.


> Null point exception when non-native udf used in join condition
> ---
>
> Key: FLINK-33233
> URL: https://issues.apache.org/jira/browse/FLINK-33233
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: yunfan
>Priority: Major
>
> Any non-native UDF used in a hive-parser join condition 
> will cause a NullPointerException.
> It can be reproduced by adding the following test to 
> {code:java}
> org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
>  
> {code:java}
> // Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase
> @Test
> public void testUdfInJoinCondition() throws Exception {
> List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
> "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I 
> where bar.I > 1").collect());
> assertThat(result.toString())
> .isEqualTo("[+I[2, 2]]");
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33233][hive] Fix NPE when non-native udf used in join condition in hive-parser [flink]

2023-10-10 Thread via GitHub


yunfan123 opened a new pull request, #23504:
URL: https://github.com/apache/flink/pull/23504

   ## What is the purpose of the change
   Fix NPE when non-native udf used in join condition in hive-parser
   
   ## Brief change log
   This problem is caused by not setting the UnparseTranslator when initializing 
HiveParserJoinTypeCheckCtx.
   
   
   ## Verifying this change
   This change is verified by the added test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33233) Null point exception when non-native udf used in join condition

2023-10-10 Thread yunfan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yunfan updated FLINK-33233:
---
Environment: (was: It can reproduced by follow code by adding this test 
to 
{code:java}
org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
 
{code:java}
// Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase
@Test
public void testUdfInJoinCondition() throws Exception {
List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
"select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I 
where bar.I > 1").collect());
assertThat(result.toString())
.isEqualTo("[+I[2, 2]]");
} {code})

> Null point exception when non-native udf used in join condition
> ---
>
> Key: FLINK-33233
> URL: https://issues.apache.org/jira/browse/FLINK-33233
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: yunfan
>Priority: Major
>
> Any non-native UDF used in a hive-parser join condition 
> will cause a NullPointerException.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33233) Null point exception when non-native udf used in join condition

2023-10-10 Thread yunfan (Jira)
yunfan created FLINK-33233:
--

 Summary: Null point exception when non-native udf used in join 
condition
 Key: FLINK-33233
 URL: https://issues.apache.org/jira/browse/FLINK-33233
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.17.0
 Environment: It can be reproduced by adding the following test to 
{code:java}
org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
 
{code:java}
// Add follow code to org.apache.flink.connectors.hive.HiveDialectQueryITCase
@Test
public void testUdfInJoinCondition() throws Exception {
List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql(
"select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I 
where bar.I > 1").collect());
assertThat(result.toString())
.isEqualTo("[+I[2, 2]]");
} {code}
Reporter: yunfan


Any non-native UDF used in a hive-parser join condition
will cause a NullPointerException.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33232) Kubernetive Operator Not Able to Take Other Python paramters While Submitting Job Deployment

2023-10-10 Thread Amarjeet Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amarjeet Singh updated FLINK-33232:
---
Description: 
The Flink Operator is not able to read Python command-line parameters like -pyFiles
when applying a job via the Kubernetes operator on a Flink session cluster. The Python files are
mounted using EFS, and the operator is not able to read the EFS files and apply them.

> Kubernetive Operator Not Able to Take Other Python paramters While Submitting 
> Job Deployment
> 
>
> Key: FLINK-33232
> URL: https://issues.apache.org/jira/browse/FLINK-33232
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0
>Reporter: Amarjeet Singh
>Priority: Major
> Fix For: 1.17.1
>
>
> The Flink Operator is not able to read Python command-line parameters like -pyFiles
> when applying a job via the Kubernetes operator on a Flink session cluster. The Python files are 
> mounted using EFS, and the operator is not able to read the EFS files and apply them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33232) Kubernetive Operator Not Able to Take Other Python paramters While Submitting Job Deployment

2023-10-10 Thread Amarjeet Singh (Jira)
Amarjeet Singh created FLINK-33232:
--

 Summary: Kubernetive Operator Not Able to Take Other Python 
paramters While Submitting Job Deployment
 Key: FLINK-33232
 URL: https://issues.apache.org/jira/browse/FLINK-33232
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0
Reporter: Amarjeet Singh
 Fix For: 1.17.1






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Jira
Lauri Suurväli created FLINK-33231:
--

 Summary: Memory leak in KafkaSourceReader if no data in consumed 
topic
 Key: FLINK-33231
 URL: https://issues.apache.org/jira/browse/FLINK-33231
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: Lauri Suurväli
 Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png

*Problem description*

Our Flink streaming job TaskManager heap gets full when the job has nothing to 
consume and process.

It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
When there are no messages in the source topic the TaskManager heap usage 
starts increasing until the job exits after receiving a SIGTERM signal. We are 
running the job on AWS EMR with YARN.

The problems with the TaskManager heap usage do not occur when there is data to 
process. It's also worth noting that sending a single message to the source 
topic of a streaming job that has been sitting idle and suffers from the memory 
leak will cause the heap to be cleared. However it does not resolve the problem 
since the heap usage will start increasing immediately after processing the 
message.

!Screenshot 2023-10-10 at 12.49.37.png!

TaskManager heap used percentage is calculated by 

 
{code:java}
flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
 

 

 I was able to take heap dumps of the TaskManager processes during a high heap 
usage percentage. Heap dump analysis detected 912,355 instances of 
java.util.HashMap empty collections retaining >= 43,793,040 bytes.

!Screenshot 2023-10-09 at 14.13.43.png!

The retained heap seemed to be located at:

 
{code:java}
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
 

!Screenshot 2023-10-09 at 13.02.34.png!

 

*Possible hints:*

An empty HashMap is added during the snapshotState method to offsetsToCommit 
map if it does not already exist for the given checkpoint. [KafkaSourceReader 
line 
107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]

 
{code:java}
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
{code}
 

If the startingOffset for the given split is >= 0 then a new entry would be 
added to the map from the previous step. [KafkaSourceReader line 
113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
{code:java}
if (split.getStartingOffset() >= 0) {
offsetsMap.put(
split.getTopicPartition(),
new OffsetAndMetadata(split.getStartingOffset()));
}{code}
If the starting offset is smaller than 0 then this would leave the offsetMap 
created in step 1 empty. We can see from the logs that the startingOffset is -3 
when the splits are added to the reader.

 
{code:java}
Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
[Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
-9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
 

 

The offsetsToCommit map is cleaned from entries once they have been committed 
to Kafka which happens during the callback function that is passed to the 
KafkaSourceFetcherManager.commitOffsets method in 
KafkaSourceReader.notifyCheckpointComplete method.

However if the committedPartitions is empty for the given checkpoint, then the 
KafkaSourceFetcherManager.commitOffsets method returns.  
[KafkaSourceFetcherManager line 
78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
{code:java}
if (offsetsToCommit.isEmpty()) {
return;
} {code}
We can observe from the logs that indeed an empty map is encountered at this 
step:
{code:java}
Committing offsets {}{code}
*Conclusion*

It seems that an empty map gets added per each checkpoint to offsetsToCommit 
map. Since the startingOffset in our case is -3 then the empty map never gets 
filled. During the offset commit phase the offsets for these checkpoints are 
ignored, since there is nothing to 
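
One possible mitigation, sketched below purely for illustration (the variable names around the loop are assumptions, and this is not necessarily the eventual fix), is to create the per-checkpoint entry lazily in snapshotState, so that an idle reader does not accumulate an empty HashMap for every checkpoint:
{code:java}
// Hypothetical variant of KafkaSourceReader#snapshotState.
// "splits" stands for the splits captured for this checkpoint (assumed name).
for (KafkaPartitionSplit split : splits) {
    if (split.getStartingOffset() >= 0) {
        offsetsToCommit
                .computeIfAbsent(checkpointId, id -> new HashMap<>())
                .put(
                        split.getTopicPartition(),
                        new OffsetAndMetadata(split.getStartingOffset()));
    }
}
{code}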

Re: [PR] Bump guava from 30.1.1-jre to 32.0.0-jre in /flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka [flink-connector-kafka]

2023-10-10 Thread via GitHub


MartijnVisser commented on PR #33:
URL: 
https://github.com/apache/flink-connector-kafka/pull/33#issuecomment-1755427837

   @dependabot rebase


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33149) Bump snappy-java to 1.1.10.4

2023-10-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-33149:
---
Fix Version/s: kafka-3.1.0

> Bump snappy-java to 1.1.10.4
> 
>
> Key: FLINK-33149
> URL: https://issues.apache.org/jira/browse/FLINK-33149
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Connectors / AWS, Connectors / HBase, 
> Connectors / Kafka, Stateful Functions
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Ryan Skraba
>Assignee: Ryan Skraba
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, kafka-3.1.0, 1.16.3, 1.17.2, 1.19.0
>
>
> Xerial published a security alert for a Denial of Service attack that [exists 
> on 
> 1.1.10.1|https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv].
> This is included in flink-dist, but also in flink-statefun, and several 
> connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33149) Bump snappy-java to 1.1.10.4

2023-10-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773686#comment-17773686
 ] 

Martijn Visser commented on FLINK-33149:


Merged in:

apache/flink-connector-kafka:main 73f761fa73d4200d18f628eef7c79cf91dd1a0bc

> Bump snappy-java to 1.1.10.4
> 
>
> Key: FLINK-33149
> URL: https://issues.apache.org/jira/browse/FLINK-33149
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Connectors / AWS, Connectors / HBase, 
> Connectors / Kafka, Stateful Functions
>Affects Versions: 1.18.0, 1.16.3, 1.17.2
>Reporter: Ryan Skraba
>Assignee: Ryan Skraba
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, kafka-3.1.0, 1.16.3, 1.17.2, 1.19.0
>
>
> Xerial published a security alert for a Denial of Service attack that [exists 
> on 
> 1.1.10.1|https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv].
> This is included in flink-dist, but also in flink-statefun, and several 
> connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33149] Bump snappy-java from 1.1.8.3 to 1.1.10.5 [flink-connector-kafka]

2023-10-10 Thread via GitHub


MartijnVisser merged PR #34:
URL: https://github.com/apache/flink-connector-kafka/pull/34


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33228][flink-runtime] Fix the total current resource calculation when fulfilling requirement [flink]

2023-10-10 Thread via GitHub


KarmaGYZ commented on PR #23502:
URL: https://github.com/apache/flink/pull/23502#issuecomment-1755330741

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32563] Add disable-archunit-tests profile [flink-connector-shared-utils]

2023-10-10 Thread via GitHub


echauchot commented on PR #21:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/21#issuecomment-1755288912

   I submitted this against main instead of the parent-pom branch, closing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32563] Add disable-archunit-tests profile [flink-connector-shared-utils]

2023-10-10 Thread via GitHub


echauchot closed pull request #21: [FLINK-32563] Add disable-archunit-tests 
profile
URL: https://github.com/apache/flink-connector-shared-utils/pull/21


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32563) execute sanity checks only with Flink version that connectors were built against

2023-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32563:
---
Labels: pull-request-available  (was: )

> execute sanity checks only with Flink version that connectors were built 
> against
> 
>
> Key: FLINK-32563
> URL: https://issues.apache.org/jira/browse/FLINK-32563
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Build System / CI
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> As part of [this 
> discussion|https://lists.apache.org/thread/pr0g812olzpgz21d9oodhc46db9jpxo3] 
> , the need for connectors to specify the main flink version that a connector 
> supports has arisen. 
> This CI variable will allow to configure the build and tests differently 
> depending on this version. This parameter would be optional.
> The first use case is to run archunit tests only on the main supported 
> version as discussed in the above thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352333625


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
 protected final EventRecorder eventRecorder;
 protected final StatusRecorder statusRecorder;
-protected final JobAutoScaler resourceScaler;
+protected final JobAutoScaler 
resourceScaler;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33229][table-planner] Moving Java class FlinkRecomputeStatisticsProgram from scala package to java package [flink]

2023-10-10 Thread via GitHub


lincoln-lil merged PR #23503:
URL: https://github.com/apache/flink/pull/23503


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33229][table-planner] Moving Java class FlinkRecomputeStatisticsProgram from scala package to java package [flink]

2023-10-10 Thread via GitHub


lincoln-lil commented on PR #23503:
URL: https://github.com/apache/flink/pull/23503#issuecomment-1755222956

   It's a reasonable move. +1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI

2023-10-10 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-33230:


Assignee: Yu Chen

> Support Expanding ExecutionGraph to StreamGraph in Web UI
> -
>
> Key: FLINK-33230
> URL: https://issues.apache.org/jira/browse/FLINK-33230
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Assignee: Yu Chen
>Priority: Major
> Attachments: image-2023-10-10-18-52-38-252.png
>
>
> Flink Web shows users the ExecutionGraph (i.e., chained operators), but in 
> some cases, we would like to know the structure of the chained operators as 
> well as the necessary metrics such as the inputs and outputs of data, etc.
>  
> Thus, we propose to show the stream graphs and some related metrics such as 
> numberRecordIn and numberRecordOut on the Flink Web UI (as shown in the figure).
>  
> !image-2023-10-10-18-52-38-252.png|width=750,height=263!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][connectors/mongodb] Fix typo and optimize log [flink-connector-mongodb]

2023-10-10 Thread via GitHub


Jiabao-Sun commented on PR #16:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/16#issuecomment-1755190195

   Hi @hlteoh37, could you help review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33228][flink-runtime] Fix the total current resource calculation when fulfilling requirement [flink]

2023-10-10 Thread via GitHub


KarmaGYZ commented on PR #23502:
URL: https://github.com/apache/flink/pull/23502#issuecomment-1755184939

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352290023


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws 
Exception {
 }
 }
 
+private void scaling(FlinkResourceContext ctx) throws Exception {
+KubernetesJobAutoScalerContext autoScalerContext = 
ctx.getJobAutoScalerContext();
+
+if (autoscalerDisabled(ctx)) {
+autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
+resourceScaler.scale(autoScalerContext);
+return;
+}
+if (waitingForRunning(ctx.getResource().getStatus())) {
+LOG.info("Autoscaler is waiting for  stable, running state");
+resourceScaler.cleanup(autoScalerContext.getJobKey());
+return;

Review Comment:
   Preferably, I would like any logic related to applying parallelism inside 
the autoscaler implementation. This shouldn't change when the autoscaler is 
waiting for the running state. In fact, the job state checks should also be 
performed by the autoscaler, not by the reconciler. The current code mixes 
control over the parallelism overrides between the reconciler and the 
autoscaler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352282612


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
 protected final EventRecorder eventRecorder;
 protected final StatusRecorder statusRecorder;
-protected final JobAutoScaler resourceScaler;
+protected final JobAutoScaler 
resourceScaler;

Review Comment:
   Can we rename this?
   
   ```suggestion
   protected final JobAutoScaler autoscaler;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1351975279


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -15,149 +15,180 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+public static final String DEPRECATED_K8S_OP_CONF_PREFIX = 
"kubernetes.operator.";
+public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+private static String deprecatedOperatorConfigKey(String key) {
+return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+}
+
+private static String autoScalerConfigKey(String key) {
+return AUTOSCALER_CONF_PREFIX + key;
+}
+
 private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-return operatorConfig("job.autoscaler." + key);
+return ConfigOptions.key(autoScalerConfigKey(key));
 }
 
 public static final ConfigOption AUTOSCALER_ENABLED =
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
+.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment:
   Added:
   > Note: The option prefix `kubernetes.operator.` was removed in FLIP-334, 
because the autoscaler module was decoupled from flink-kubernetes-operator.
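
   For reference, this is what the fallback means for existing users. A minimal 
sketch, assuming the option is wired up exactly as in this diff; the keys below 
are derived from the two prefixes above:
   
   ```java
   import org.apache.flink.configuration.Configuration;
   
   import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
   
   // Sketch only: demonstrates the deprecated-key fallback of the ConfigOption.
   public class DeprecatedKeyFallbackDemo {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
   
           // A user still setting the old kubernetes.operator.* key keeps working,
           // because the new option declares it via withDeprecatedKeys(...).
           conf.setString("kubernetes.operator.job.autoscaler.enabled", "true");
           System.out.println(conf.get(AUTOSCALER_ENABLED)); // true, via the deprecated key
   
           // The new job.autoscaler.* key takes precedence once it is set as well.
           conf.setString("job.autoscaler.enabled", "false");
           System.out.println(conf.get(AUTOSCALER_ENABLED)); // false
       }
   }
   ```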
   



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl>
+implements JobAutoScaler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+@VisibleForTesting protected static final String AUTOSCALER_ERROR = 
"AutoscalerError";

Review Comment:
   It's used in `BacklogBasedScalingTest`.
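
   For context, a minimal sketch of how a test in the same package can reference 
the constant (the test class name here is made up; the real `BacklogBasedScalingTest` 
drives the autoscaler and inspects the emitted events):
   
   ```java
   package org.apache.flink.autoscaler;
   
   import org.junit.jupiter.api.Test;
   
   import static org.assertj.core.api.Assertions.assertThat;
   
   // Sketch only: the point is that tests reference the shared @VisibleForTesting
   // constant instead of re-declaring the "AutoscalerError" literal.
   class AutoscalerErrorReasonSketchTest {
   
       @Test
       void errorEventsUseTheSharedReason() {
           assertThat(JobAutoScalerImpl.AUTOSCALER_ERROR).isEqualTo("AutoscalerError");
       }
   }
   ```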



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ 

[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI

2023-10-10 Thread Yu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Chen updated FLINK-33230:

Attachment: (was: image-2023-10-10-18-45-24-486.png)

> Support Expanding ExecutionGraph to StreamGraph in Web UI
> -
>
> Key: FLINK-33230
> URL: https://issues.apache.org/jira/browse/FLINK-33230
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Priority: Major
> Attachments: image-2023-10-10-18-52-38-252.png
>
>
> Flink Web shows users the ExecutionGraph (i.e., chained operators), but in 
> some cases, we would like to know the structure of the chained operators as 
> well as the necessary metrics such as the inputs and outputs of data, etc.
>  
> Thus, we propose to show the stream graphs and some related metrics such as 
> numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).
>  
> !image-2023-10-10-18-52-38-252.png|width=750,height=263!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI

2023-10-10 Thread Yu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Chen updated FLINK-33230:

Description: 
Flink Web shows users the ExecutionGraph (i.e., chained operators), but in some 
cases, we would like to know the structure of the chained operators as well as 
the necessary metrics such as the inputs and outputs of data, etc.

 

Thus, we propose to show the stream graphs and some related metrics such as 
numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).

 

!image-2023-10-10-18-52-38-252.png|width=750,height=263!

  was:
Flink Web shows users the ExecutionGraph (i.e., chained operators), but in some 
cases, we would like to know the structure of the chained operators as well as 
the necessary metrics such as the inputs and outputs of data, etc.

 

Thus, we propose to show the stream graphs and some related metrics such as 
numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).

 

!image-2023-10-10-18-45-42-991.png|width=508,height=178!


> Support Expanding ExecutionGraph to StreamGraph in Web UI
> -
>
> Key: FLINK-33230
> URL: https://issues.apache.org/jira/browse/FLINK-33230
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Priority: Major
> Attachments: image-2023-10-10-18-45-24-486.png, 
> image-2023-10-10-18-52-38-252.png
>
>
> Flink Web shows users the ExecutionGraph (i.e., chained operators), but in 
> some cases, we would like to know the structure of the chained operators as 
> well as the necessary metrics such as the inputs and outputs of data, etc.
>  
> Thus, we propose to show the stream graphs and some related metrics such as 
> numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).
>  
> !image-2023-10-10-18-52-38-252.png|width=750,height=263!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI

2023-10-10 Thread Yu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Chen updated FLINK-33230:

Attachment: image-2023-10-10-18-52-38-252.png

> Support Expanding ExecutionGraph to StreamGraph in Web UI
> -
>
> Key: FLINK-33230
> URL: https://issues.apache.org/jira/browse/FLINK-33230
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Priority: Major
> Attachments: image-2023-10-10-18-45-24-486.png, 
> image-2023-10-10-18-52-38-252.png
>
>
> Flink Web shows users the ExecutionGraph (i.e., chained operators), but in 
> some cases, we would like to know the structure of the chained operators as 
> well as the necessary metrics such as the inputs and outputs of data, etc.
>  
> Thus, we propose to show the stream graphs and some related metrics such as 
> numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).
>  
> !image-2023-10-10-18-45-42-991.png|width=508,height=178!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Web UI

2023-10-10 Thread Yu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Chen updated FLINK-33230:

   Attachment: image-2023-10-10-18-45-24-486.png
  Component/s: Runtime / Web Frontend
Affects Version/s: 1.19.0
  Description: 
Flink Web shows users the ExecutionGraph (i.e., chained operators), but in some 
cases, we would like to know the structure of the chained operators as well as 
the necessary metrics such as the inputs and outputs of data, etc.

 

Thus, we propose to show the stream graphs and some related metrics such as 
numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).

 

!image-2023-10-10-18-45-42-991.png|width=508,height=178!
  Summary: Support Expanding ExecutionGraph to StreamGraph in Web 
UI  (was: Support Expanding ExecutionGraph to StreamGraph in Flink)

> Support Expanding ExecutionGraph to StreamGraph in Web UI
> -
>
> Key: FLINK-33230
> URL: https://issues.apache.org/jira/browse/FLINK-33230
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Priority: Major
> Attachments: image-2023-10-10-18-45-24-486.png, 
> image-2023-10-10-18-52-38-252.png
>
>
> Flink Web shows users the ExecutionGraph (i.e., chained operators), but in 
> some cases, we would like to know the structure of the chained operators as 
> well as the necessary metrics such as the inputs and outputs of data, etc.
>  
> Thus, we propose to show the stream graphs and some related metrics such as 
> numberRecordIn and numberRecordOut on the Flink Web (As shown in the Figure).
>  
> !image-2023-10-10-18-45-42-991.png|width=508,height=178!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-15462) Introduce TrinoSqlDialect

2023-10-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-15462.
-
Fix Version/s: jdbc-3.2.0
   Resolution: Fixed

> Introduce TrinoSqlDialect
> -
>
> Key: FLINK-15462
> URL: https://issues.apache.org/jira/browse/FLINK-15462
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: li yu
>Assignee: João Boto
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: jdbc-3.2.0
>
>
> flink-jdbc supports
> Derby, Mysql, Postgre
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java]
> Could we add support for prestosql?
> Link to prestosql jdbc [https://prestosql.io/download.html]
> Advantage is presto supports a variety of data sources (i.e. we could 
> ingest/load data to or from those data sources just through presto jdbc)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-15462) Introduce TrinoSqlDialect

2023-10-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773641#comment-17773641
 ] 

Sergey Nuyanzin commented on FLINK-15462:
-

Merged as 
[8e0496a5a9727087b38c2fc412a397a232ee0f5f|https://github.com/apache/flink-connector-jdbc/commit/8e0496a5a9727087b38c2fc412a397a232ee0f5f]

> Introduce TrinoSqlDialect
> -
>
> Key: FLINK-15462
> URL: https://issues.apache.org/jira/browse/FLINK-15462
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: li yu
>Assignee: João Boto
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> flink-jdbc supports
> Derby, Mysql, Postgre
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java]
> Could we add support for prestosql?
> Link to prestosql jdbc [https://prestosql.io/download.html]
> Advantage is presto supports a variety of data sources (i.e. we could 
> ingest/load data to or from those data sources just through presto jdbc)
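
With the dialect now merged (commit above), wiring a Trino-backed table into Flink 
should look like any other JDBC dialect. A rough sketch only: host, catalog, schema, 
table and column names are made up, and exact Trino option support depends on the 
connector version:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: registering and querying a Trino-fronted table via the JDBC connector.
public class TrinoJdbcTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        tEnv.executeSql(
                "CREATE TABLE orders_via_trino ("
                        + "  order_id BIGINT,"
                        + "  amount DECIMAL(10, 2)"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:trino://trino-coordinator:8080/hive/sales',"
                        + "  'table-name' = 'orders',"
                        + "  'driver' = 'io.trino.jdbc.TrinoDriver'"
                        + ")");

        // Once registered, the table can be read or written like any other JDBC table.
        tEnv.executeSql("SELECT order_id, amount FROM orders_via_trino").print();
    }
}
```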



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-15462][connectors] Add Trino dialect [flink-connector-jdbc]

2023-10-10 Thread via GitHub


snuyanzin merged PR #3:
URL: https://github.com/apache/flink-connector-jdbc/pull/3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-15462) Introduce TrinoSqlDialect

2023-10-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-15462:
---

Assignee: Joan Schipper

> Introduce TrinoSqlDialect
> -
>
> Key: FLINK-15462
> URL: https://issues.apache.org/jira/browse/FLINK-15462
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: li yu
>Assignee: Joan Schipper
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> flink-jdbc supports
> Derby, Mysql, Postgre
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java]
> Could we add support for prestosql?
> Link to prestosql jdbc [https://prestosql.io/download.html]
> Advantage is presto supports a variety of data sources (i.e. we could 
> ingest/load data to or from those data sources just through presto jdbc)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-15462) Introduce TrinoSqlDialect

2023-10-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-15462:

Summary: Introduce TrinoSqlDialect  (was: Introduce PrestoSqlDialect)

> Introduce TrinoSqlDialect
> -
>
> Key: FLINK-15462
> URL: https://issues.apache.org/jira/browse/FLINK-15462
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: li yu
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> flink-jdbc supports
> Derby, Mysql, Postgre
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java]
> Could we add support for prestosql?
> Link to prestosql jdbc [https://prestosql.io/download.html]
> Advantage is presto supports a variety of data sources (i.e. we could 
> ingest/load data to or from those data sources just through presto jdbc)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-15462) Introduce TrinoSqlDialect

2023-10-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-15462:
---

Assignee: João Boto  (was: Joan Schipper)

> Introduce TrinoSqlDialect
> -
>
> Key: FLINK-15462
> URL: https://issues.apache.org/jira/browse/FLINK-15462
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: li yu
>Assignee: João Boto
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> flink-jdbc supports
> Derby, Mysql, Postgre
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java]
> Could we add support for prestosql?
> Link to prestosql jdbc [https://prestosql.io/download.html]
> Advantage is presto supports a variety of data sources (i.e. we could 
> ingest/load data to or from those data sources just through presto jdbc)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33230) Support Expanding ExecutionGraph to StreamGraph in Flink

2023-10-10 Thread Yu Chen (Jira)
Yu Chen created FLINK-33230:
---

 Summary: Support Expanding ExecutionGraph to StreamGraph in Flink
 Key: FLINK-33230
 URL: https://issues.apache.org/jira/browse/FLINK-33230
 Project: Flink
  Issue Type: Improvement
Reporter: Yu Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2023-10-10 Thread via GitHub


mxm commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1754958456

   Hey Mason! Do you want a review for this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


