[jira] [Updated] (FLINK-35103) [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic Termination Log Integration

2024-04-15 Thread SwathiChandrashekar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SwathiChandrashekar updated FLINK-35103:

Description: 
Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*

The plugin will be present in the plugins directory.
Please find the attached pod status.
 !screenshot-1.png! 
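
To make the behaviour concrete, here is a minimal sketch of what such a termination-log writer could look like. Only the /dev/termination-log path and the configuration key above come from this ticket; the class and method names below are hypothetical illustrations, not the actual plugin code.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: write a short root-cause summary of a Flink failure to the K8s termination log. */
public final class TerminationLogWriter {

    // Kubernetes reads this file (configurable via terminationMessagePath) into
    // status.containerStatuses[].state.terminated.message when the container exits.
    private static final Path TERMINATION_LOG = Paths.get("/dev/termination-log");

    public static void writeFailure(Throwable failure) {
        // Keep it short: Kubernetes truncates the termination message to 4096 bytes per container.
        String summary = failure.getClass().getSimpleName() + ": " + failure.getMessage();
        try {
            Files.write(
                    TERMINATION_LOG,
                    summary.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            // Never let failure reporting mask the original failure.
        }
    }
}
{code}

With the plugin enabled via the configuration above, the same summary then shows up in the pod's container status, as in the attached screenshot.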

 

 

  was:
Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*

The plugin will be present in the plugins directory.
Please find the attached pod status.

 

 


> [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic 
> Termination Log Integration
> --
>
> Key: FLINK-35103
> URL: https://issues.apache.org/jira/browse/FLINK-35103
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: SwathiChandrashekar
>Priority: Not a Priority
> Fix For: 1.20.0
>
> 

[jira] [Created] (FLINK-35103) [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic Termination Log Integration

2024-04-15 Thread SwathiChandrashekar (Jira)
SwathiChandrashekar created FLINK-35103:
---

 Summary: [Plugin] Enhancing Flink Failure Management in Kubernetes 
with Dynamic Termination Log Integration
 Key: FLINK-35103
 URL: https://issues.apache.org/jira/browse/FLINK-35103
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: SwathiChandrashekar
 Fix For: 1.20.0
 Attachments: Status-pod.png

Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*

The plugin will be present in the plugins directory.
Please find the attached pod status.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35103) [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic Termination Log Integration

2024-04-15 Thread SwathiChandrashekar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SwathiChandrashekar updated FLINK-35103:

Description: 
Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*
in our flink-conf file.

The plugin will be present in the plugins directory.
Please find the attached pod status.
 !screenshot-1.png! 

 

 

  was:
Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*

The plugin will be present in the plugins directory.
Please find the attached pod status.
 !screenshot-1.png! 

 

 


> [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic 
> Termination Log Integration
> --
>
> Key: FLINK-35103
> URL: https://issues.apache.org/jira/browse/FLINK-35103
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: SwathiChandrashekar
>Priority: Not 

Re: [PR] [FLINK-34961][BP v.1.1] Use dedicated CI name for MongoDB connector to differentiate it in infra-reports [flink-connector-mongodb]

2024-04-15 Thread via GitHub


snuyanzin merged PR #34:
URL: https://github.com/apache/flink-connector-mongodb/pull/34


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35103) [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic Termination Log Integration

2024-04-15 Thread SwathiChandrashekar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SwathiChandrashekar updated FLINK-35103:

Description: 
Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*
in our flink-conf file.

The plugin will be present in the plugins directory.

Sample output of the Flink pod container status when there is a Flink failure:
 !screenshot-1.png! 

Here, the user can clearly see that there was an auth issue and resolve it, 
instead of digging through the complete underlying logs.
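
For completeness, a hedged sketch of how the propagated termination message can be read back programmatically from the container status, assuming a recent fabric8 Kubernetes client; this is only illustrative and not part of the plugin itself.

{code:java}
import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

/** Sketch: read the termination message written by the plugin from the pod's container status. */
public final class TerminationMessageReader {

    public static String readTerminationMessage(String namespace, String podName) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
            for (ContainerStatus status : pod.getStatus().getContainerStatuses()) {
                ContainerStateTerminated terminated =
                        status.getState() != null ? status.getState().getTerminated() : null;
                if (terminated == null && status.getLastState() != null) {
                    // For containers that already restarted, the message moves to lastState.
                    terminated = status.getLastState().getTerminated();
                }
                if (terminated != null && terminated.getMessage() != null) {
                    return terminated.getMessage(); // e.g. the auth failure summary shown above
                }
            }
            return null;
        }
    }
}
{code}

This mirrors what kubectl shows under the container's terminated state, just via the Java client.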

 

 

  was:
Currently, whenever we have Flink failures, we need to do the triaging manually 
by looking into the Flink logs, even for the initial analysis. It would be 
better if the user/admin received the initial failure information directly, 
before having to look into the logs.

To address this, we've developed a plugin that surfaces Flink failures and 
ensures the critical information is preserved for subsequent analysis and action.

 

In Kubernetes environments, troubleshooting pod failures can be challenging 
without checking the pod/Flink logs. Fortunately, Kubernetes offers a robust 
mechanism to enhance debugging: the /dev/termination-log file.

[https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]

When failure information is written to this file, Kubernetes automatically 
incorporates it into the container status, giving administrators and developers 
valuable insight into the root cause of failures.

Our solution builds on this Kubernetes feature to integrate Flink failure 
reporting into the container ecosystem. Whenever Flink encounters an issue, the 
plugin captures the pertinent failure information and writes it to the 
/dev/termination-log file. Kubernetes then recognizes and propagates the failure 
status, enabling efficient monitoring and response.

By leveraging this native Kubernetes functionality, the plugin ensures that 
Flink failures are promptly identified and reflected in the pod status. This 
integration streamlines debugging, allowing operators to quickly diagnose and 
address issues, minimizing downtime and maximizing system reliability.

 

To keep this plugin generic, it takes no action by default. It can be enabled 
by setting

*external.log.factory.class: 
org.apache.flink.externalresource.log.K8SSupportTerminationLog*
in our flink-conf file.

The plugin will be present in the plugins directory.
Please find the attached pod status.
 !screenshot-1.png! 

 

 


> [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic 
> Termination Log Integration
> --
>
> Key: FLINK-35103
> 

[jira] [Resolved] (FLINK-34962) flink-connector-pulsar fails to start due to incorrect use of Pulsar API: LookupService.getPartitionedTopicMetadata

2024-04-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen resolved FLINK-34962.
---
Fix Version/s: pulsar-4.2.0
   Resolution: Fixed

master via 
https://github.com/apache/flink-connector-pulsar/commit/7340f713422b1734e84ec0602f154441b8da7fab

> flink-connector-pulsar fails to start due to incorrect use of Pulsar API: 
> LookupService.getPartitionedTopicMetadata
> --
>
> Key: FLINK-34962
> URL: https://issues.apache.org/jira/browse/FLINK-34962
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.2.0, pulsar-4.1.1
> Environment: * flink 1.17
>  * pulsar client 3.0.0
>  * org.apache.flink:flink-connector-pulsar:4.1.0-1.17 (connector)
>Reporter: Yubiao Feng
>Priority: Major
>  Labels: easyfix, pull-request-available
> Fix For: pulsar-4.2.0
>
>
> - The unnecessary code calls 
> `pulsarClient.getLookup().getPartitionedTopicMetadata()` to create the 
> partitioned topic metadata (in fact, this behavior is not correct).
>   - Why it is unnecessary: the [following 
> code|https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L245]
>  that creates a producer will also trigger creation of the partitioned topic 
> metadata.
>  - The method `pulsarClient.getLookup().getPartitionedTopicMetadata()` will 
> not retry if the connection is closed, so users will get an error. The 
> following code creates a producer that will retry if the connection is 
> closed, reducing the probability of an error occurring.
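
To make the proposed direction concrete, a minimal sketch using only the public Pulsar client API (a hypothetical helper, not the actual connector code): creating the producer performs the partitioned-topic metadata lookup internally, with the client's built-in retry handling, so an explicit call into the internal lookup service is not needed.

{code:java}
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public final class ProducerBasedMetadataLookup {

    public static Producer<byte[]> createProducer(String serviceUrl, String topic)
            throws PulsarClientException {
        // Client lifecycle management is omitted in this sketch.
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();

        // Creating the producer triggers the partitioned-topic metadata lookup and
        // retries on closed connections, unlike a direct
        // pulsarClient.getLookup().getPartitionedTopicMetadata(...) call.
        return client.newProducer().topic(topic).create();
    }
}
{code}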



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34962) flink-connector-pulsar fails to start due to incorrect use of Pulsar API: LookupService.getPartitionedTopicMetadata

2024-04-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-34962:
--
Affects Version/s: (was: pulsar-4.1.1)

> flink-connector-pulsar fails to start due to incorrect use of Pulsar API: 
> LookupService.getPartitionedTopicMetadata
> --
>
> Key: FLINK-34962
> URL: https://issues.apache.org/jira/browse/FLINK-34962
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
> Environment: * flink 1.17
>  * pulsar client 3.0.0
>  * org.apache.flink:flink-connector-pulsar:4.1.0-1.17 (connector)
>Reporter: Yubiao Feng
>Priority: Major
>  Labels: easyfix, pull-request-available
> Fix For: pulsar-4.2.0
>
>
> - The unnecessary code calls 
> `pulsarClient.getLookup().getPartitionedTopicMetadata()` to create the 
> partitioned topic metadata (in fact, this behavior is not correct).
>   - Why it is unnecessary: the [following 
> code|https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L245]
>  that creates a producer will also trigger creation of the partitioned topic 
> metadata.
>  - The method `pulsarClient.getLookup().getPartitionedTopicMetadata()` will 
> not retry if the connection is closed, so users will get an error. The 
> following code creates a producer that will retry if the connection is 
> closed, reducing the probability of an error occurring.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34962) flink-connector-pulsar fails to start due to incorrect use of Pulsar API: LookupService.getPartitionedTopicMetadata

2024-04-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-34962:
--
Affects Version/s: (was: pulsar-4.2.0)

> flink-connector-pulsar fails to start due to incorrect use of Pulsar API: 
> LookupService.getPartitionedTopicMetadata
> --
>
> Key: FLINK-34962
> URL: https://issues.apache.org/jira/browse/FLINK-34962
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: pulsar-4.1.1
> Environment: * flink 1.17
>  * pulsar client 3.0.0
>  * org.apache.flink:flink-connector-pulsar:4.1.0-1.17 (connector)
>Reporter: Yubiao Feng
>Priority: Major
>  Labels: easyfix, pull-request-available
> Fix For: pulsar-4.2.0
>
>
> - The unnecessary code calls 
> `pulsarClient.getLookup().getPartitionedTopicMetadata()` to create the 
> partitioned topic metadata (in fact, this behavior is not correct).
>   - Why it is unnecessary: the [following 
> code|https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L245]
>  that creates a producer will also trigger creation of the partitioned topic 
> metadata.
>  - The method `pulsarClient.getLookup().getPartitionedTopicMetadata()` will 
> not retry if the connection is closed, so users will get an error. The 
> following code creates a producer that will retry if the connection is 
> closed, reducing the probability of an error occurring.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)
melin created FLINK-35104:
-

 Summary: Add kafka pipeline data source connector
 Key: FLINK-35104
 URL: https://issues.apache.org/jira/browse/FLINK-35104
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35104:
--
Description: 
First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

https://github.com/apache/flink-cdc/pull/2938

 

> Add kafka pipeline data source connector
> 
>
> Key: FLINK-35104
> URL: https://issues.apache.org/jira/browse/FLINK-35104
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> First, collect CDC data in real time and write it to Kafka; then write it to 
> multiple different data sources in real time.
> https://github.com/apache/flink-cdc/pull/2938
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35104:
--
Description: 
There is already a [Kafka pipeline data sink 
connector|https://github.com/apache/flink-cdc/pull/2938], and there should 
also be a Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 

  was:
First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

https://github.com/apache/flink-cdc/pull/2938

 


> Add kafka pipeline data source connector
> 
>
> Key: FLINK-35104
> URL: https://issues.apache.org/jira/browse/FLINK-35104
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> There is already a [Kafka pipeline data sink 
> connector|https://github.com/apache/flink-cdc/pull/2938], and there should 
> also be a Kafka pipeline data source connector.
> First, collect CDC data in real time and write it to Kafka; then write it to 
> multiple different data sources in real time.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35104:
--
Description: 
There is already a Kafka pipeline data sink connector 
([https://github.com/apache/flink-cdc/pull/2938]), and there should also be a 
Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 
 
 

  was:
There is already a Kafka pipeline data sink connector, and there should also be 
a Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 
 
 


> Add kafka pipeline data source connector
> 
>
> Key: FLINK-35104
> URL: https://issues.apache.org/jira/browse/FLINK-35104
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> There is already a Kafka pipeline data sink connector 
> ([https://github.com/apache/flink-cdc/pull/2938]), and there should also be a 
> Kafka pipeline data source connector.
> First, collect CDC data in real time and write it to Kafka; then write it to 
> multiple different data sources in real time.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565293565


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for 
creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;
+
+/** The serializer for the type. */
+@Nonnull private final TypeSerializer<T> typeSerializer;
+
+/**
+ * The type information describing the value type. Remain this since it 
could provide more
+ * information which could be used internally in the future.
+ */
+@Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   It seems a bit confusing here, so I just removed it for now.
   Let's add it back if needed in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35105) Support setting default Autoscaler options at autoscaler standalone level

2024-04-15 Thread Rui Fan (Jira)
Rui Fan created FLINK-35105:
---

 Summary: Support setting default Autoscaler options at autoscaler 
standalone level
 Key: FLINK-35105
 URL: https://issues.apache.org/jira/browse/FLINK-35105
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: kubernetes-operator-1.9.0


Currently, autoscaler standalone doesn't support setting [autoscaler 
options|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/#autoscaler-configuration].
 We must set them at the job level when we use autoscaler standalone. This is not 
convenient if a platform administrator wants to change the default value of some 
autoscaler options, such as:
 * job.autoscaler.enabled
 * job.autoscaler.metrics.window
 * etc.

This Jira adds support for setting autoscaler options at the autoscaler 
standalone level, similar to the Flink Kubernetes operator.

The autoscaler options of autoscaler standalone will serve as the base 
configuration, and the job-level configuration can override the default values 
provided by autoscaler standalone.
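
To illustrate the intended override semantics, a small sketch against Flink's public Configuration API (not the actual autoscaler-standalone code):

{code:java}
import org.apache.flink.configuration.Configuration;

public final class AutoscalerConfigMerger {

    /** Job-level options override the defaults provided at the autoscaler standalone level. */
    public static Configuration merge(Configuration standaloneDefaults, Configuration jobLevel) {
        Configuration effective = new Configuration(standaloneDefaults); // start from the base
        effective.addAll(jobLevel); // job-level values win on conflicts
        return effective;
    }

    public static void main(String[] args) {
        Configuration base = new Configuration();
        base.setString("job.autoscaler.enabled", "true");
        base.setString("job.autoscaler.metrics.window", "15 min");

        Configuration job = new Configuration();
        job.setString("job.autoscaler.metrics.window", "5 min");

        // enabled=true comes from the base; metrics.window=5 min is the job-level override.
        System.out.println(merge(base, job));
    }
}
{code}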



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35104:
--
Description: 
There is already a Kafka pipeline data sink connector 
([https://github.com/apache/flink-cdc/pull/2938]), and there should also be a 
Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different datasources in real time.

 

 
 
 
 

  was:
There is already a Kafka pipeline data sink connector 
([https://github.com/apache/flink-cdc/pull/2938]), and there should also be a 
Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 
 
 


> Add kafka pipeline data source connector
> 
>
> Key: FLINK-35104
> URL: https://issues.apache.org/jira/browse/FLINK-35104
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> There is already a Kafka pipeline data sink connector 
> ([https://github.com/apache/flink-cdc/pull/2938]), and there should also be a 
> Kafka pipeline data source connector.
> First, collect CDC data in real time and write it to Kafka; then write it to 
> multiple different datasources in real time.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35104:
--
Description: 
There is already a Kafka pipeline data sink connector, and there should also be 
a Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 
 

  was:
There is already a [Kafka pipeline data sink 
connector|https://github.com/apache/flink-cdc/pull/2938], and there should 
also be a Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 


> Add kafka pipeline data source connector
> 
>
> Key: FLINK-35104
> URL: https://issues.apache.org/jira/browse/FLINK-35104
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> There is already a Kafka pipeline data sink connector, and there should also 
> be a Kafka pipeline data source connector.
> First, collect CDC data in real time and write it to Kafka; then write it to 
> multiple different data sources in real time.
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35104) Add kafka pipeline data source connector

2024-04-15 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35104:
--
Description: 
There is already a Kafka pipeline data sink connector, and there should also be 
a Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 
 
 

  was:
There is already a Kafka pipeline data sink connector, and there should also be 
a Kafka pipeline data source connector.

First, collect CDC data in real time and write it to Kafka; then write it to 
multiple different data sources in real time.

 

 
 
 


> Add kafka pipeline data source connector
> 
>
> Key: FLINK-35104
> URL: https://issues.apache.org/jira/browse/FLINK-35104
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> There is already a Kafka pipeline data sink connector, and there should also 
> be a Kafka pipeline data source connector.
> First, collect CDC data in real time and write it to Kafka; then write it to 
> multiple different data sources in real time.
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER

2024-04-15 Thread yisha zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837095#comment-17837095
 ] 

yisha zhou commented on FLINK-35037:


[~libenchao] The PR has been submitted. Please help review it when you have 
time. Thanks.

> Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
> ---
>
> Key: FLINK-35037
> URL: https://issues.apache.org/jira/browse/FLINK-35037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: yisha zhou
>Assignee: yisha zhou
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation, RelNodes with Window type only deliver the 
> upsert/unique keys of their inputs.
> However, windows with ROW_NUMBER can also produce upsert/unique keys.
> For example:
> {code:java}
> select id, name, score, age, class,
>     row_number() over(partition by class order by name) as rn,
>     rank() over (partition by class order by score) as rk,
>     dense_rank() over (partition by class order by score) as drk,
>     avg(score) over (partition by class order by score) as avg_score,
>     max(score) over (partition by age) as max_score,
>     count(id) over (partition by age) as cnt
> from student {code}
> (class, rn) is a valid upsert/unique key candidate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35105][autoscaler] Support setting default Autoscaler options at autoscaler standalone level [flink-kubernetes-operator]

2024-04-15 Thread via GitHub


1996fanrui opened a new pull request, #814:
URL: https://github.com/apache/flink-kubernetes-operator/pull/814

   ## What is the purpose of the change
   
   Currently, autoscaler standalone doesn't support setting [autoscaler 
options](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/#autoscaler-configuration).
 We must set them at the job level when we use autoscaler standalone. This is not 
convenient if a platform administrator wants to change the default value of some 
autoscaler options, such as:
   
   - job.autoscaler.enabled
   - job.autoscaler.metrics.window
   - etc.
   
   ## Brief change log
   
   - [FLINK-35105][autoscaler] Support setting default Autoscaler options at 
autoscaler standalone level
 - This change adds support for setting autoscaler options at the autoscaler 
standalone level, similar to the Flink Kubernetes operator.
 - The autoscaler options of autoscaler standalone serve as the base 
configuration, and the job-level configuration can override the default values 
provided by autoscaler standalone.
   
   ## Verifying this change
   
   - Improved the 
`FlinkClusterJobListFetcherTest#testFetchJobListAndConfigurationInfo` to check 
baseConf is empty and not empty.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35105) Support setting default Autoscaler options at autoscaler standalone level

2024-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35105:
---
Labels: pull-request-available  (was: )

> Support setting default Autoscaler options at autoscaler standalone level
> -
>
> Key: FLINK-35105
> URL: https://issues.apache.org/jira/browse/FLINK-35105
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> Currently, autoscaler standalone doesn't support setting [autoscaler 
> options|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/#autoscaler-configuration].
>  We must set them at the job level when we use autoscaler standalone. This is not 
> convenient if a platform administrator wants to change the default value of some 
> autoscaler options, such as:
>  * job.autoscaler.enabled
>  * job.autoscaler.metrics.window
>  * etc.
> This Jira adds support for setting autoscaler options at the autoscaler standalone 
> level, similar to the Flink Kubernetes operator.
> The autoscaler options of autoscaler standalone will serve as the base 
> configuration, and the job-level configuration can override the default values 
> provided by autoscaler standalone.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565310323


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for 
creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;

Review Comment:
   It's user-defined / SQL-defined and unique just like before.
   Even if the sync state and async state co-exist, they should have different 
`stateId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


reswqa commented on PR #24662:
URL: https://github.com/apache/flink/pull/24662#issuecomment-2055922902

   Wait a minute, you shouldn't only change the title of the GitHub pull request; 
the commit message should be aligned with it as well. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565315084


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type 
hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by 
Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates 
with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Since we may have different mechanisms for the Window Operator and state with 
namespaces, I'd prefer to consider it together with them later. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24644:
URL: https://github.com/apache/flink/pull/24644#discussion_r1565310108


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java:
##
@@ -349,6 +350,13 @@ private long uploadSnapshotFiles(
 ? CheckpointedStateScope.EXCLUSIVE
 : CheckpointedStateScope.SHARED;
 
+// Report the reuse of state handle to stream factory, which 
is essential for file
+// merging mechanism.
+checkpointStreamFactory.reusePreviousStateHandle(

Review Comment:
   If the subsequent upload fails, will the logical files be deleted with a delay?
   
   And I think the `reusePreviousStateHandle` is a little bit indirect. In 
fact, the `lastUsedCheckpoint` of both the newly generated and the previous 
logical files is updated here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35106) Kubernetes Operator ignores checkpoint type configuration

2024-04-15 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-35106:


 Summary: Kubernetes Operator ignores checkpoint type configuration
 Key: FLINK-35106
 URL: https://issues.apache.org/jira/browse/FLINK-35106
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0
Reporter: Mate Czagany


There is a configuration for the checkpoint type that should be applied when 
periodic checkpointing is enabled or a manual checkpoint is triggered.

However, the configuration value `kubernetes.operator.checkpoint.type` is 
completely ignored whenever a checkpoint is triggered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35064) Flink sql connector pulsar/hive com.fasterxml.jackson.annotation.JsonFormat$Value conflict

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837107#comment-17837107
 ] 

Yufan Sheng commented on FLINK-35064:
-

AFAICS, {{pulsar-client-all}} should be replaced with {{pulsar-client}} in 
flink-connector-pulsar. With {{pulsar-client}}, {{jackson-annotations}} and 
other common dependencies don't get shaded into the client, so we can use 
Flink's Jackson freely. I think this may be the best approach to solve this 
problem.

BTW, we initially adopted {{pulsar-client-all}} because we needed the Pulsar 
admin API for some heavy operations. Since we have dropped the usage of the 
Pulsar admin API, we should also avoid using such a heavy Pulsar client.

WDYT, [~elon]? And [~Tison], can you help me double-check this?
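
For what it's worth, a small diagnostic sketch (plain Jackson API, not from the connector code) that shows which jackson-annotations jar actually provides {{JsonFormat$Value.empty()}} at runtime, the method reported missing in the stack trace above:

{code:java}
import com.fasterxml.jackson.annotation.JsonFormat;

public final class JacksonConflictProbe {

    public static void main(String[] args) {
        // Which jar did JsonFormat$Value get loaded from? This helps spot an older
        // jackson-annotations on the classpath (e.g. pulled in by another connector).
        System.out.println(
                JsonFormat.Value.class.getProtectionDomain().getCodeSource().getLocation());

        // Throws NoSuchMethodError if an older jackson-annotations without Value.empty()
        // wins on the classpath, which is exactly the failure reported above.
        System.out.println(JsonFormat.Value.empty());
    }
}
{code}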

> Flink sql connector pulsar/hive 
> com.fasterxml.jackson.annotation.JsonFormat$Value conflict
> --
>
> Key: FLINK-35064
> URL: https://issues.apache.org/jira/browse/FLINK-35064
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Connectors / Pulsar
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
>  Labels: pull-request-available
>
> When I compile and package {{flink-sql-connector-pulsar}} & 
> {{{}flink-sql-connector-hive{}}}, and then put these two jar files into the 
> Flink lib directory, I execute the following SQL statement through 
> {{{}bin/sql-client.sh{}}}:
>  
> {code:java}
> // code placeholder
> CREATE TABLE
> pulsar_table (
> content string,
> proc_time AS PROCTIME ()
> )
> WITH
> (
> 'connector' = 'pulsar',
> 'topics' = 'persistent://xxx',
> 'service-url' = 'pulsar://xxx',
> 'source.subscription-name' = 'xxx',
> 'source.start.message-id' = 'latest',
> 'format' = 'csv',
> 'pulsar.client.authPluginClassName' = 
> 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
> 'pulsar.client.authParams' = 'token:xxx'
> );
>  
> select * from pulsar_table; {code}
> The task error exception stack is as follows:
>  
> {code:java}
> Caused by: java.lang.NoSuchMethodError: 
> com.fasterxml.jackson.annotation.JsonFormat$Value.empty()Lcom/fasterxml/jackson/annotation/JsonFormat$Value;
> at 
> org.apache.pulsar.shade.com.fasterxml.jackson.databind.cfg.MapperConfig.(MapperConfig.java:56)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:660)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:576)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.common.util.ObjectMapperFactory.createObjectMapperInstance(ObjectMapperFactory.java:151)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.common.util.ObjectMapperFactory.(ObjectMapperFactory.java:142)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.create(ConfigurationDataUtils.java:35)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.(ConfigurationDataUtils.java:43)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.pulsar.client.impl.ClientBuilderImpl.loadConf(ClientBuilderImpl.java:77)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:105)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.(PulsarSourceEnumerator.java:95)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.(PulsarSourceEnumerator.java:76)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.flink.connector.pulsar.source.PulsarSource.createEnumerator(PulsarSource.java:144)
>  ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
> {code}
>  
> The exception shows a conflict with 
> {{{}com.fasterxml.jackson.annotation.JsonFormat$Value{}}}. I investigated and 
> found that {{flink-sql-connector-pulsar}} and {{flink-sql-connector-hive}} 
> depend on different versions, leading to this conflict.
> {code:java}
> // flink-sql-connector-pulsar pom.xml
> 
>     <groupId>com.fasterxml.jackson</groupId>
>     <artifactId>jackson-bom</artifactId>
>     <type>pom</type>
>     <scope>import</scope>
>     <version>2.13.4.20221013</version>

Re: [PR] [FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27 [flink-connector-gcp-pubsub]

2024-04-15 Thread via GitHub


snuyanzin commented on PR #2:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/2#issuecomment-2056067269

   @dchristle thanks for the review.
   May I ask you to do another review iteration on this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Update dependencies [flink-connector-gcp-pubsub]

2024-04-15 Thread via GitHub


boring-cyborg[bot] commented on PR #24:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/24#issuecomment-2056080694

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565352484


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for 
creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this 
state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;

Review Comment:
   Thanks for the clarification 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35107) rename flink-connector-datagen-test module folder to flink-connector-datagen-tests

2024-04-15 Thread xleoken (Jira)
xleoken created FLINK-35107:
---

 Summary: rename flink-connector-datagen-test module folder to 
flink-connector-datagen-tests
 Key: FLINK-35107
 URL: https://issues.apache.org/jira/browse/FLINK-35107
 Project: Flink
  Issue Type: Improvement
Reporter: xleoken






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565354167


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type 
hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by 
Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates 
with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Makes sense, let's consider it later. Thanks for the clarification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35107] Rename flink-connector-datagen-test module folder to flink-connector-datagen-tests [flink]

2024-04-15 Thread via GitHub


xleoken opened a new pull request, #24665:
URL: https://github.com/apache/flink/pull/24665

   
   
   ## What is the purpose of the change
   
   Rename flink-connector-datagen-test module folder to 
flink-connector-datagen-tests.
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35107) rename flink-connector-datagen-test module folder to flink-connector-datagen-tests

2024-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35107:
---
Labels: pull-request-available  (was: )

> rename flink-connector-datagen-test module folder to 
> flink-connector-datagen-tests
> --
>
> Key: FLINK-35107
> URL: https://issues.apache.org/jira/browse/FLINK-35107
> Project: Flink
>  Issue Type: Improvement
>Reporter: xleoken
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


chenyu-opensource commented on PR #24662:
URL: https://github.com/apache/flink/pull/24662#issuecomment-2056154528

   > Wait a minute, you shouldn't only change the title of github pull request. 
Commit message should be aligned with this also.
   
   Sorry about that. May I close the current PR and request a new one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35107] Rename flink-connector-datagen-test module folder to flink-connector-datagen-tests [flink]

2024-04-15 Thread via GitHub


flinkbot commented on PR #24665:
URL: https://github.com/apache/flink/pull/24665#issuecomment-2056169433

   
   ## CI report:
   
   * fedce03e26e8f925d2fa5297f8fda9ca6c06d151 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35046][state] Introduce AsyncKeyedStateBackend supporting to create StateExecutor [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24663:
URL: https://github.com/apache/flink/pull/24663#discussion_r1565377484


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java:
##
@@ -104,6 +104,25 @@ default String getName() {
  <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
 KeyedStateBackendParameters<K> parameters) throws Exception;
 
+/**
+ * Creates a new {@link AsyncKeyedStateBackend} which supports to access 
keyed state
+ * asynchronously.
+ *
+ * Keyed State is state where each value is bound to a key.
+ *
+ * @param parameters The arguments bundle for creating {@link 
AsyncKeyedStateBackend}.
+ * @param <K> The type of the keys by which the state is organized.
+ * @return The Async Keyed State Backend for the given job, operator.
+ * @throws Exception This method may forward all exceptions that occur 
while instantiating the
+ * backend.
+ */
+@Experimental
+default <K> AsyncKeyedStateBackend<K> createAsyncKeyedStateBackend(
+KeyedStateBackendParameters<K> parameters) throws Exception {
+throw new UnsupportedOperationException(
+"Don't support createAsyncKeyedStateBackend by default");
+}
+

Review Comment:
   Makes sense. I just added `supportsAsyncKeyedStateBackend`.
   The naming is consistent with `supportsNoClaimRestoreMode` and 
`supportsSavepointFormat`.
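
   For readers following the thread, a sketch of what such a capability flag usually 
looks like on the interface (assumed shape, mirroring the existing 
`supportsNoClaimRestoreMode`/`supportsSavepointFormat` defaults; not necessarily the 
exact code added in this PR):

   ```java
   public interface StateBackend {
       // ... existing factory and capability methods ...

       // Backends that implement createAsyncKeyedStateBackend override this to
       // return true, so callers can check the capability before requesting the
       // async keyed state backend.
       default boolean supportsAsyncKeyedStateBackend() {
           return false;
       }
   }
   ```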



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


chenyu-opensource closed pull request #24662: [hotfix][runtime] Fix the missing 
method parameter annotation problem
URL: https://github.com/apache/flink/pull/24662


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35108) Deployment recovery is triggered on terminal jobs after jm shutdown ttl

2024-04-15 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35108:
--

 Summary: Deployment recovery is triggered on terminal jobs after 
jm shutdown ttl
 Key: FLINK-35108
 URL: https://issues.apache.org/jira/browse/FLINK-35108
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0, kubernetes-operator-1.7.0
Reporter: Gyula Fora
Assignee: Gyula Fora


The deployment recovery mechanism is incorrectly triggered for terminal jobs 
once the JM deployment is deleted after the TTL period. 

This causes jobs to be resubmitted. This affects only batch jobs.

The workaround for batch jobs is to set:
kubernetes.operator.jm-deployment-recovery.enabled: false



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35103) [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic Termination Log Integration

2024-04-15 Thread SwathiChandrashekar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SwathiChandrashekar updated FLINK-35103:

Labels: pull-request-available  (was: )

> [Plugin] Enhancing Flink Failure Management in Kubernetes with Dynamic 
> Termination Log Integration
> --
>
> Key: FLINK-35103
> URL: https://issues.apache.org/jira/browse/FLINK-35103
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: SwathiChandrashekar
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: Status-pod.png, screenshot-1.png
>
>
> Currently, whenever we have flink failures, we need to manually do the 
> triaging by looking into the flink logs even for the initial analysis. It 
> would have been better, if the user/admin directly gets the initial failure 
> information even before looking into the logs.
> To address this, we've developed a comprehensive solution via a plugin aimed 
> at helping fetch the Flink failures, ensuring critical data is preserved for 
> subsequent analysis and action.
>  
> In Kubernetes environments, troubleshooting pod failures can be challenging 
> without checking the pod/flink logs. Fortunately, Kubernetes offers a robust 
> mechanism to enhance debugging capabilities by leveraging the 
> /dev/termination-log file.
> [https://kubernetes.io/docs/tasks/debug/debug-application/determine-reason-pod-failure/]
> By writing failure information to this log, Kubernetes automatically 
> incorporates it into the container status, providing administrators and 
> developers with valuable insights into the root cause of failures.
> Our solution capitalizes on this Kubernetes feature to seamlessly integrate 
> Flink failure reporting within the container ecosystem. Whenever a Flink 
> encounters an issue, our plugin dynamically captures and logs the pertinent 
> failure information into the /dev/termination-log file. This ensures that 
> Kubernetes recognizes and propagates the failure status throughout the 
> container ecosystem, enabling efficient monitoring and response mechanisms.
> By leveraging Kubernetes' native functionality in this manner, our plugin 
> ensures that Flink failure incidents are promptly identified and reflected in 
> the pod status. This technical integration streamlines the debugging process, 
> empowering operators to swiftly diagnose and address issues, thereby 
> minimizing downtime and maximizing system reliability.
>  
> In-order to make this plugin generic, by default it doesn't do any action.  
> We can configure this by using
> *external.log.factory.class : 
> org.apache.flink.externalresource.log.K8SSupportTerminationLog*
> in our flink-conf file.
> This will be present in the plugins directory
> Sample output of the flink pod container status when there is a flink failure.
>  !screenshot-1.png! 
> Here, the user can clearly understand that there was an auth 
> issue and resolve it instead of checking the complete underlying logs.
>  
>  
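
For illustration only, a minimal Java sketch (hypothetical class and method names, not the actual plugin code) of writing a failure summary to the Kubernetes termination log so it surfaces in the container status:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class TerminationLogWriter {
    private static final String TERMINATION_LOG_PATH = "/dev/termination-log";

    public static void writeFailure(Throwable failure) {
        String summary = failure.getClass().getSimpleName() + ": " + failure.getMessage();
        try {
            Files.write(
                    Paths.get(TERMINATION_LOG_PATH),
                    summary.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            // Best effort only: reporting must never mask the original failure.
        }
    }
}
{code}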



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


chenyu-opensource opened a new pull request, #24666:
URL: https://github.com/apache/flink/pull/24666

   
   
   ## What is the purpose of the change
   
   This pull request fixes the problem of a missing method parameter annotation.
   
   
   ## Brief change log
   
   Add the parameter annotation of 'taskManagerResourceId' for the method 
'ResourceManagerGateway.sendSlotReport'.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):(no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers:(no)
 - The runtime per-record code paths (performance sensitive):(no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature?(no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


chenyu-opensource commented on PR #24662:
URL: https://github.com/apache/flink/pull/24662#issuecomment-2056223652

   > Wait a minute, you shouldn't only change the title of github pull request. 
Commit message should be aligned with this also.
   
   I have closed this PR and created a new one: 
https://github.com/apache/flink/pull/24666


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager [flink]

2024-04-15 Thread via GitHub


ljz2051 commented on code in PR #24644:
URL: https://github.com/apache/flink/pull/24644#discussion_r1562362549


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -70,6 +72,9 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
 @GuardedBy("lock")
 protected TreeMap> uploadedStates = new TreeMap<>();
 
+/** The map that holds all the known live logical files. */
+protected final Map knownLogicalFiles = new 
ConcurrentHashMap<>();

Review Comment:
   It could be a private variable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]

2024-04-15 Thread via GitHub


snuyanzin commented on PR #38:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-2056231334

   This is on my to-do list for today/tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


reswqa commented on PR #24666:
URL: https://github.com/apache/flink/pull/24666#issuecomment-2056235051

   You could just force-push it; GitHub can take care of this. But since you've 
opened a new PR, let's continue the discussion there. 😉 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


reswqa commented on PR #24666:
URL: https://github.com/apache/flink/pull/24666#issuecomment-2056237176

   I will merge this after CI passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


flinkbot commented on PR #24666:
URL: https://github.com/apache/flink/pull/24666#issuecomment-2056241215

   
   ## CI report:
   
   * b0404b8f421f5920d958ff4ad3fdad170d4f56e4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-34694:

Attachment: image-2024-04-15-15-45-51-027.png

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png
>
>
> Currently, in StreamingJoinOperator (non-window), in the case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the Nexmark q20 test (changed to OUTER JOIN), it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34694) Delete num of associations for streaming outer join

2024-04-15 Thread Roman Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Boyko updated FLINK-34694:

Attachment: image-2024-04-15-15-46-17-671.png

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png
>
>
> Currently, in StreamingJoinOperator (non-window), in the case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store an additional field - the number of 
> associations for every record. This leads to storing additional Tuple2 and 
> Integer data for every record in the outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the Nexmark q20 test (changed to OUTER JOIN), it could increase 
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-15 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-35022:
-

Assignee: Ahmed Hamdy

> Add TypeInformed Element Converter for DynamoDbSink
> ---
>
> Key: FLINK-35022
> URL: https://issues.apache.org/jira/browse/FLINK-35022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-4.3.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where an item is represented as 
> {{Map}}.
> {{AttributeValue}} is the wrapper for the DynamoDb-comprehensible object, in a 
> format with type identification properties, as in
> {"M": {"Name" : {"S": "Joe" }, "Age" : {"N": "35" }}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverter are just boilerplate. 
> For example 
> {code:title="Simple POJO Element Conversion"}
>  public class Order {
> String id;
> int quantity;
> double total;
> }
> {code}
> The implementation of the converter must be 
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter implements 
> ElementConverter {
> @Override
> public DynamoDbWriteRequest apply(Order order, SinkWriter.Context 
> context) {
> Map itemMap = new HashMap<>();
> itemMap.put("id", AttributeValue.builder().s(order.id).build());
> itemMap.put("quantity", 
> AttributeValue.builder().n(String.valueOf(order.quantity)).build());
> itemMap.put("total", 
> AttributeValue.builder().n(String.valueOf(order.total)).build());
> return DynamoDbWriteRequest.builder()
> .setType(DynamoDbWriteRequestType.PUT)
> .setItem(itemMap)
> .build();
> }
> @Override
> public void open(Sink.InitContext context) {
> 
> }
> }
> {code}
> While this might not be too much work, it is a fairly common case 
> in Flink, and this implementation requires fair knowledge of the DDB model 
> for new users.
> h2. Proposal 
> Introduce {{ DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementconverter"} 
> public class DynamoDbTypeInformedElementConverter implements 
> ElementConverter {
> DynamoDbTypeInformedElementConverter(CompositeType typeInfo);
> public DynamoDbWriteRequest convertElement(input) {
> switch this.typeInfo{
> case: BasicTypeInfo.STRING_TYPE_INFO: return input -> 
> AttributeValue.fromS(o.toString())
> case: BasicTypeInfo.SHORT_TYPE_INFO: 
> case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> 
> AttributeValue.fromN(o.toString())
>case: TupleTypeInfo: input -> AttributeValue.fromL(converTuple(input))
>   .
> }
> }
> }
> // User Code
> public static void main(String []args) {
>   DynamoDbTypeInformedElementConverter elementConverter = new 
> DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
> DdbSink.setElementConverter(elementConverter); 
> }
> {code}
> We will start by supporting all Pojo/ basic/ Tuple/ Array typeInfo which 
> should be enough to cover all DDB supported types 
> (s,n,bool,b,ss,ns,bs,bools,m,l)
> 1- 
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


chenyu-opensource commented on PR #24666:
URL: https://github.com/apache/flink/pull/24666#issuecomment-2056282707

   > I will merge this after CI passed.
   
   This is my first time contributing to this project. Thank you so much for 
your patience. I will continue to pay attention to the community and actively 
contribute.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][runtime] Fix the missing method parameter annotation problem [flink]

2024-04-15 Thread via GitHub


reswqa commented on PR #24666:
URL: https://github.com/apache/flink/pull/24666#issuecomment-2056324276

   Welcome aboard 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-15 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837132#comment-17837132
 ] 

Danny Cranmer commented on FLINK-35022:
---

Hey [~chalixar], thanks for the contribution. Can you elaborate on when a user 
would use this instead of the 
[DynamoDbBeanElementConverter|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java]?
 You can simply annotate your class with `@DynamoDbBean` and then use the 
converter like so:

{code}
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
new DynamoDbBeanElementConverter<>(Order.class);
{code}

Are you targeting the case when the user does not want/cannot change the model? 
I would rather leave the type transform to the AWS SDK if possible. However, I 
like the idea here. Please elaborate on the motivation
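
For reference, a minimal sketch of a model usable with that converter (hypothetical POJO; assumes the AWS SDK v2 enhanced-client annotations are on the classpath):

{code}
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;

// Getters and setters are required so the SDK's bean introspection can map
// fields to DynamoDB attributes.
@DynamoDbBean
public class Order {
    private String id;
    private int quantity;
    private double total;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public double getTotal() { return total; }
    public void setTotal(double total) { this.total = total; }
}
{code}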

> Add TypeInformed Element Converter for DynamoDbSink
> ---
>
> Key: FLINK-35022
> URL: https://issues.apache.org/jira/browse/FLINK-35022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-4.3.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where an item is represented as 
> {{Map}}.
> {{AttributeValue}} is the wrapper for the DynamoDb-comprehensible object, in a 
> format with type identification properties, as in
> {"M": {"Name" : {"S": "Joe" }, "Age" : {"N": "35" }}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverter are just boilerplate. 
> For example 
> {code:title="Simple POJO Element Conversion"}
>  public class Order {
> String id;
> int quantity;
> double total;
> }
> {code}
> The implementation of the converter must be 
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter implements 
> ElementConverter {
> @Override
> public DynamoDbWriteRequest apply(Order order, SinkWriter.Context 
> context) {
> Map itemMap = new HashMap<>();
> itemMap.put("id", AttributeValue.builder().s(order.id).build());
> itemMap.put("quantity", 
> AttributeValue.builder().n(String.valueOf(order.quantity)).build());
> itemMap.put("total", 
> AttributeValue.builder().n(String.valueOf(order.total)).build());
> return DynamoDbWriteRequest.builder()
> .setType(DynamoDbWriteRequestType.PUT)
> .setItem(itemMap)
> .build();
> }
> @Override
> public void open(Sink.InitContext context) {
> 
> }
> }
> {code}
> While this might not be too much work, it is a fairly common case 
> in Flink, and this implementation requires fair knowledge of the DDB model 
> for new users.
> h2. Proposal 
> Introduce {{ DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementconverter"} 
> public class DynamoDbTypeInformedElementConverter implements 
> ElementConverter {
> DynamoDbTypeInformedElementConverter(CompositeType typeInfo);
> public DynamoDbWriteRequest convertElement(input) {
> switch this.typeInfo{
> case: BasicTypeInfo.STRING_TYPE_INFO: return input -> 
> AttributeValue.fromS(o.toString())
> case: BasicTypeInfo.SHORT_TYPE_INFO: 
> case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> 
> AttributeValue.fromN(o.toString())
>case: TupleTypeInfo: input -> AttributeValue.fromL(converTuple(input))
>   .
> }
> }
> }
> // User Code
> public static void main(String []args) {
>   DynamoDbTypeInformedElementConverter elementConverter = new 
> DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
> DdbSink.setElementConverter(elementConverter); 
> }
> {code}
> We will start by supporting all Pojo/ basic/ Tuple/ Array typeInfo which 
> should be enough to cover all DDB supported types 
> (s,n,bool,b,ss,ns,bs,bools,m,l)
> 1- 
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-15 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837132#comment-17837132
 ] 

Danny Cranmer edited comment on FLINK-35022 at 4/15/24 9:19 AM:


Hey [~chalixar], thanks for the contribution. Can you elaborate on when a user 
would use this instead of the 
[DynamoDbBeanElementConverter|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java]?
 You can simply annotate your class with {{@DynamoDbBean}} and then use the 
converter like so (example 
[Order|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java]):

{code}
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
new DynamoDbBeanElementConverter<>(Order.class);
{code}

Are you targeting the case when the user does not want/cannot change the model? 
I would rather leave the type transform to the AWS SDK if possible. However, I 
like the idea here. Please elaborate on the motivation


was (Author: dannycranmer):
Hey [~chalixar], thanks for the contribution. Can you elaborate on when a user 
would use this instead of the 
[DynamoDbBeanElementConverter|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java]?
 You can simply annotate your class with `@DynamoDbBean` and then use the 
converter like so:

{code}
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
new DynamoDbBeanElementConverter<>(Order.class);
{code}

Are you targeting the case when the user does not want/cannot change the model? 
I would rather leave the type transform to the AWS SDK if possible. However, I 
like the idea here. Please elaborate on the motivation

> Add TypeInformed Element Converter for DynamoDbSink
> ---
>
> Key: FLINK-35022
> URL: https://issues.apache.org/jira/browse/FLINK-35022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-4.3.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where an item is represented as 
> {{Map}}.
> {{AttributeValue}} is the wrapper for the DynamoDb-comprehensible object, in a 
> format with type identification properties, as in
> {"M": {"Name" : {"S": "Joe" }, "Age" : {"N": "35" }}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverter are just boilerplate. 
> For example 
> {code:title="Simple POJO Element Conversion"}
>  public class Order {
> String id;
> int quantity;
> double total;
> }
> {code}
> The implementation of the converter must be 
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter implements 
> ElementConverter {
> @Override
> public DynamoDbWriteRequest apply(Order order, SinkWriter.Context 
> context) {
> Map itemMap = new HashMap<>();
> itemMap.put("id", AttributeValue.builder().s(order.id).build());
> itemMap.put("quantity", 
> AttributeValue.builder().n(String.valueOf(order.quantity)).build());
> itemMap.put("total", 
> AttributeValue.builder().n(String.valueOf(order.total)).build());
> return DynamoDbWriteRequest.builder()
> .setType(DynamoDbWriteRequestType.PUT)
> .setItem(itemMap)
> .build();
> }
> @Override
> public void open(Sink.InitContext context) {
> 
> }
> }
> {code}
> While this might not be too much work, it is a fairly common case 
> in Flink, and this implementation requires fair knowledge of the DDB model 
> for new users.
> h2. Proposal 
> Introduce {{ DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementconverter"} 
> public class DynamoDbTypeInformedElementConverter implements 
> ElementConverter {
> DynamoDbTypeInformedElementConverter(CompositeType typeInfo);
> public DynamoDbWriteRequest convertElement(input) {
> switch this.typeInfo{
> case: BasicTypeInfo.STRING_TYPE_INFO: return input -> 
> AttributeValue.fromS(o.toString())
> case: BasicTypeInfo.SHORT_TYPE_INFO: 
> case: BasicTypeInfo.INTEGER_TYPE

[jira] [Updated] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-04-15 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35109:
---
Summary: Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and 
drop support for 1.17 and 1.18  (was: Drop support for Flink 1.17 and 1.18 in 
Flink Kafka connector)

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35022] Add TypeInformed DDB Element Converter [flink-connector-aws]

2024-04-15 Thread via GitHub


dannycranmer commented on PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#issuecomment-2056352344

   Please change commit message to include the component: 
`[FLINK-35022][Connectors/DynamoDB]`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35109) Drop support for Flink 1.17 and 1.18 in Flink Kafka connector

2024-04-15 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-35109:
--

 Summary: Drop support for Flink 1.17 and 1.18 in Flink Kafka 
connector
 Key: FLINK-35109
 URL: https://issues.apache.org/jira/browse/FLINK-35109
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Reporter: Martijn Visser
 Fix For: kafka-4.0.0


The Flink Kafka connector currently can't compile against Flink 1.20-SNAPSHOT. 
An example failure can be found at 
https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169

The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, see 
FLINK-32455. See also specifically the comment in 
https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785

Next to that, there's also FLINK-25509 which can only be supported with Flink 
1.19 and higher. 

So we should:
* Drop support for 1.17 and 1.18
* Refactor the Flink Kafka connector to use the new 
{code:java}MigrationTest{code}

We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
this change will be a new v4.0 version with support for Flink 1.19 and the 
upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34903][MySQL][Feature] Add mysql-pipeline-connector with tables.exclude option to exclude unnecessary tables [flink-cdc]

2024-04-15 Thread via GitHub


PatrickRen commented on code in PR #3186:
URL: https://github.com/apache/flink-cdc/pull/3186#discussion_r1565458476


##
docs/content.zh/docs/connectors/mysql.md:
##
@@ -107,6 +107,14 @@ pipeline:
   需要注意的是,点号(.)被视为数据库和表名的分隔符。 
如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
   例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
 
+
+  tables.exclude
+  optional
+  (none)
+  String
+  需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。

Review Comment:
   nit:
   ```suggestion
 需要排除的 MySQL 
数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。
   ```



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##
@@ -231,4 +231,16 @@ public class MySqlDataSourceOptions {
 .defaultValue(true)
 .withDescription(
 "Whether send schema change events, by default is 
true. If set to false, the schema changes will not be sent.");
+
+@Experimental
+public static final ConfigOption<String> TABLE_EXCLUDE_LIST =

Review Comment:
   ```suggestion
   public static final ConfigOption<String> TABLES_EXCLUDE =
   ```
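
   For context, a sketch of how such an option is typically declared with Flink's 
`ConfigOptions` builder (the description text is assumed; the field name follows the 
suggestion above):

   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   // Hypothetical holder class; in the PR this field would live in MySqlDataSourceOptions.
   public class MySqlDataSourceOptionsSketch {
       public static final ConfigOption<String> TABLES_EXCLUDE =
               ConfigOptions.key("tables.exclude")
                       .stringType()
                       .noDefaultValue()
                       .withDescription(
                               "Table names of the MySQL database to exclude, applied after"
                                       + " the 'tables' option. Regular expressions are supported.");
   }
   ```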



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java:
##
@@ -79,6 +80,27 @@ public void testNoMatchedTable() {
 .hasMessageContaining("Cannot find any table by the option 
'tables' = " + tables);
 }
 
+@Test

Review Comment:
   We still need a test case validating that excluding only some of the tables (not 
all) works as expected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32732][Connectors/Kafka] auto offset reset should be exposed t… [flink-connector-kafka]

2024-04-15 Thread via GitHub


MartijnVisser closed pull request #43: [FLINK-32732][Connectors/Kafka] auto 
offset reset should be exposed t…
URL: https://github.com/apache/flink-connector-kafka/pull/43


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32732][Connectors/Kafka] auto offset reset should be exposed t… [flink-connector-kafka]

2024-04-15 Thread via GitHub


MartijnVisser commented on PR #43:
URL: 
https://github.com/apache/flink-connector-kafka/pull/43#issuecomment-2056370919

   No activity on the PR, closing as invalid


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32732) auto offset reset should be exposed to user

2024-04-15 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-32732.
--
Resolution: Invalid

> auto offset reset should be exposed to user
> ---
>
> Key: FLINK-32732
> URL: https://issues.apache.org/jira/browse/FLINK-32732
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> {code:java}
> // code placeholder
> maybeOverride(
>         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>         startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
>         true);
> {code}
> Currently, Flink overrides auto.offset.reset based on the scan.startup.mode config, 
> and the user's explicit config does not take effect. I think maybe we should 
> expose this to the user?
>  
> I think after consuming Kafka records from earliest to latest, the 
> scan.startup.mode should no longer influence the Kafka scan behavior. So I 
> suggest changing the override to false.
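
For context, a minimal sketch of the user-side configuration being discussed (broker, topic, and group names are placeholders); the explicitly supplied auto.offset.reset property is what the builder currently overrides from the configured offsets initializer:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class AutoOffsetResetExample {
    public static void main(String[] args) {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")
                        .setTopics("input-topic")
                        .setGroupId("example-group")
                        // Derived from scan.startup.mode in the Table API case.
                        .setStartingOffsets(
                                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                        // Explicit user property that is currently overridden.
                        .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
        // 'source' would then be passed to StreamExecutionEnvironment#fromSource(...).
    }
}
{code}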



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-15 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-34470:
---
Affects Version/s: kafka-3.1.0
   (was: 1.17.1)

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi, we have faced an issue with transactional messages and the Table API Kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL 
> request hangs and then times out. We can always reproduce this unexpected 
> behavior by following the steps below.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after producing a certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple 
> query, such as getting the count of the produced messages
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction, and these control records are not returned by 
> {*}consumer.poll(){*} (they are filtered internally). In the 
> *KafkaPartitionSplitReader* code, a split is finished only when the 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non-transactional messages or in a streaming environment, but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
>     recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
>     finishSplitAtRecord(
>             tp,
>             stoppingOffset,
>             lastRecord.offset(),
>             finishedPartitions,
>             recordsBySplits);
> }
> {code}
> Replacing the if condition with *consumer.position(tp) >= stoppingOffset* 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* returns the next record's offset if it exists, and the last 
> record's offset if the next record doesn't exist. 
> This way, KafkaPartitionSplitReader is able to finish the split even when 
> the stopping offset is configured to a control record's offset. 
> I would be happy to implement this fix if we can reach an agreement. 
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35022] Add TypeInformed DDB Element Converter [flink-connector-aws]

2024-04-15 Thread via GitHub


dannycranmer commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1565419487


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+private final CompositeType typeInfo;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType 
typeInfo) {
+this.typeInfo = typeInfo;
+}
+
+@Override
+public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context 
context) {
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(convertElementUsingTypeInfo(input, typeInfo))
+.build();
+} catch (IllegalArgumentException e) {
+throw new FlinkRuntimeException("Couldn't convert Element to 
AttributeVal", e);
+}
+}
+
+private  Map convertElementUsingTypeInfo(
+attT t, CompositeType typeInfo) {
+Map map = new HashMap<>();
+for (String fieldKey : typeInfo.getFieldNames()) {
+TypeInformation fieldType = typeInfo.getTypeAt(fieldKey);
+try {
+Field field = t.getClass().getDeclaredField(fieldKey);
+field.setAccessible(true);
+Object fieldVal = field.get(t);
+checkTypeCompatibility(fieldVal, fieldType);
+attT fieldValCaster = (attT) fieldVal;
+map.put(fieldKey, convertValue(fieldValCaster, fieldType));
+} catch (NoSuchFieldException | IllegalAccessException e) {
+throw new IllegalArgumentException(
+String.format(
+"Failed to extract field %s declared in 
TypeInfo "
++ "from Object %s",
+fieldKey, t),
+e);
+}
+}
+
+return map;
+}
+
+private  AttributeValue convertValue(
+attrT attribute, TypeInformation objectTypeInformation) {
+if (attribute == null) {
+return AttributeValue.builder().nul(true).build();

Review Comment:
   Not sure this is the correct thing to do here? I 

Re: [PR] [FLINK-35022] Add TypeInformed DDB Element Converter [flink-connector-aws]

2024-04-15 Thread via GitHub


dannycranmer commented on PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#issuecomment-2056383239

   I have left a comment regarding the need for this on the Jira: 
https://issues.apache.org/jira/browse/FLINK-35022


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


fredia opened a new pull request, #24667:
URL: https://github.com/apache/flink/pull/24667

   
   
   ## What is the purpose of the change
   
   As part of the async execution model of disaggregated state management, this 
PR introduces async execution configurations.
   
   
   ## Brief change log
   
   - Add async execution configurations in `ExecutionOptions`
   - Add related getter/setter in `ExecutionConfig`
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - StreamExecutionEnvironmentTest#testAsyncExecutionConfiguration
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs/ JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35026) Introduce async execution configurations

2024-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35026:
---
Labels: pull-request-available  (was: )

> Introduce async execution configurations
> 
>
> Key: FLINK-35026
> URL: https://issues.apache.org/jira/browse/FLINK-35026
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration, Runtime / Task
>Reporter: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35075][table] Migrate TwoStageOptimizedAggregateRule to java [flink]

2024-04-15 Thread via GitHub


liuyongvs commented on PR #24650:
URL: https://github.com/apache/flink/pull/24650#issuecomment-2056391409

   hi @snuyanzin will you help review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30388) Add support for ElementConverted open() method for KDS/KDF/DDB

2024-04-15 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-30388:
-

Assignee: Ahmed Hamdy

> Add support for ElementConverted open() method for KDS/KDF/DDB
> --
>
> Key: FLINK-30388
> URL: https://issues.apache.org/jira/browse/FLINK-30388
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB, Connectors / Firehose, Connectors 
> / Kinesis
>Reporter: Danny Cranmer
>Assignee: Ahmed Hamdy
>Priority: Major
>
> FLINK-29938 added support for an {{open()}} method in Async Sink 
> ElementConverter. Once flink-connector-aws upgrades to Flink 1.17 we should 
> implement this method. It was originally implemented 
> [here|https://github.com/apache/flink/pull/21265] but was yanked during the 
> [sync|FLINK-30384]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-15 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837140#comment-17837140
 ] 

Ahmed Hamdy commented on FLINK-35022:
-

Hi [~dannycranmer] 
Thanks for the reply. I agree that most POJOs could simply be converted using the 
bean converter. 
To be honest, the main motivation came from PyFlink, where type-informed data 
types are commonly used. 
The BeanConverter couldn't be used in PyFlink after adding DDB PyFlink support in 
https://issues.apache.org/jira/browse/FLINK-32007 (which is actually a blocker for 
this task), since the defined POJOs are Python classes, not Java ones. This is 
resolved in DataStream functions by using type-informed functions, as in the [map 
function 
here.|https://github.com/apache/flink/blob/f74dc57561a058696bd2bd42593f862a9b490474/flink-python/pyflink/datastream/data_stream.py#L273]

> Add TypeInformed Element Converter for DynamoDbSink
> ---
>
> Key: FLINK-35022
> URL: https://issues.apache.org/jira/browse/FLINK-35022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-4.3.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where an item is represented 
> as {{Map<String, AttributeValue>}}.
> {{AttributeValue}} is the wrapper for the object DynamoDb understands, in a 
> format that carries type identification properties, as in
> {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverter are just boilerplate. 
> For example 
> {code:title="Simple POJO Element Conversion"}
>  public class Order {
> String id;
> int quantity;
> double total;
> }
> {code}
> The implementation of the converter must be 
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter implements 
> ElementConverter<Order, DynamoDbWriteRequest> {
> @Override
> public DynamoDbWriteRequest apply(Order order, SinkWriter.Context 
> context) {
> Map<String, AttributeValue> itemMap = new HashMap<>();
> itemMap.put("id", AttributeValue.builder().s(order.id).build());
> itemMap.put("quantity", 
> AttributeValue.builder().n(String.valueOf(order.quantity)).build());
> itemMap.put("total", 
> AttributeValue.builder().n(String.valueOf(order.total)).build());
> return DynamoDbWriteRequest.builder()
> .setType(DynamoDbWriteRequestType.PUT)
> .setItem(itemMap)
> .build();
> }
> @Override
> public void open(Sink.InitContext context) {
> 
> }
> }
> {code}
> While this might not be too much work, it is a fairly common case in Flink, 
> and the implementation requires a fair knowledge of the DDB model from new 
> users.
> h2. Proposal 
> Introduce {{ DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementconverter"} 
> public class DynamoDbTypeInformedElementConverter<T> implements 
> ElementConverter<T, DynamoDbWriteRequest> {
> DynamoDbTypeInformedElementConverter(CompositeType<T> typeInfo);
> public DynamoDbWriteRequest convertElement(T input) {
> switch (this.typeInfo) {
> case BasicTypeInfo.STRING_TYPE_INFO: return input -> 
> AttributeValue.fromS(input.toString())
> case BasicTypeInfo.SHORT_TYPE_INFO: 
> case BasicTypeInfo.INTEGER_TYPE_INFO: input -> 
> AttributeValue.fromN(input.toString())
> case TupleTypeInfo: input -> AttributeValue.fromL(convertTuple(input))
>   ...
> }
> }
> }
> // User Code
> public static void main(String []args) {
>   DynamoDbTypeInformedElementConverter elementConverter = new 
> DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
> DdbSink.setElementConverter(elementConverter); 
> }
> {code}
> We will start by supporting all Pojo/ basic/ Tuple/ Array typeInfo which 
> should be enough to cover all DDB supported types 
> (s,n,bool,b,ss,ns,bs,bools,m,l)
> 1- 
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30388) Add support for ElementConverted open() method for KDS/KDF/DDB

2024-04-15 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-30388:
--
Fix Version/s: aws-connector-4.3.0

> Add support for ElementConverted open() method for KDS/KDF/DDB
> --
>
> Key: FLINK-30388
> URL: https://issues.apache.org/jira/browse/FLINK-30388
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB, Connectors / Firehose, Connectors 
> / Kinesis
>Reporter: Danny Cranmer
>Assignee: Ahmed Hamdy
>Priority: Major
> Fix For: aws-connector-4.3.0
>
>
> FLINK-29938 added support for an {{open()}} method in Async Sink 
> ElementConverter. Once flink-connector-aws upgrades to Flink 1.17 we should 
> implement this method. It was originally implemented 
> [here|https://github.com/apache/flink/pull/21265] but was yanked during the 
> [sync|FLINK-30384]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35062][table] Migrate RewriteMultiJoinConditionRule to java [flink]

2024-04-15 Thread via GitHub


liuyongvs commented on PR #24648:
URL: https://github.com/apache/flink/pull/24648#issuecomment-2056392924

   hi @snuyanzin will you help review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


flinkbot commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2056402383

   
   ## CI report:
   
   * aea56668e33e7062e6b18dac4086a2f05dc36fc1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30388) Add support for ElementConverted open() method for KDS/KDF/DDB

2024-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30388:
---
Labels: pull-request-available  (was: )

> Add support for ElementConverted open() method for KDS/KDF/DDB
> --
>
> Key: FLINK-30388
> URL: https://issues.apache.org/jira/browse/FLINK-30388
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB, Connectors / Firehose, Connectors 
> / Kinesis
>Reporter: Danny Cranmer
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> FLINK-29938 added support for an {{open()}} method in Async Sink 
> ElementConverter. Once flink-connector-aws upgrades to Flink 1.17 we should 
> implement this method. It was originally implemented 
> [here|https://github.com/apache/flink/pull/21265] but was yanked during the 
> [sync|FLINK-30384]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35075) Migrate TwoStageOptimizedAggregateRule

2024-04-15 Thread Jacky Lau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837146#comment-17837146
 ] 

Jacky Lau commented on FLINK-35075:
---

hi [~snuyanzin] will you help review this?

> Migrate TwoStageOptimizedAggregateRule
> --
>
> Key: FLINK-35075
> URL: https://issues.apache.org/jira/browse/FLINK-35075
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837148#comment-17837148
 ] 

Yufan Sheng commented on FLINK-34436:
-

If you want to consume all the messages with different types from the same 
topic, the GenericRecordDeserializationSchema may be the best approach to use.

https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchema.java

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file, here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * Exception thrown when schema's fields are added/removed
> * Avro's enum default value is ignored; instead, the last known value is applied
> I believe I observed the same behaviour in Pulsar itself, but for now we are 
> focusing on the connector, hence I was able to document the problems when 
> using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34091) Flink pulsar connect add automatic failover capability

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837152#comment-17837152
 ] 

Yufan Sheng commented on FLINK-34091:
-

Since this is a requirement on the Pulsar side, I don't think the connector should 
take on such a large requirement. The connector only wraps the Pulsar client 
and exposes all of its available configuration options.

BTW, a PR is welcome for adding such a big feature. The Pulsar sink already 
supports geo-replication: 
https://github.com/apache/flink-connector-pulsar/blob/9f4b902c2a478d0105eec1e32bac3ea40f318d00/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/message/PulsarMessageBuilder.java#L116
 Maybe you can check it and add the support on the source side.

> Flink pulsar connect add automatic failover capability
> --
>
> Key: FLINK-34091
> URL: https://issues.apache.org/jira/browse/FLINK-34091
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Pulsar
>Affects Versions: 1.18.0
>Reporter: yangyijun
>Priority: Major
>
> For some business scenarios with strict SLA requirements, we need to support 
> automated cross-cluster disaster recovery. However, the current Flink Pulsar 
> connector does not have this disaster-tolerance capability.
> We hope to support the following scenarios:
> 1. master-slave hot standby;
> 2. active-active (supporting double writes, or two clusters each taking half of 
> the traffic);
> 3. multi-site disaster recovery;



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35022] Add TypeInformed DDB Element Converter [flink-connector-aws]

2024-04-15 Thread via GitHub


vahmed-hamdy commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1565513965


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter<inputT>
+implements ElementConverter<inputT, DynamoDbWriteRequest> {
+private final CompositeType<inputT> typeInfo;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType<inputT> 
typeInfo) {
+this.typeInfo = typeInfo;
+}
+
+@Override
+public DynamoDbWriteRequest apply(inputT input, SinkWriter.Context 
context) {
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(convertElementUsingTypeInfo(input, typeInfo))
+.build();
+} catch (IllegalArgumentException e) {
+throw new FlinkRuntimeException("Couldn't convert Element to 
AttributeVal", e);
+}
+}
+
+private <attT> Map<String, AttributeValue> convertElementUsingTypeInfo(
+attT t, CompositeType<attT> typeInfo) {
+Map<String, AttributeValue> map = new HashMap<>();
+for (String fieldKey : typeInfo.getFieldNames()) {
+TypeInformation fieldType = typeInfo.getTypeAt(fieldKey);
+try {
+Field field = t.getClass().getDeclaredField(fieldKey);
+field.setAccessible(true);
+Object fieldVal = field.get(t);
+checkTypeCompatibility(fieldVal, fieldType);
+attT fieldValCaster = (attT) fieldVal;
+map.put(fieldKey, convertValue(fieldValCaster, fieldType));
+} catch (NoSuchFieldException | IllegalAccessException e) {
+throw new IllegalArgumentException(
+String.format(
+"Failed to extract field %s declared in 
TypeInfo "
++ "from Object %s",
+fieldKey, t),
+e);
+}
+}
+
+return map;
+}
+
+private <attrT> AttributeValue convertValue(
+attrT attribute, TypeInformation<attrT> objectTypeInformation) {
+if (attribute == null) {
+return AttributeValue.builder().nul(true).build();

Review Comment:
   @dannycranmer , yeah just checked 
`testConvertOr

[jira] [Commented] (FLINK-33729) Events are getting lost when an exception occurs within a processing function

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837157#comment-17837157
 ] 

Yufan Sheng commented on FLINK-33729:
-

[~Weijie Guo] Yep, this is a valid ticket, but it's not from the Flink side. We 
need to figure out why the transaction didn't work on Pulsar, so I think this 
ticket should be submitted to the Pulsar community.

[~rtrojczak] I can't find any checkpoint configuration in your sample code; I 
can only see two lines of code that enable checkpointing. I think this is the 
main reason your job doesn't recover as expected. Flink checkpointing needs 
additional configuration to be usable, such as the checkpoint storage. You can 
check the link below to get your application properly configured.

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
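
For reference, a minimal sketch of the kind of checkpoint configuration the link 
describes (the interval, mode, and storage path below are illustrative, not taken 
from the reporter's job):

{code:java}
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing; the 30s interval and exactly-once mode are illustrative.
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);

        // Durable checkpoint storage lets the job restore state after a failure.
        // The local path is a placeholder; use a shared filesystem or object store in practice.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");

        // ... build the Pulsar source -> process -> Pulsar sink pipeline here, then:
        // env.execute("pulsar-pipeline");
    }
}
{code}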

> Events are getting lost when an exception occurs within a processing function
> -
>
> Key: FLINK-33729
> URL: https://issues.apache.org/jira/browse/FLINK-33729
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.3
>Reporter: Rafał Trójczak
>Priority: Major
>
> We have a Flink job using a Pulsar source that reads from an input topic, and 
> a Pulsar sink that is writing to an output topic.  Both Flink and Pulsar 
> connector are of version 1.15.3. The Pulsar version that I use is 2.10.3.
> Here is a simple project that is intended to reproduce this problem: 
> [https://github.com/trojczak/flink-pulsar-connector-problem/]
> All of my tests were done on my local Kubernetes cluster using the Flink 
> Kubernetes Operator and Pulsar is running on  my local Docker. But the same 
> problem occurred on a "normal" cluster.
> Expected behavior: When an exception is thrown within the code (or a 
> TaskManager pod is restarted for any other reason, e.g. OOM exception), the 
> processing should be picked up from the last event sent to the output topic.
> Actual behavior: The events before the failure are sent correctly to the 
> output topic, next some of the events from the input topic are missing, then 
> from some point the events are being processed normally until the next 
> exception is thrown, and so on. Finally, from 100 events that should be sent 
> from the input topic to the output topic, only 40 are sent.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837160#comment-17837160
 ] 

Yufan Sheng commented on FLINK-33486:
-

The retry logic is provided internally by the Pulsar client. Should we 
implement this feature on the Flink side? I don't think so. But [~dchristle]'s 
suggestion sounds reasonable to me: add retry logic in `at-least-once` mode and 
let it crash in `exactly-once` mode.

> Pulsar Client Send Timeout Terminates TaskManager
> -
>
> Key: FLINK-33486
> URL: https://issues.apache.org/jira/browse/FLINK-33486
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> Currently, when the Pulsar Producer encounters a timeout when attempting to 
> send data, it generates an unhandled TimeoutException. This is not a 
> reasonable way to handle the timeout. The situation should be handled in a 
> graceful way either through additional parameters that put control of the 
> action under the discretion of the user or through some callback mechanism 
> that the user can work with to write code. Unfortunately, right now, this 
> causes a termination of the task manager which then leads to other issues.
> Increasing the timeout period to avoid the issue is not really an option to 
> ensure proper handling in the event that the situation does occur.
> The exception is as follows:
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: 
> persistent://public/default/myproducer-partition-0
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182)
>  ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>         at 
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172)
>  ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
> ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-dist-1.17.1.jar:1.17.1]
>         at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: 
> org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The 
> producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send 
> message to the topic persistent://public/default/myproducer-partition-0 
> within given timeout
>         at 
> org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) 
> ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
>  ~[pulsar-client-all-2.11.2.jar:2.11.2]
>         at 
> org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run

[jira] [Created] (FLINK-35110) Modify the spelling mistakes in the taskmanager html

2024-04-15 Thread JJJJude (Jira)
JJJJude created FLINK-35110:
---

 Summary: Modify the spelling mistakes in the taskmanager html
 Key: FLINK-35110
 URL: https://issues.apache.org/jira/browse/FLINK-35110
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0
Reporter: ude
 Fix For: 1.19.0


Fix the spelling error from "profiler"  to "profiling"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35111) Modify the spelling mistakes in the taskmanager html

2024-04-15 Thread JJJJude (Jira)
JJJJude created FLINK-35111:
---

 Summary: Modify the spelling mistakes in the taskmanager html
 Key: FLINK-35111
 URL: https://issues.apache.org/jira/browse/FLINK-35111
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0
Reporter: ude
 Fix For: 1.19.0


Fix the spelling error from "profiler"  to "profiling"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565516972


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * This operator is an abstract class that give the {@link 
AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of 
{@link
+ * AbstractStreamOperator} could manipulate async state with only a change of 
base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator<OUT> extends 
AbstractStreamOperator<OUT>
+implements AsyncStateProcessing {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask<?, ?> containingTask,
+StreamConfig config,
+Output<StreamRecord<OUT>> output) {
+super.setup(containingTask, config, output);
+// TODO: properly read config and setup
+final MailboxExecutor mailboxExecutor =
+containingTask.getEnvironment().getMainMailboxExecutor();
+this.asyncExecutionController = new 
AsyncExecutionController(mailboxExecutor, null);
+}
+
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+// TODO: Read from config
+return true;
+}
+
+@Override
+@SuppressWarnings("unchecked")
+public final <T> void setAsyncKeyedContextElement(
+StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
+lastProcessContext =
+asyncExecutionController.buildContext(
+record.getValue(), 
keySelector.getKey(record.getValue()));
+// The processElement will be treated as a callback for dummy request. 
So reference
+// counting should be maintained.
+// When state request submitted, ref count +1, as described in 
FLIP-425:
+// To cover the statements without a callback, in addition to the 
reference count marked
+// in Fig.5, each state request itself is also protected by a paired 
reference count.
+lastProcessContext.retain();
+asyncExecutionController.setCurrentContext(lastProcessContext);
+}
+
+@Override
+public final void postProcessElement() {
+// The processElement will be treated as a callback for dummy request. 
So reference
+// counting should be maintained.
+// When a state request completes, ref count -1, as described in 
FLIP-425:
+// To cover the statements without a callback, in addition to the 
reference count marked
+// in Fig.5, each state request itself is also protected by a paired 
reference count.
+lastProcessContext.release();
+}
+
+@Override
+@SuppressWarnings("unchecked")
+public final <T> ThrowingConsumer<StreamRecord<T>, Exception> 
getRecordProcessor(int inputId) {
+// Ideally, only TwoStreamInputOperator/OneInputStreamOperator(Input) 
will invoke here.
+// Only 

[jira] [Commented] (FLINK-33136) Flink Pulsar Connector RoundRobinTopicRouter Generates Invalid Error Message

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837161#comment-17837161
 ] 

Yufan Sheng commented on FLINK-33136:
-

Yep, I think you are right. We should use more precise wording to describe 
the issue when no topics are present. BTW, feel free to add the check in the 
{{PulsarSink}} builder; a PR is welcome.
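
As a rough illustration only (the class, method name, and message below are 
hypothetical, not the actual connector code), such a build-time check could look 
like:

{code:java}
import java.util.List;

/** Hypothetical builder-side validation sketch; names and message are illustrative. */
final class TopicValidation {
    static void sanityCheckTopics(List<String> topics) {
        // Fail at build/deploy time instead of at runtime inside the router.
        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException(
                    "No topics were provided for routing; configure them via setTopics().");
        }
    }
}
{code}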

> Flink Pulsar Connector RoundRobinTopicRouter Generates Invalid Error Message
> 
>
> Key: FLINK-33136
> URL: https://issues.apache.org/jira/browse/FLINK-33136
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.1
>Reporter: Jason Kania
>Priority: Major
>
> The RoundRobinTopicRouter class generates the runtime error "You should 
> provide topics for routing topic by message key hash." when no partitions are 
> set. This error is a direct copy of the error in the KeyHashTopicRouter but 
> is nonsensical to a RoundRobinTopicRouter since hashing is not involved in 
> route selection.
> More importantly however, this error should be detected at deploy time when 
> the PulsarSink is built with the builder since the list of topics is supplied 
> via the setTopics() method of the builder.
> Additionally, the wording of the error is not clear in any case and could be 
> improved to something like: "No partition routing topics were provided to 
> allow for topic routing"
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35112) Membership for Row class does not include field names

2024-04-15 Thread Wouter Zorgdrager (Jira)
Wouter Zorgdrager created FLINK-35112:
-

 Summary: Membership for Row class does not include field names
 Key: FLINK-35112
 URL: https://issues.apache.org/jira/browse/FLINK-35112
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.18.1
Reporter: Wouter Zorgdrager


In the Row class in PyFlink I cannot do a membership check for field names. 
This minimal example will show the unexpected behavior:

```

from pyflink.common import Row

row = Row(name="Alice", age=11)
# Expected to be True, but is False
print("name" in row)

person = Row("name", "age")
# This is True, as expected
print('name' in person)

```

The related code in the Row class is:
```
    def __contains__(self, item):
        return item in self._values
```


It should be relatively easy to fix with the following code:
```
    def __contains__(self, item):
        if hasattr(self, "_fields"):
            return item in self._fields
        else:
            return item in self._values
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34961] Use dedicated CI name for Opensearch connector to differentiate it in infra-reports [flink-connector-opensearch]

2024-04-15 Thread via GitHub


snuyanzin merged PR #43:
URL: https://github.com/apache/flink-connector-opensearch/pull/43


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32645) Flink pulsar sink is having poor performance

2024-04-15 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837162#comment-17837162
 ] 

Yufan Sheng commented on FLINK-32645:
-

[~tison] I think we can close this issue now.

> Flink pulsar sink is having poor performance
> 
>
> Key: FLINK-32645
> URL: https://issues.apache.org/jira/browse/FLINK-32645
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.2
> Environment: !Screenshot 2023-07-22 at 1.59.42 PM.png!!Screenshot 
> 2023-07-22 at 2.03.53 PM.png!
>  
>Reporter: Vijaya Bhaskar V
>Assignee: Zili Chen
>Priority: Major
> Fix For: pulsar-3.0.2
>
> Attachments: Screenshot 2023-07-22 at 2.03.53 PM.png, Screenshot 
> 2023-07-22 at 2.56.55 PM.png, Screenshot 2023-07-22 at 3.45.21 PM-1.png, 
> Screenshot 2023-07-22 at 3.45.21 PM.png, pom.xml
>
>
> Found the following issue with the Flink Pulsar sink:
>  
> The Flink Pulsar sink is always waiting while enqueueing messages, keeping the 
> task slot busy no matter how many free slots we provide. A screenshot of this 
> is attached.
> When sending messages at a modest rate of 8K msg/sec, a standalone Flink job 
> with a discarding sink is able to receive the full 8K msg/sec,
> whereas the Pulsar sink was consuming only up to 2K msg/sec and the sink was 
> always busy waiting. A snapshot of the thread dump is attached.
> A snapshot of the Flink stream graph is also attached.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32645) Flink pulsar sink is having poor performance

2024-04-15 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen resolved FLINK-32645.
---
Resolution: Fixed

> Flink pulsar sink is having poor performance
> 
>
> Key: FLINK-32645
> URL: https://issues.apache.org/jira/browse/FLINK-32645
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.2
> Environment: !Screenshot 2023-07-22 at 1.59.42 PM.png!!Screenshot 
> 2023-07-22 at 2.03.53 PM.png!
>  
>Reporter: Vijaya Bhaskar V
>Assignee: Zili Chen
>Priority: Major
> Fix For: pulsar-3.0.2
>
> Attachments: Screenshot 2023-07-22 at 2.03.53 PM.png, Screenshot 
> 2023-07-22 at 2.56.55 PM.png, Screenshot 2023-07-22 at 3.45.21 PM-1.png, 
> Screenshot 2023-07-22 at 3.45.21 PM.png, pom.xml
>
>
> Found the following issue with the Flink Pulsar sink:
>  
> The Flink Pulsar sink is always waiting while enqueueing messages, keeping the 
> task slot busy no matter how many free slots we provide. A screenshot of this 
> is attached.
> When sending messages at a modest rate of 8K msg/sec, a standalone Flink job 
> with a discarding sink is able to receive the full 8K msg/sec,
> whereas the Pulsar sink was consuming only up to 2K msg/sec and the sink was 
> always busy waiting. A snapshot of the thread dump is attached.
> A snapshot of the Flink stream graph is also attached.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]

2024-04-15 Thread via GitHub


Jiabao-Sun commented on code in PR #24612:
URL: https://github.com/apache/flink/pull/24612#discussion_r1565548513


##
flink-core/src/test/java/org/apache/flink/configuration/MemorySizePrettyPrintingTest.java:
##
@@ -45,13 +46,13 @@ public static Object[][] parameters() {
 };
 }
 
-@Parameterized.Parameter public MemorySize memorySize;
+@Parameter private MemorySize memorySize;
 
-@Parameterized.Parameter(1)
+@Parameter(value = 1)

Review Comment:
   The `value` name can be omitted, and `expectedString` can be private.
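
   A sketch of the suggested form (assuming the annotation's single element is 
named `value` and the second parameter field is a String):

   ```java
   @Parameter(1)
   private String expectedString;
   ```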



##
flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java:
##
@@ -169,7 +170,7 @@ void testGetOptionalFromObject() {
 testSpec.setValue(configuration);
 
 Optional optional = configuration.getOptional(testSpec.getOption());
-assertThat(optional.get(), equalTo(testSpec.getValue()));
+assertThat(optional.get()).isEqualTo(testSpec.getValue());

Review Comment:
   ```suggestion
   assertThat(optional).hasValue(testSpec.getValue());
   ```



##
flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java:
##
@@ -42,57 +42,48 @@ class GlobalConfigurationTest {
 @TempDir private File tmpDir;
 
 @Test
-void testConfigurationWithLegacyYAML() {
+void testConfigurationWithLegacyYAML() throws FileNotFoundException {
 File confFile = new File(tmpDir, 
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
-
-try {
-try (final PrintWriter pw = new PrintWriter(confFile)) {
-
-pw.println("###"); // should be skipped
-pw.println("# Some : comments : to skip"); // should be skipped
-pw.println("###"); // should be skipped
-pw.println("mykey1: myvalue1"); // OK, simple correct case
-pw.println("mykey2   : myvalue2"); // OK, whitespace 
before colon is correct
-pw.println("mykey3:myvalue3"); // SKIP, missing white space 
after colon
-pw.println(" some nonsense without colon and whitespace 
separator"); // SKIP
-pw.println(" :  "); // SKIP
-pw.println("   "); // SKIP (silently)
-pw.println(" "); // SKIP (silently)
-pw.println("mykey4: myvalue4# some comments"); // OK, skip 
comments only
-pw.println("   mykey5:myvalue5"); // OK, trim 
unnecessary whitespace
-pw.println("mykey6: my: value6"); // OK, only use first ': ' 
as separator
-pw.println("mykey7: "); // SKIP, no value provided
-pw.println(": myvalue8"); // SKIP, no key provided
-
-pw.println("mykey9: myvalue9"); // OK
-pw.println("mykey9: myvalue10"); // OK, overwrite last value
-
-} catch (FileNotFoundException e) {
-e.printStackTrace();
-}
-
-Configuration conf = 
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-
-// all distinct keys from confFile1 + confFile2 key
-assertThat(conf.keySet()).hasSize(6);
-
-// keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected 
values
-assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
-assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
-assertThat(conf.getString("mykey2", null)).isEqualTo("myvalue2");
-assertThat(conf.getString("mykey3", "null")).isEqualTo("null");
-assertThat(conf.getString("mykey4", null)).isEqualTo("myvalue4");
-assertThat(conf.getString("mykey5", null)).isEqualTo("myvalue5");
-assertThat(conf.getString("mykey6", null)).isEqualTo("my: value6");
-assertThat(conf.getString("mykey7", "null")).isEqualTo("null");
-assertThat(conf.getString("mykey8", "null")).isEqualTo("null");
-assertThat(conf.getString("mykey9", null)).isEqualTo("myvalue10");
-} finally {
-// Clear the standard yaml flag to avoid impact to other cases.
-GlobalConfiguration.setStandardYaml(true);
-confFile.delete();
-tmpDir.delete();
+try (PrintWriter pw = new PrintWriter(confFile)) {
+pw.println("###"); // should be skipped
+pw.println("# Some : comments : to skip"); // should be skipped
+pw.println("###"); // should be skipped
+pw.println("mykey1: myvalue1"); // OK, simple correct case
+pw.println("mykey2   : myvalue2"); // OK, whitespace before 
colon is correct
+pw.println("mykey3:myvalue3"); // SKIP, missing white space after 
colon
+pw.println(" some nonsense without colon and whitespace 
separator"); // SKIP
+pw.println(" :  "); 

Re: [PR] Fix pubsub topic name javadoc [flink-connector-gcp-pubsub]

2024-04-15 Thread via GitHub


snuyanzin commented on PR #23:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/23#issuecomment-2056554166

   CI fails because of a dependency convergence issue, which is going to be fixed 
within 
   https://github.com/apache/flink-connector-gcp-pubsub/pull/24


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34961][BP v.1.1] Use dedicated CI name for Opensearch connector to differentiate it in infra-reports [flink-connector-opensearch]

2024-04-15 Thread via GitHub


snuyanzin merged PR #44:
URL: https://github.com/apache/flink-connector-opensearch/pull/44


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]

2024-04-15 Thread via GitHub


GOODBOY008 commented on code in PR #24612:
URL: https://github.com/apache/flink/pull/24612#discussion_r1565637869


##
flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java:
##
@@ -169,7 +170,7 @@ void testGetOptionalFromObject() {
 testSpec.setValue(configuration);
 
 Optional optional = configuration.getOptional(testSpec.getOption());
-assertThat(optional.get(), equalTo(testSpec.getValue()));
+assertThat(optional.get()).isEqualTo(testSpec.getValue());

Review Comment:
   There is no need to clean up after the test completes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]

2024-04-15 Thread via GitHub


GOODBOY008 commented on code in PR #24612:
URL: https://github.com/apache/flink/pull/24612#discussion_r1565642234


##
flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java:
##
@@ -42,57 +42,48 @@ class GlobalConfigurationTest {
 @TempDir private File tmpDir;
 
 @Test
-void testConfigurationWithLegacyYAML() {
+void testConfigurationWithLegacyYAML() throws FileNotFoundException {
 File confFile = new File(tmpDir, 
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
-
-try {
-try (final PrintWriter pw = new PrintWriter(confFile)) {
-
-pw.println("###"); // should be skipped
-pw.println("# Some : comments : to skip"); // should be skipped
-pw.println("###"); // should be skipped
-pw.println("mykey1: myvalue1"); // OK, simple correct case
-pw.println("mykey2   : myvalue2"); // OK, whitespace 
before colon is correct
-pw.println("mykey3:myvalue3"); // SKIP, missing white space 
after colon
-pw.println(" some nonsense without colon and whitespace 
separator"); // SKIP
-pw.println(" :  "); // SKIP
-pw.println("   "); // SKIP (silently)
-pw.println(" "); // SKIP (silently)
-pw.println("mykey4: myvalue4# some comments"); // OK, skip 
comments only
-pw.println("   mykey5:myvalue5"); // OK, trim 
unnecessary whitespace
-pw.println("mykey6: my: value6"); // OK, only use first ': ' 
as separator
-pw.println("mykey7: "); // SKIP, no value provided
-pw.println(": myvalue8"); // SKIP, no key provided
-
-pw.println("mykey9: myvalue9"); // OK
-pw.println("mykey9: myvalue10"); // OK, overwrite last value
-
-} catch (FileNotFoundException e) {
-e.printStackTrace();
-}
-
-Configuration conf = 
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-
-// all distinct keys from confFile1 + confFile2 key
-assertThat(conf.keySet()).hasSize(6);
-
-// keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected 
values
-assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
-assertThat(conf.getString("mykey1", null)).isEqualTo("myvalue1");
-assertThat(conf.getString("mykey2", null)).isEqualTo("myvalue2");
-assertThat(conf.getString("mykey3", "null")).isEqualTo("null");
-assertThat(conf.getString("mykey4", null)).isEqualTo("myvalue4");
-assertThat(conf.getString("mykey5", null)).isEqualTo("myvalue5");
-assertThat(conf.getString("mykey6", null)).isEqualTo("my: value6");
-assertThat(conf.getString("mykey7", "null")).isEqualTo("null");
-assertThat(conf.getString("mykey8", "null")).isEqualTo("null");
-assertThat(conf.getString("mykey9", null)).isEqualTo("myvalue10");
-} finally {
-// Clear the standard yaml flag to avoid impact to other cases.
-GlobalConfiguration.setStandardYaml(true);
-confFile.delete();
-tmpDir.delete();
+try (PrintWriter pw = new PrintWriter(confFile)) {
+pw.println("###"); // should be skipped
+pw.println("# Some : comments : to skip"); // should be skipped
+pw.println("###"); // should be skipped
+pw.println("mykey1: myvalue1"); // OK, simple correct case
+pw.println("mykey2   : myvalue2"); // OK, whitespace before 
colon is correct
+pw.println("mykey3:myvalue3"); // SKIP, missing white space after 
colon
+pw.println(" some nonsense without colon and whitespace 
separator"); // SKIP
+pw.println(" :  "); // SKIP
+pw.println("   "); // SKIP (silently)
+pw.println(" "); // SKIP (silently)
+pw.println("mykey4: myvalue4# some comments"); // OK, skip 
comments only
+pw.println("   mykey5:myvalue5"); // OK, trim 
unnecessary whitespace
+pw.println("mykey6: my: value6"); // OK, only use first ': ' as 
separator
+pw.println("mykey7: "); // SKIP, no value provided
+pw.println(": myvalue8"); // SKIP, no key provided
+
+pw.println("mykey9: myvalue9"); // OK
+pw.println("mykey9: myvalue10"); // OK, overwrite last value
 }
+Configuration conf = 
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
+
+// all distinct keys from confFile1 + confFile2 key
+assertThat(conf.keySet()).hasSize(6);
+
+// keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected values
+assertThat(conf.getString("mykey1", nul

Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: Configuration [flink]

2024-04-15 Thread via GitHub


GOODBOY008 commented on code in PR #24612:
URL: https://github.com/apache/flink/pull/24612#discussion_r1565637869


##
flink-core/src/test/java/org/apache/flink/configuration/ReadableWritableConfigurationTest.java:
##
@@ -169,7 +170,7 @@ void testGetOptionalFromObject() {
 testSpec.setValue(configuration);
 
 Optional optional = configuration.getOptional(testSpec.getOption());
-assertThat(optional.get(), equalTo(testSpec.getValue()));
+assertThat(optional.get()).isEqualTo(testSpec.getValue());

Review Comment:
   The suggestion causes a compile error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34961] Use dedicated CI name for Pulsar connector to differentiate it in infra-reports [flink-connector-pulsar]

2024-04-15 Thread via GitHub


snuyanzin opened a new pull request, #89:
URL: https://github.com/apache/flink-connector-pulsar/pull/89

   ## Purpose of the change
   
   
   
   The PR will allow differentiating Pulsar connector statistics from other 
connectors' statistics by the CI workflow name.
   
   
   ## Brief change log
   GHA name change
   
   ## Verifying this change
   
   This change is a trivial rework
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35108] Do not trigger deployment recovery for finished/failed jobs [flink-kubernetes-operator]

2024-04-15 Thread via GitHub


gyfora opened a new pull request, #815:
URL: https://github.com/apache/flink-kubernetes-operator/pull/815

   ## What is the purpose of the change
   
   The deployment recovery mechanism is incorrectly triggered for terminal jobs 
once the JM deployment is deleted after the TTL period.
   
   This causes jobs to be resubmitted. This affects only batch jobs.
   
   ## Brief change log
   
- Check for terminal job state before triggering deployment recovery
   
   ## Verifying this change
   
   new unit tests added
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35108) Deployment recovery is triggered on terminal jobs after jm shutdown ttl

2024-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35108:
---
Labels: pull-request-available  (was: )

> Deployment recovery is triggered on terminal jobs after jm shutdown ttl
> ---
>
> Key: FLINK-35108
> URL: https://issues.apache.org/jira/browse/FLINK-35108
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.8.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>  Labels: pull-request-available
>
> The deployment recovery mechanism is incorrectly triggered for terminal jobs 
> once the JM deployment is deleted after the TTL period. 
> This causes jobs to be resubmitted. This affects only batch jobs.
> The workaround is to set 
> kubernetes.operator.jm-deployment-recovery.enabled: false
>  for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout [flink-kubernetes-operator]

2024-04-15 Thread via GitHub


gyfora merged PR #806:
URL: https://github.com/apache/flink-kubernetes-operator/pull/806


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30388] Move Lazy Initialization of AWS element converters to SinkWriter open() method [flink-connector-aws]

2024-04-15 Thread via GitHub


dannycranmer merged PR #135:
URL: https://github.com/apache/flink-connector-aws/pull/135


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-30388) Add support for ElementConverted open() method for KDS/KDF/DDB

2024-04-15 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-30388.
---
Resolution: Fixed

> Add support for ElementConverted open() method for KDS/KDF/DDB
> --
>
> Key: FLINK-30388
> URL: https://issues.apache.org/jira/browse/FLINK-30388
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB, Connectors / Firehose, Connectors 
> / Kinesis
>Reporter: Danny Cranmer
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> FLINK-29938 added support for an {{open()}} method in Async Sink 
> ElementConverter. Once flink-connector-aws upgrades to Flink 1.17 we should 
> implement this method. It was originally implemented 
> [here|https://github.com/apache/flink/pull/21265] but was yanked during the 
> [sync|FLINK-30384]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30388) Add support for ElementConverted open() method for KDS/KDF/DDB

2024-04-15 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837204#comment-17837204
 ] 

Danny Cranmer commented on FLINK-30388:
---

Merged commit 
[{{8cafbbc}}|https://github.com/apache/flink-connector-aws/commit/8cafbbced8659c654e8f507979b47566b45ea547]
 into apache:main 

 

Thanks [~chalixar] !

> Add support for ElementConverted open() method for KDS/KDF/DDB
> --
>
> Key: FLINK-30388
> URL: https://issues.apache.org/jira/browse/FLINK-30388
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB, Connectors / Firehose, Connectors 
> / Kinesis
>Reporter: Danny Cranmer
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> FLINK-29938 added support for an {{open()}} method in Async Sink 
> ElementConverter. Once flink-connector-aws upgrades to Flink 1.17 we should 
> implement this method. It was originally implemented 
> [here|https://github.com/apache/flink/pull/21265] but was yanked during the 
> [sync|FLINK-30384]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35113) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink AWS connectors

2024-04-15 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35113:
--
Fix Version/s: aws-connector-4.3.0
   (was: kafka-4.0.0)
   (was: kafka-3.1.1)

> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink AWS 
> connectors
> ---
>
> Key: FLINK-35113
> URL: https://issues.apache.org/jira/browse/FLINK-35113
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

