[jira] [Created] (FLINK-34191) Remove RestoreMode#LEGACY

2024-01-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34191:
---

 Summary:  Remove RestoreMode#LEGACY 
 Key: FLINK-34191
 URL: https://issues.apache.org/jira/browse/FLINK-34191
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34190) Deprecate RestoreMode#LEGACY

2024-01-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34190:
---

 Summary: Deprecate RestoreMode#LEGACY
 Key: FLINK-34190
 URL: https://issues.apache.org/jira/browse/FLINK-34190
 Project: Flink
  Issue Type: Sub-task
Reporter: Zakelly Lan
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34189) FLIP-416: Deprecate and remove the RestoreMode#LEGACY

2024-01-21 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-34189:
---

 Summary: FLIP-416: Deprecate and remove the RestoreMode#LEGACY
 Key: FLINK-34189
 URL: https://issues.apache.org/jira/browse/FLINK-34189
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 2.0.0, 1.19.0


[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=287607202]

 

The [FLIP-193|https://cwiki.apache.org/confluence/x/bIyqCw] introduced two 
modes of state file ownership during checkpoint restoration: RestoreMode#CLAIM 
and RestoreMode#NO_CLAIM. The LEGACY mode, which was how Flink worked until 
1.15, has been superseded by NO_CLAIM as the default mode. The main drawback of 
LEGACY mode is that the new job relies on artifacts from the old job without 
cleaning them up, leaving users uncertain about when it is safe to delete the 
old checkpoint directories. This leads to the accumulation of unnecessary 
checkpoint files that are never cleaned up. Considering cluster availability 
and job maintenance, LEGACY mode is not recommended; users can choose one of 
the other two modes to get clear semantics for state file ownership.

This FLIP proposes to deprecate the LEGACY mode and remove it completely in the 
upcoming Flink 2.0. This will make the semantics clear, eliminate many bugs 
caused by mode transitions involving LEGACY mode (e.g. FLINK-27114: On JM 
restart, the information about the initial checkpoints can be lost), and 
enhance code maintainability.
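
For context, a minimal sketch (not part of the FLIP text) of how a restore 
mode is selected today via the documented savepoint restore options; the path 
below is a placeholder:
{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

public class RestoreModeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Restore from an existing savepoint/checkpoint (placeholder URI).
        conf.set(SavepointConfigOptions.SAVEPOINT_PATH,
                "s3://bucket/savepoints/savepoint-ab12cd");
        // CLAIM: Flink takes ownership of the restored files and cleans them up.
        // NO_CLAIM (default): the first checkpoint is forced to be full, so later
        // checkpoints never depend on the restored artifacts.
        // LEGACY is the mode this FLIP deprecates and removes.
        conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.CLAIM);
    }
}
{code}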



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34177) Not able to create FlinkSessionJob in different namespace than flink deployment

2024-01-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34177.
--
Resolution: Not A Problem

Closing this for now as the behaviour is intended. If you would like to change 
the behaviour, please consider creating a FLIP / mailing list discussion about 
it.

> Not able to create FlinkSessionJob in different namespace than flink 
> deployment
> ---
>
> Key: FLINK-34177
> URL: https://issues.apache.org/jira/browse/FLINK-34177
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
> Environment: *AWS EKS K8 version 1.25*
> *Kubernetes Flink Operator 1.7.0*
> *Flink 1.17/1.18*
>Reporter: Pramod
>Priority: Major
>
> Here is my use case:
> 1) I created a namespace "flink" in aws cluster with k8 version 1.25 
> 2) Deployed flink kubernetes operator in "flink" namespace using helm chart
> 3) deployed FlinkDeployment similar to 
> [flink-kubernetes-operator/examples/basic-session-deployment-only.yaml at 
> main · apache/flink-kubernetes-operator · 
> GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml]
>  in flink namespace
> 4) Then deployed FlinkSession job similar to 
> [flink-kubernetes-operator/examples/basic-session-job-only.yaml at main · 
> apache/flink-kubernetes-operator · 
> GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-job-only.yaml]
>  but in different namespace "tss"
> 5) Already added both the namespaces to watchNamespaces 
> watchNamespaces: ["tss", "flink"]
>  
> Expected:
>  FlinkSessionJob will start in tss namespace
>  
> Actual:
>  Job is not starting and throwing the error "*Flink version null is not 
> supported by this operator version*"
>  
>  
>  
>  
> My suspicion is that the FlinkDeployment and FlinkSessionJob must be in the 
> same namespace. However, I am not sure, so I am raising this bug.
>  
> *Can someone confirm whether the Flink Kubernetes operator supports having 
> the Flink cluster (FlinkDeployment) in one namespace and the FlinkSessionJob 
> in another?*
> Supporting multiple namespaces would make my life a lot easier.
>  
> Note: Deploying FlinkSessionJob in the same namespace "Flink" works fine.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster

2024-01-21 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809278#comment-17809278
 ] 

Gyula Fora commented on FLINK-34179:


I am not aware of the bitnami helm chart, and I definitely don't think that it 
is supported by the community. Did you try the Flink Kubernetes operator?

> More than one taskmanager is not coming up in flink (session mode) in aws eks 
> cluster
> -
>
> Key: FLINK-34179
> URL: https://issues.apache.org/jira/browse/FLINK-34179
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.18.0
>Reporter: Nikhil_D
>Priority: Blocker
>  Labels: flink-k8s
> Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, 
> flink-values.yaml
>
>
> Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink 
> 1.18). After deployment, noticing that out of 5 taskmanager pods only one 
> taskmanager pod is able to connect to resourcemanager which is present in 
> jobmanager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34177) Not able to create FlinkSessionJob in different namespace than flink deployment

2024-01-21 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809280#comment-17809280
 ] 

Gyula Fora commented on FLINK-34177:


Flink session jobs need to be created in the same namespace as the 
corresponding FlinkDeployment. This is an intentional limitation at the moment.

> Not able to create FlinkSessionJob in different namespace than flink 
> deployment
> ---
>
> Key: FLINK-34177
> URL: https://issues.apache.org/jira/browse/FLINK-34177
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
> Environment: *AWS EKS K8 version 1.25*
> *Kubernetes Flink Operator 1.7.0*
> *Flink 1.17/1.18*
>Reporter: Pramod
>Priority: Major
>
> Here is my use case:
> 1) I created a namespace "flink" in aws cluster with k8 version 1.25 
> 2) Deployed flink kubernetes operator in "flink" namespace using helm chart
> 3) deployed FlinkDeployment similar to 
> [flink-kubernetes-operator/examples/basic-session-deployment-only.yaml at 
> main · apache/flink-kubernetes-operator · 
> GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml]
>  in flink namespace
> 4) Then deployed FlinkSession job similar to 
> [flink-kubernetes-operator/examples/basic-session-job-only.yaml at main · 
> apache/flink-kubernetes-operator · 
> GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-job-only.yaml]
>  but in different namespace "tss"
> 5) Already added both the namespaces to watchNamespaces 
> watchNamespaces: ["tss", "flink"]
>  
> Expected:
>  FlinkSessionJob will start in tss namespace
>  
> Actual:
>  Job is not starting and throwing the error "*Flink version null is not 
> supported by this operator version*"
>  
>  
>  
>  
> My suspicion is that the FlinkDeployment and FlinkSessionJob must be in the 
> same namespace. However, I am not sure, so I am raising this bug.
>  
> *Can someone confirm whether the Flink Kubernetes operator supports having 
> the Flink cluster (FlinkDeployment) in one namespace and the FlinkSessionJob 
> in another?*
> Supporting multiple namespaces would make my life a lot easier.
>  
> Note: Deploying FlinkSessionJob in the same namespace "Flink" works fine.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34180) Accept Flink CDC project as part of Apache Flink

2024-01-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-34180:
--

Assignee: Leonard Xu

> Accept Flink CDC project as part of Apache Flink
> 
>
> Key: FLINK-34180
> URL: https://issues.apache.org/jira/browse/FLINK-34180
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>
> As discussed in the Flink dev mailing list [1][2], we have accepted the 
> Flink CDC project contribution. We should finish the repo and doc migration 
> as soon as possible.
> [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w
> [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster

2024-01-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34179:
---
Priority: Minor  (was: Blocker)

> More than one taskmanager is not coming up in flink (session mode) in aws eks 
> cluster
> -
>
> Key: FLINK-34179
> URL: https://issues.apache.org/jira/browse/FLINK-34179
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.18.0
>Reporter: Nikhil_D
>Priority: Minor
>  Labels: flink-k8s
> Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, 
> flink-values.yaml
>
>
> Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink 
> 1.18). After deployment, noticing that out of 5 taskmanager pods only one 
> taskmanager pod is able to connect to resourcemanager which is present in 
> jobmanager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34188) Setup release infrastructure for Flink CDC project

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34188:
--

 Summary: Setup release infrastructure for Flink CDC project
 Key: FLINK-34188
 URL: https://issues.apache.org/jira/browse/FLINK-34188
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34181) Migrate repo from ververica to apache

2024-01-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34181:
---
Component/s: Flink CDC

> Migrate repo from ververica to apache
> 
>
> Key: FLINK-34181
> URL: https://issues.apache.org/jira/browse/FLINK-34181
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: Leonard Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34182) Migrate doc website from ververica to flink

2024-01-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-34182:
---
Component/s: Flink CDC

> Migrate doc website from ververica to flink   
> 
>
> Key: FLINK-34182
> URL: https://issues.apache.org/jira/browse/FLINK-34182
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: Leonard Xu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34187) Setup CI for Flink CDC project

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34187:
--

 Summary: Setup CI for Flink CDC project
 Key: FLINK-34187
 URL: https://issues.apache.org/jira/browse/FLINK-34187
 Project: Flink
  Issue Type: Sub-task
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34186) Migrate issue management from Github to JIRA

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34186:
--

 Summary: Migrate issue management from Github to JIRA
 Key: FLINK-34186
 URL: https://issues.apache.org/jira/browse/FLINK-34186
 Project: Flink
  Issue Type: Sub-task
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34185) Remove unwanted bundle dependencies

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34185:
--

 Summary: Remove unwanted bundle dependencies
 Key: FLINK-34185
 URL: https://issues.apache.org/jira/browse/FLINK-34185
 Project: Flink
  Issue Type: Sub-task
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34184) Update copyright and license file

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34184:
--

 Summary: Update copyright and license file
 Key: FLINK-34184
 URL: https://issues.apache.org/jira/browse/FLINK-34184
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34182) Migrate doc website from ververica to flink

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34182:
--

 Summary: Migrate doc website from ververica to flink   
 Key: FLINK-34182
 URL: https://issues.apache.org/jira/browse/FLINK-34182
 Project: Flink
  Issue Type: Sub-task
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34183) Add NOTICE files for Flink CDC project

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34183:
--

 Summary: Add NOTICE files for Flink CDC project
 Key: FLINK-34183
 URL: https://issues.apache.org/jira/browse/FLINK-34183
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34181) Migrate repo from ververica to apache

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34181:
--

 Summary: Migrate repo from ververica to apache
 Key: FLINK-34181
 URL: https://issues.apache.org/jira/browse/FLINK-34181
 Project: Flink
  Issue Type: Sub-task
Reporter: Leonard Xu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34180) Accept Flink CDC project as part of Apache Flink

2024-01-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-34180:
--

 Summary: Accept Flink CDC project as part of Apache Flink
 Key: FLINK-34180
 URL: https://issues.apache.org/jira/browse/FLINK-34180
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: Leonard Xu


As discussed in the Flink dev mailing list [1][2], we have accepted the Flink 
CDC project contribution. We should finish the repo and doc migration as soon 
as possible.

[1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w
[2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval` [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


1996fanrui commented on code in PR #759:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/759#discussion_r1461443852


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -160,14 +160,23 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
 var jobTopology = collectedMetrics.getJobTopology();
 
+var now = clock.instant();

Review Comment:
   Hi @afedulov ,
   
   We are using `now` as the `endTime` of the ScalingRecord. The 
`kubernetes.operator.reconcile.interval` is 1 min by default, and `scaling` is 
called in each reconcile.
   
   So the restartTime will be greater than `1 min` even if the scaling is very 
quick.
   
   I'm not sure whether we should get the endTime from the `JobUpdateTs` 
instead? Currently, `stabilization.interval` checks the `JobUpdateTs`.
   
   Looking forward to your feedback, thanks~
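   
   A rough sketch of that alternative (the names here are hypothetical, not 
the operator's real API): compute the observed restart time against the job's 
update timestamp instead of the reconcile-loop clock, so the measurement is 
not inflated by the 1-minute reconcile interval:
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   final class RestartTimeSketch {
       // scalingTriggerTs would come from the ScalingRecord, jobUpdateTs from
       // the job status; both parameter names are assumptions for illustration.
       static Duration observedRestartTime(Instant scalingTriggerTs, Instant jobUpdateTs) {
           return Duration.between(scalingTriggerTs, jobUpdateTs);
       }
   }
   ```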



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r146161


##
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java:
##
@@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding 
encoding) throws Exception
 .hasStackTraceContaining("Fail to serialize at field: f1");
 }
 
+@Test
+void testTimestampTypeLegacyMapping() throws Exception {
+final Tuple4, SpecificRecord, 
GenericRecord, Row> testData =
+AvroTestUtils.getTimestampTestData();
+
+SpecificDatumWriter datumWriter = new 
SpecificDatumWriter<>(Timestamps.class);
+ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+Encoder encoder = 
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+datumWriter.write((Timestamps) testData.f1, encoder);
+encoder.flush();
+
+DataType dataType =
+AvroSchemaConverter.convertToDataType(
+
SpecificData.get().getSchema(Timestamps.class).toString());
+
+// Timestamp with local timezone is converted to BigIntType
+assertThat(dataType.getChildren().get(2))
+.isEqualTo(new AtomicDataType(new BigIntType(false)));
+assertThat(dataType.getChildren().get(3))
+.isEqualTo(new AtomicDataType(new BigIntType(false)));
+
+assertThatThrownBy(() -> createSerializationSchema(dataType, 
AvroEncoding.BINARY, true))
+.isInstanceOf(IllegalArgumentException.class)
+.hasMessage(
+"Avro does not support TIMESTAMP type with precision: 
6, it only supports precision less than 3.");
+
+assertThatThrownBy(() -> createDeserializationSchema(dataType, 
AvroEncoding.BINARY, true))
+.isInstanceOf(IllegalArgumentException.class)
+.hasMessage(
+"Avro does not support TIMESTAMP type with precision: 
6, it only supports precision less than 3.");

Review Comment:
   Please ignore my stupid comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34178) The ScalingTracking of autoscaler is wrong

2024-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34178:
---
Labels: pull-request-available  (was: )

> The ScalingTracking of autoscaler is wrong
> --
>
> Key: FLINK-34178
> URL: https://issues.apache.org/jira/browse/FLINK-34178
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> The ScalingTracking of the autoscaler is wrong: the tracked restart time is 
> always greater than AutoScalerOptions#STABILIZATION_INTERVAL.
> h2. Reason:
> When the flink job isStabilizing, ScalingMetricCollector#updateMetrics 
> returns an empty metric history. In the JobAutoScalerImpl#runScalingLogic 
> method, if `collectedMetrics.getMetricHistory().isEmpty()`, we don't update 
> the ScalingTracking.
>  
> The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so 
> the restartTime is always greater than 5 min.
> However, the actual restart is quick when we use the rescale API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval` [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


1996fanrui opened a new pull request, #759:
URL: https://github.com/apache/flink-kubernetes-operator/pull/759

   ## What is the purpose of the change
   
   The ScalingTracking of the autoscaler is wrong: the tracked restart time is 
always greater than AutoScalerOptions#STABILIZATION_INTERVAL.
   
   ### Reason
   
   When the flink job isStabilizing, ScalingMetricCollector#updateMetrics 
returns an empty metric history. In the JobAutoScalerImpl#runScalingLogic 
method, if `collectedMetrics.getMetricHistory().isEmpty()`, we don't update 
the ScalingTracking.
   
   The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so 
the restartTime is always greater than 5 min.
   
   However, the actual restart is quick when we use the rescale API.
   
   
   ## Brief change log
   
   Update the scaling tracking even if the job is stabilizing.
   
   ## Verifying this change
   
   Adding the test later.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461426662


##
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java:
##
@@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding 
encoding) throws Exception
 .hasStackTraceContaining("Fail to serialize at field: f1");
 }
 
+@Test
+void testTimestampTypeLegacyMapping() throws Exception {
+final Tuple4, SpecificRecord, 
GenericRecord, Row> testData =
+AvroTestUtils.getTimestampTestData();
+
+SpecificDatumWriter datumWriter = new 
SpecificDatumWriter<>(Timestamps.class);
+ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+Encoder encoder = 
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+datumWriter.write((Timestamps) testData.f1, encoder);
+encoder.flush();
+
+DataType dataType =
+AvroSchemaConverter.convertToDataType(
+
SpecificData.get().getSchema(Timestamps.class).toString());
+
+// Timestamp with local timezone is converted to BigIntType
+assertThat(dataType.getChildren().get(2))
+.isEqualTo(new AtomicDataType(new BigIntType(false)));
+assertThat(dataType.getChildren().get(3))
+.isEqualTo(new AtomicDataType(new BigIntType(false)));
+
+assertThatThrownBy(() -> createSerializationSchema(dataType, 
AvroEncoding.BINARY, true))
+.isInstanceOf(IllegalArgumentException.class)
+.hasMessage(
+"Avro does not support TIMESTAMP type with precision: 
6, it only supports precision less than 3.");
+
+assertThatThrownBy(() -> createDeserializationSchema(dataType, 
AvroEncoding.BINARY, true))
+.isInstanceOf(IllegalArgumentException.class)
+.hasMessage(
+"Avro does not support TIMESTAMP type with precision: 
6, it only supports precision less than 3.");

Review Comment:
   It is for testing createSerializationSchema and createDeserializationSchema. 
Two different scenarios.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461418192


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##
@@ -317,6 +408,21 @@ public static Schema convertToSchema(LogicalType schema) {
  * @return Avro's {@link Schema} matching this logical type.
  */
 public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+return convertToSchema(logicalType, rowName, true);
+}
+
+/**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
+ *
+ * The "{rowName}_" is used as the nested row type name prefix in order 
to generate the right
+ * schema. Nested record type that only differs with type name is still 
compatible.
+ *
+ * @param logicalType logical type
+ * @param rowName the record name

Review Comment:
   also add annotation for param `legacyTimestampMapping` ?



##
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java:
##
@@ -87,16 +104,83 @@ void testSeDeSchema(AvroEncoding encoding) {
 assertThat(actualSer).isEqualTo(expectedSer);
 }
 
+@Test
+void testOldSeDeNewSchema() {
+assertThatThrownBy(
+() -> {
+new AvroRowDataDeserializationSchema(
+NEW_ROW_TYPE, 
InternalTypeInfo.of(NEW_ROW_TYPE));
+})
+.isInstanceOf(IllegalArgumentException.class)
+.hasMessage(
+"Avro does not support TIMESTAMP type with precision: 
6, it only supports precision less than 3.");
+
+assertThatThrownBy(
+() -> {
+new AvroRowDataSerializationSchema(NEW_ROW_TYPE);
+})
+.isInstanceOf(IllegalArgumentException.class)
+.hasMessage(
+"Avro does not support TIMESTAMP type with precision: 
6, it only supports precision less than 3.");
+}
+
+@Test
+void testNewSeDeNewSchema() {
+testSeDeSchema(NEW_ROW_TYPE, NEW_SCHEMA, false);
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testSeDeSchema(boolean legacyMapping) {
+testSeDeSchema(ROW_TYPE, SCHEMA, legacyMapping);
+}
+
+void testSeDeSchema(RowType rowType, ResolvedSchema schema, boolean 
legacyMapping) {
+final AvroRowDataDeserializationSchema expectedDeser =
+new AvroRowDataDeserializationSchema(
+rowType, InternalTypeInfo.of(rowType), 
AvroEncoding.BINARY, legacyMapping);
+
+final Map options = getAllOptions(legacyMapping);
+
+final DynamicTableSource actualSource = 
FactoryMocks.createTableSource(schema, options);
+
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+DeserializationSchema actualDeser =
+scanSourceMock.valueFormat.createRuntimeDecoder(
+ScanRuntimeProviderContext.INSTANCE, 
schema.toPhysicalRowDataType());
+
+assertThat(actualDeser).isEqualTo(expectedDeser);
+
+final AvroRowDataSerializationSchema expectedSer =
+new AvroRowDataSerializationSchema(rowType, 
AvroEncoding.BINARY, legacyMapping);
+
+final DynamicTableSink actualSink = 
FactoryMocks.createTableSink(schema, options);
+
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+SerializationSchema actualSer =
+sinkMock.valueFormat.createRuntimeEncoder(null, 
schema.toPhysicalRowDataType());
+
+assertThat(actualSer).isEqualTo(expectedSer);
+}
+
 // 
 //  Utilities
 // 
 
-private Map getAllOptions() {
+private Map getAllOptions(boolean legacyMapping) {

Review Comment:
   legacyTimestampMapping ?



##
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java:
##
@@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding 
encoding) throws Exception
 .hasStackTraceContaining("Fail to serialize at field: f1");
 }
 
+@Test
+void testTimestampTypeLegacyMapping() throws Exception {
+final Tuple4, SpecificRecord, 
GenericRecord, Row> testData =
+

[jira] [Updated] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster

2024-01-21 Thread Nikhil_D (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikhil_D updated FLINK-34179:
-
Description: Deployed flink in aws eks cluster using bitnami helm chart 
(bitnami/flink 1.18). After deployment, noticing that out of 5 taskmanager pods 
only one taskmanager pod is able to connect to resourcemanager which is present 
in jobmanager.  (was: Deployed flink in aws eks cluster using bitnami helm 
chart. After deployment, noticing that out of 5 taskmanager pods only one 
taskmanager pod is able to connect to resourcemanager which is present in 
jobmanager.)

> More than one taskmanager is not coming up in flink (session mode) in aws eks 
> cluster
> -
>
> Key: FLINK-34179
> URL: https://issues.apache.org/jira/browse/FLINK-34179
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.18.0
>Reporter: Nikhil_D
>Priority: Blocker
>  Labels: flink-k8s
> Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, 
> flink-values.yaml
>
>
> Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink 
> 1.18). After deployment, noticing that out of 5 taskmanager pods only one 
> taskmanager pod is able to connect to resourcemanager which is present in 
> jobmanager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster

2024-01-21 Thread Nikhil_D (Jira)
Nikhil_D created FLINK-34179:


 Summary: More than one taskmanager is not coming up in flink 
(session mode) in aws eks cluster
 Key: FLINK-34179
 URL: https://issues.apache.org/jira/browse/FLINK-34179
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.18.0
Reporter: Nikhil_D
 Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, 
flink-values.yaml

Deployed flink in aws eks cluster using bitnami helm chart. After deployment, 
noticing that out of 5 taskmanager pods only one taskmanager pod is able to 
connect to resourcemanager which is present in jobmanager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461404314


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java:
##
@@ -71,5 +70,11 @@ public InlineElement getDescription() {
 }
 }
 
+public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING =
+ConfigOptions.key("timestamp_mapping.legacy")
+.booleanType()
+.defaultValue(true)
+.withDescription("Use the legacy mapping of timestamp in 
avro");

Review Comment:
   Could you add more description here, like:
   `The legacy behavior of Flink wrongly mapped both SQL TIMESTAMP and 
TIMESTAMP_LTZ types to AVRO TIMESTAMP before Flink 1.19; the correct behavior 
is that Flink SQL TIMESTAMP maps to Avro LOCAL TIMESTAMP and Flink SQL 
TIMESTAMP_LTZ maps to Avro TIMESTAMP. You can obtain the correct mapping by 
disabling this legacy mapping. The legacy behavior is kept as the default for 
compatibility considerations.` You can polish this based on my poor English.
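   
   A hedged Table API sketch of how a user might opt out of the legacy 
mapping with this option (connector, schema, and topic are placeholders; the 
exact option wiring may differ in the final PR):
   
   ```java
   import org.apache.flink.table.api.FormatDescriptor;
   import org.apache.flink.table.api.Schema;
   import org.apache.flink.table.api.TableDescriptor;
   
   public class AvroTimestampMappingExample {
       public static void main(String[] args) {
           TableDescriptor descriptor =
                   TableDescriptor.forConnector("kafka")
                           .schema(Schema.newBuilder()
                                   .column("ts_ltz", "TIMESTAMP_LTZ(3)")
                                   .build())
                           .option("topic", "events")
                           .format(FormatDescriptor.forFormat("avro")
                                   // false => TIMESTAMP <-> Avro local-timestamp,
                                   //          TIMESTAMP_LTZ <-> Avro timestamp
                                   .option("timestamp_mapping.legacy", "false")
                                   .build())
                           .build();
           System.out.println(descriptor);
       }
   }
   ```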



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461398939


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
 return AvroToRowDataConverters::convertToTime;
 case TIMESTAMP_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   In the FLIP 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-378%3A+Support+Avro+timestamp+with+local+timezone,
 we have proposed the new mapping. Does it cover both the change in 
AvroToRowDataConverters and RowDataToAvroConverters? 
   
   additionally support Avro LocalTimestamp <-> Flink 
TIMESTAMP_WITHOUT_TIME_ZONE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33006] add e2e for Flink operator HA [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on PR #756:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/756#issuecomment-1903350455

   @gyfora Thanks for all of the suggestions. I have changed the scripts 
accordingly. Please approve the CI build workflow for further verification.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461398939


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
 return AvroToRowDataConverters::convertToTime;
 case TIMESTAMP_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   In the FLIP 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-378%3A+Support+Avro+timestamp+with+local+timezone,
 we have proposed the new mapping. Does it cover both the change in 
AvroToRowDataConverters and RowDataToAvroConverters? 
   
   Avro Timestamp <-> Flink TIMESTAMP_WITH_LOCAL_TIME_ZONE
   additionally support Avro LocalTimestamp <-> Flink 
TIMESTAMP_WITHOUT_TIME_ZONE
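   
   For reference, the two Avro logical types involved in that mapping, built 
with plain Avro (a small, hedged illustration assuming Avro 1.10+; not code 
from this PR):
   
   ```java
   import org.apache.avro.LogicalTypes;
   import org.apache.avro.Schema;
   
   public class AvroTimestampTypes {
       public static void main(String[] args) {
           // "timestamp-millis": an instant; maps to Flink TIMESTAMP_LTZ.
           Schema instant = LogicalTypes.timestampMillis()
                   .addToSchema(Schema.create(Schema.Type.LONG));
           // "local-timestamp-millis": wall-clock time without a zone;
           // maps to Flink TIMESTAMP (TIMESTAMP_WITHOUT_TIME_ZONE).
           Schema local = LogicalTypes.localTimestampMillis()
                   .addToSchema(Schema.create(Schema.Type.LONG));
           System.out.println(instant + "\n" + local);
       }
   }
   ```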



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461398939


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
 return AvroToRowDataConverters::convertToTime;
 case TIMESTAMP_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   In the FLIP, we have proposed the new mapping. Does it cover both the change 
in AvroToRowDataConverters and RowDataToAvroConverters? 
   
   Avro Timestamp <-> Flink TIMESTAMP_WITH_LOCAL_TIME_ZONE
   additionally support Avro LocalTimestamp <-> Flink 
TIMESTAMP_WITHOUT_TIME_ZONE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34166) KeyedLookupJoinWrapper incorrectly processes delete message for inner join when previous lookup result is empty

2024-01-21 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809254#comment-17809254
 ] 

lincoln lee commented on FLINK-34166:
-

fixed in master: 27e6ac836171c5c5539ceeb234a806be661cc30a

> KeyedLookupJoinWrapper incorrectly processes delete message for inner join 
> when previous lookup result is empty
> -
>
> Key: FLINK-34166
> URL: https://issues.apache.org/jira/browse/FLINK-34166
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.2, 1.18.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> KeyedLookupJoinWrapper (when 
> 'table.optimizer.non-deterministic-update.strategy' is set to 'TRY_RESOLVE' 
> and the lookup join has NDU problems) incorrectly processes delete messages 
> for inner join when the previous lookup result is empty.
> The intermediate delete result 
> {code}
> expectedOutput.add(deleteRecord(3, "c", null, null));
> {code}
> in current case 
> KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk 
> is incorrect:
> {code}
> @Test
> public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws 
> Exception {
> OneInputStreamOperatorTestHarness testHarness =
> createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, 
> true);
> testHarness.open();
> testHarness.processElement(insertRecord(1, "a"));
> testHarness.processElement(insertRecord(2, "b"));
> testHarness.processElement(insertRecord(3, "c"));
> testHarness.processElement(insertRecord(4, "d"));
> testHarness.processElement(insertRecord(5, "e"));
> testHarness.processElement(updateBeforeRecord(3, "c"));
> testHarness.processElement(updateAfterRecord(3, "c2"));
> testHarness.processElement(deleteRecord(3, "c2"));
> testHarness.processElement(insertRecord(3, "c3"));
> List expectedOutput = new ArrayList<>();
> expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
> expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
> expectedOutput.add(deleteRecord(3, "c", null, null));
> expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
> expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
> expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
> assertor.assertOutputEquals("output wrong.", expectedOutput, 
> testHarness.getOutput());
> testHarness.close();
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly processing delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


lincoln-lil merged PR #24152:
URL: https://github.com/apache/flink/pull/24152


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34178) The ScalingTracking of autoscaler is wrong

2024-01-21 Thread Rui Fan (Jira)
Rui Fan created FLINK-34178:
---

 Summary: The ScalingTracking of autoscaler is wrong
 Key: FLINK-34178
 URL: https://issues.apache.org/jira/browse/FLINK-34178
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler
Affects Versions: kubernetes-operator-1.8.0
Reporter: Rui Fan
Assignee: Rui Fan


The ScalingTracking of the autoscaler is wrong: the tracked restart time is 
always greater than AutoScalerOptions#STABILIZATION_INTERVAL.
h2. Reason:

When the flink job isStabilizing, ScalingMetricCollector#updateMetrics returns 
an empty metric history. In the JobAutoScalerImpl#runScalingLogic method, if 
`collectedMetrics.getMetricHistory().isEmpty()`, we don't update the 
ScalingTracking.

The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so the 
restartTime is always greater than 5 min.

However, the actual restart is quick when we use the rescale API.
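
A minimal sketch of the intended fix, under assumed names (an illustration, 
not the actual patch): record the scaling end time on every reconcile, even 
while the job is still stabilizing, instead of skipping the update when the 
metric history is empty.
{code}
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

final class ScalingTrackingSketch {
    private Instant scalingStart;          // set when a rescale is triggered
    private Duration observedRestartTime;  // the value FLINK-34178 says is inflated

    void onScalingTriggered(Clock clock) {
        scalingStart = clock.instant();
        observedRestartTime = null;
    }

    // Called on every reconcile, including while the job is still stabilizing.
    void onReconcile(Clock clock, boolean jobRunning) {
        if (scalingStart != null && jobRunning && observedRestartTime == null) {
            // Record the end time as soon as the job runs again, not only
            // after the stabilization interval has passed.
            observedRestartTime = Duration.between(scalingStart, clock.instant());
        }
    }

    Optional<Duration> restartTime() {
        return Optional.ofNullable(observedRestartTime);
    }
}
{code}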



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461377732


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
 return AvroToRowDataConverters::convertToTime;
 case TIMESTAMP_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   This is a behavior change that we didn't discuss on the dev mailing list. 
I'm considering whether it needs a discussion there; maybe we should go back 
to the mailing list and start a quick discussion, WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on PR #23511:
URL: https://github.com/apache/flink/pull/23511#issuecomment-1903283601

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on PR #23511:
URL: https://github.com/apache/flink/pull/23511#issuecomment-1903283177

   @leonardBang 
   Thanks for providing valuable comments. I have refactored the code to 
resolve your comments. A follow-up PR will be created to change the precision. 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]

2024-01-21 Thread via GitHub


1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1461348956


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##
@@ -61,8 +61,9 @@ public long getBackoffTime() {
 }
 
 @Override
-public void notifyFailure(Throwable cause) {
+public boolean notifyFailure(Throwable cause) {
 currentRestartAttempt++;
+return true;

Review Comment:
   > I guess, you meant 
[FLIP-364](https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+exponential-delay+restart-strategy)
 here?
   
   Exactly, sorry for my typo.
   
   > What about deprecating that class as part of this PR and coming up with a 
follow-up Jira issue that replaces the strategy?
   
   They are internal classes; IIUC, we can refactor them directly without the 
`@Deprecated` annotation.
   
   
-
   
   Your suggestion is that we still keep the failure-rate and fixed-delay 
restart-strategies, but we can reuse the 
`ExponentialDelayRestartBackoffTimeStrategy` class, right?
   
   I'm thinking: could we deprecate the failure-rate and fixed-delay 
restart-strategies directly? Users can configure the equivalent behavior 
themselves.
   
   Keeping the failure-rate and fixed-delay restart-strategies while reusing 
the `ExponentialDelayRestartBackoffTimeStrategy` class still has the semantic 
problem that I mentioned in the last comment [1].
   
   [1] https://github.com/apache/flink/pull/24003#discussion_r1456807889



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34176) Remove unnecessary failure-rate and fixed-delay restart-strategies

2024-01-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-34176:

Summary: Remove unnecessary failure-rate and fixed-delay restart-strategies 
  (was: Remove unnecessary implementations of RestartBackoffTimeStrategy)

> Remove unnecessary failure-rate and fixed-delay restart-strategies 
> ---
>
> Key: FLINK-34176
> URL: https://issues.apache.org/jira/browse/FLINK-34176
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> h2. Could we deprecate the failure-rate and fixed-delay restart-strategies 
> directly?
> After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is 
> already very feature-rich. It can replace the fixed-delay and failure-rate 
> restart strategies directly.
> h2. How to replace the fixed-delay restart strategy?
>  * Set backoffMultiplier = 1 and jitterFactor = 0
>  * resetBackoffThresholdMS = Integer.MAX_VALUE
>  * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
> FixedDelayRestartBackoffTimeStrategy
>  * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of 
> FixedDelayRestartBackoffTimeStrategy
> h2. How to replace failure-rate restart strategy?
>  * Set backoffMultiplier = 1 and jitterFactor = 0
>  * resetBackoffThresholdMS is the failuresIntervalMS of 
> FailureRateRestartBackoffTimeStrategy
>  * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
> FailureRateRestartBackoffTimeStrategy
>  * attemptsBeforeResetBackoff is the maxFailuresPerInterval of 
> FailureRateRestartBackoffTimeStrategy



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34176) Remove unnecessary implementations of RestartBackoffTimeStrategy

2024-01-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-34176:

Description: 
h2. Could we deprecate the failure-rate and fixed-delay restart-strategies 
directly?

After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already 
very feature-rich. It can replace the fixed-delay and failure-rate restart 
strategies directly.
h2. How to replace the fixed-delay restart strategy?
 * Set backoffMultiplier = 1 and jitterFactor = 0
 * resetBackoffThresholdMS = Integer.MAX_VALUE
 * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
FixedDelayRestartBackoffTimeStrategy
 * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of 
FixedDelayRestartBackoffTimeStrategy

h2. How to replace failure-rate restart strategy?
 * Set backoffMultiplier = 1 and jitterFactor = 0
 * resetBackoffThresholdMS is the failuresIntervalMS of 
FailureRateRestartBackoffTimeStrategy
 * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
FailureRateRestartBackoffTimeStrategy
 * attemptsBeforeResetBackoff is the maxFailuresPerInterval of 
FailureRateRestartBackoffTimeStrategy

  was:
h2. Could we deprecate the failure-rate and fixed-delay restart-strategies 
directly?

After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already 
very feature-rich. It can replace the fixed-delay and failure-rate restart 
strategies directly.
h2. How to replace FixedDelayRestartBackoffTimeStrategy?
 * Set backoffMultiplier = 1 and jitterFactor = 0
 * resetBackoffThresholdMS = Integer.MAX_VALUE
 * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
FixedDelayRestartBackoffTimeStrategy
 * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of 
FixedDelayRestartBackoffTimeStrategy

h2. How to replace FailureRateRestartBackoffTimeStrategy?
 * Set backoffMultiplier = 1 and jitterFactor = 0
 * resetBackoffThresholdMS is the failuresIntervalMS of 
FailureRateRestartBackoffTimeStrategy
 * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
FailureRateRestartBackoffTimeStrategy
 * attemptsBeforeResetBackoff is the maxFailuresPerInterval of 
FailureRateRestartBackoffTimeStrategy

 

 


> Remove unnecessary implementations of RestartBackoffTimeStrategy
> ---
>
> Key: FLINK-34176
> URL: https://issues.apache.org/jira/browse/FLINK-34176
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> h2. Could we deprecate the failure-rate and fixed-delay restart-strategies 
> directly?
> After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is 
> already very feature-rich. It can replace the fixed-delay and failure-rate 
> restart strategies directly.
> h2. How to replace the fixed-delay restart strategy?
>  * Set backoffMultiplier = 1 and jitterFactor = 0
>  * resetBackoffThresholdMS = Integer.MAX_VALUE
>  * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
> FixedDelayRestartBackoffTimeStrategy
>  * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of 
> FixedDelayRestartBackoffTimeStrategy
> h2. How to replace failure-rate restart strategy?
>  * Set backoffMultiplier = 1 and jitterFactor = 0
>  * resetBackoffThresholdMS is the failuresIntervalMS of 
> FailureRateRestartBackoffTimeStrategy
>  * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
> FailureRateRestartBackoffTimeStrategy
>  * attemptsBeforeResetBackoff is the maxFailuresPerInterval of 
> FailureRateRestartBackoffTimeStrategy



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


HuangZhenQiu commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1461340507


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
 return AvroToRowDataConverters::convertToTime;
 case TIMESTAMP_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   Correct. The incorrect part for the Flink SQL TIMESTAMP_NTZ type is in 
RowDataToAvroConverters. I just added TIMESTAMP_WITH_LOCAL_TIME_ZONE handling 
in the new mapping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34177) Not able to create FlinkSessionJob in different namespace than flink deployment

2024-01-21 Thread Pramod (Jira)
Pramod created FLINK-34177:
--

 Summary: Not able to create FlinkSessionJob in different namespace 
than flink deployment
 Key: FLINK-34177
 URL: https://issues.apache.org/jira/browse/FLINK-34177
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
 Environment: *AWS EKS K8 version 1.25*

*Kubernetes Flink Operator 1.7.0*

*Flink 1.17/1.18*
Reporter: Pramod


Here is my use case:

1) I created a namespace "flink" in aws cluster with k8 version 1.25 

2) Deployed flink kubernetes operator in "flink" namespace using helm chart

3) deployed FlinkDeployment similar to 
[flink-kubernetes-operator/examples/basic-session-deployment-only.yaml at main 
· apache/flink-kubernetes-operator · 
GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml]
 in flink namespace

4) Then deployed FlinkSession job similar to 
[flink-kubernetes-operator/examples/basic-session-job-only.yaml at main · 
apache/flink-kubernetes-operator · 
GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-job-only.yaml]
 but in different namespace "tss"

5) Already added both the namespaces to watchNamespaces 
watchNamespaces: ["tss", "flink"]
 
Expected:
 FlinkSessionJob will start in tss namespace
 
Actual:
 Job is not starting and throwing the error "*Flink version null is not 
supported by this operator version*"
 
 
 
 
My suspicion is that the FlinkDeployment and FlinkSessionJob must be in the 
same namespace. However, I am not sure, so I am raising this bug.
 
*Can someone confirm whether the Flink Kubernetes operator supports having the 
Flink cluster (FlinkDeployment) in one namespace and the FlinkSessionJob in 
another?*
Supporting multiple namespaces would make my life a lot easier.
 
Note: Deploying FlinkSessionJob in the same namespace "Flink" works fine.
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34176) Remove unnecessary implementations of RestartBackoffTimeStrategy

2024-01-21 Thread Rui Fan (Jira)
Rui Fan created FLINK-34176:
---

 Summary: Remove unnecessary implementations of 
RestartBackoffTimeStrategy
 Key: FLINK-34176
 URL: https://issues.apache.org/jira/browse/FLINK-34176
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Rui Fan
Assignee: Rui Fan


h2. Could we deprecate the failure-rate and fixed-delay restart-strategies 
directly?

After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already 
very feature-rich. It can replace the fixed-delay and failure-rate restart 
strategies directly.
h2. How to replace FixedDelayRestartBackoffTimeStrategy?
 * Set backoffMultiplier = 1 and jitterFactor = 0
 * resetBackoffThresholdMS = Integer.MAX_VALUE
 * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
FixedDelayRestartBackoffTimeStrategy
 * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of 
FixedDelayRestartBackoffTimeStrategy

h2. How to replace FailureRateRestartBackoffTimeStrategy?
 * Set backoffMultiplier = 1 and jitterFactor = 0
 * resetBackoffThresholdMS is the failuresIntervalMS of 
FailureRateRestartBackoffTimeStrategy
 * initialBackoffMS and maxBackoffMS are the backoffTimeMS of 
FailureRateRestartBackoffTimeStrategy
 * attemptsBeforeResetBackoff is the maxFailuresPerInterval of 
FailureRateRestartBackoffTimeStrategy
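
For illustration, a hedged sketch of the fixed-delay replacement using the 
exponential-delay option keys (key names follow the documented restart-strategy 
options; attempts-before-reset-backoff comes from FLIP-364 and may differ in 
the final release):
{code}
import org.apache.flink.configuration.Configuration;

public class FixedDelayViaExponentialDelay {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("restart-strategy.type", "exponential-delay");
        // backoffMultiplier = 1 and jitterFactor = 0 make the delay constant.
        conf.setString("restart-strategy.exponential-delay.backoff-multiplier", "1.0");
        conf.setString("restart-strategy.exponential-delay.jitter-factor", "0.0");
        // initial == max backoff plays the role of fixed-delay's backoffTimeMS.
        conf.setString("restart-strategy.exponential-delay.initial-backoff", "10 s");
        conf.setString("restart-strategy.exponential-delay.max-backoff", "10 s");
        // Effectively resetBackoffThresholdMS = Integer.MAX_VALUE ("never reset").
        conf.setString("restart-strategy.exponential-delay.reset-backoff-threshold",
                "2147483647 ms");
        // Plays the role of fixed-delay's maxNumberRestartAttempts.
        conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "3");
    }
}
{code}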

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-21 Thread via GitHub


flinkbot commented on PR #24162:
URL: https://github.com/apache/flink/pull/24162#issuecomment-1903131997

   
   ## CI report:
   
   * 5b90be3d7e080cfaea62a1711406a21319b415a2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34100) Support session window table function without pulling up with window agg

2024-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34100:
---
Labels: pull-request-available  (was: )

> Support session window table function without pulling up with window agg 
> -
>
> Key: FLINK-34100
> URL: https://issues.apache.org/jira/browse/FLINK-34100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> This subtask resolves the session support in ExecWindowTableFunction. Then 
> the session window can support window agg with 
> WindowAttachedWindowingStrategy.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-21 Thread via GitHub


xuyangzhong opened a new pull request, #24162:
URL: https://github.com/apache/flink/pull/24162

   ## What is the purpose of the change
   
   Introduce the windowed unslice window assigner, and support the session window table 
function without pulling it up with the window agg. 
   
   Note: this is a draft PR, and only the commits whose messages start with 
'[FLINK-34100]' are new commits related to this PR.
   
   ## Brief change log
   
 - *add function getDescription for internal interface WindowAssigner*
 - *extract a WindowTableFunctionOperatorBase from 
WindowTableFunctionOperator to prepare for introducing unaligned window table 
function*
 - *introduce UnalignedWindowTableFunctionOperator for unaligned window*
 - *fix missing exchange before window table function node*
 - *support session window table function without pulling up with window 
agg*
   
   ## Verifying this change
   
   Some harness tests and ITCases are added.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? A later single pr.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33221) Add config options for administrator JVM options

2024-01-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33221.
-
Resolution: Fixed

> Add config options for administrator JVM options
> 
>
> Key: FLINK-33221
> URL: https://issues.apache.org/jira/browse/FLINK-33221
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> We encounter similar issues described in SPARK-23472. Users may need to add 
> JVM options to their Flink applications (e.g. to tune GC options). They 
> typically use {{env.java.opts.x}} series of options to do so. We also have a 
> set of administrator JVM options to apply by default, e.g. to enable GC log, 
> tune GC options, etc. Both use cases will need to set the same series of 
> options and will clobber one another.
> In the past, we generated and prepended the administrator JVM options in 
> the Java code that generates the start command for the JM/TM. However, this 
> has proven difficult to maintain.
> Therefore, I propose to also add a set of default JVM options for 
> administrator use that is prepended to the user-set extra JVM options. We can mark 
> the existing {{env.java.opts.x}} series as user-set extra JVM options and add 
> a set of new {{env.java.opts.x.default}} options for administrator use.
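
For illustration, a flink-conf.yaml sketch of the proposal; the *.default key below follows this ticket's proposed naming scheme and is not a released option:
{code:yaml}
# User-set extra JVM options (existing option):
env.java.opts.taskmanager: -XX:+UseZGC
# Administrator defaults, prepended before the user-set options
# (hypothetical name following the env.java.opts.x.default scheme above):
env.java.opts.taskmanager.default: -Xlog:gc*:file=/opt/flink/log/gc.log
# Effective TM JVM options: -Xlog:gc*:file=/opt/flink/log/gc.log -XX:+UseZGC
{code}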



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33221) Add config options for administrator JVM options

2024-01-21 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809233#comment-17809233
 ] 

Rui Fan commented on FLINK-33221:
-

Merged to master (1.19.0) via b4eb8ac503f41fd793db1ac662fbedc46af92fd5

> Add config options for administrator JVM options
> 
>
> Key: FLINK-33221
> URL: https://issues.apache.org/jira/browse/FLINK-33221
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> We encounter similar issues described in SPARK-23472. Users may need to add 
> JVM options to their Flink applications (e.g. to tune GC options). They 
> typically use {{env.java.opts.x}} series of options to do so. We also have a 
> set of administrator JVM options to apply by default, e.g. to enable GC log, 
> tune GC options, etc. Both use cases will need to set the same series of 
> options and will clobber one another.
> In the past, we generated and prepended the administrator JVM options in 
> the Java code that generates the start command for the JM/TM. However, this 
> has proven difficult to maintain.
> Therefore, I propose to also add a set of default JVM options for 
> administrator use that is prepended to the user-set extra JVM options. We can mark 
> the existing {{env.java.opts.x}} series as user-set extra JVM options and add 
> a set of new {{env.java.opts.x.default}} options for administrator use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32732][Connectors/Kafka] auto offset reset should be exposed t… [flink-connector-kafka]

2024-01-21 Thread via GitHub


zhougit86 commented on PR #43:
URL: 
https://github.com/apache/flink-connector-kafka/pull/43#issuecomment-1903098823

   > @zhougit86 Can you please rebase your PR?
   
   @MartijnVisser Could you please tell me which branch I should rebase this 
onto? And could you please share your thoughts on this PR?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33221][core][config] Add config options for administrator JVM options [flink]

2024-01-21 Thread via GitHub


1996fanrui merged PR #24098:
URL: https://github.com/apache/flink/pull/24098


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34174) Remove SlotMatchingStrategy related logic

2024-01-21 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809232#comment-17809232
 ] 

RocMarshal commented on FLINK-34174:


The main reason for initiating this ticket is 
https://issues.apache.org/jira/browse/FLINK-31449, 
as (IIUC) the current related logic is no longer being used.

> Remove SlotMatchingStrategy related logic
> -
>
> Key: FLINK-34174
> URL: https://issues.apache.org/jira/browse/FLINK-34174
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Task
>Reporter: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34175) When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer

2024-01-21 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34175:
---
Description: 
The following test, added to SlicingWindowAggOperatorTest, can reproduce this 
problem.
{code:java}
private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
        new RowType(
                Arrays.asList(
                        new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
                        new RowType.RowField("f1", new IntType()),
                        new RowType.RowField("f2", new TimestampType()),
                        new RowType.RowField("f3", new TimestampType()),
                        new RowType.RowField(
                                "f4", new TimestampType(false, TimestampKind.ROWTIME, 3))));

protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
        new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF);

@Test
public void test() throws Exception {
    final SliceAssigner innerAssigner =
            SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
    final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
    final SlicingSumAndCountAggsFunction aggsFunction =
            new SlicingSumAndCountAggsFunction(assigner);
    SlicingWindowOperator<RowData, ?> operator =
            (SlicingWindowOperator<RowData, ?>)
                    WindowAggOperatorBuilder.builder()
                            .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
                            .shiftTimeZone(shiftTimeZone)
                            .keySerializer(KEY_SER)
                            .assigner(assigner)
                            .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                            .build();

    OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarness(operator);

    testHarness.setup(OUT_SERIALIZER);
    testHarness.open();

    // process elements
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    // add elements out-of-order
    testHarness.processElement(
            insertRecord(
                    "key2",
                    1,
                    fromEpochMillis(999L),
                    fromEpochMillis(3999L),
                    fromEpochMillis(3998L)));

    testHarness.processWatermark(new Watermark(999));
    expectedOutput.add(new Watermark(999));
    ASSERTER.assertOutputEqualsSorted(
            "Output was not correct.", expectedOutput, testHarness.getOutput());

    testHarness.close();
}{code}

> When meeting WindowedSliceAssigner, slice window agg registers a wrong 
> timestamp timer 
> 
>
> Key: FLINK-34175
> URL: https://issues.apache.org/jira/browse/FLINK-34175
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: xuyang
>Priority: Major
>
> The following test, added to SlicingWindowAggOperatorTest, can reproduce this 
> problem.
> {code:java}
> private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
> new RowType(
> Arrays.asList(
> new RowType.RowField("f0", new 
> VarCharType(Integer.MAX_VALUE)),
> new RowType.RowField("f1", new IntType()),
> new RowType.RowField("f2", new TimestampType()),
> new RowType.RowField("f3", new TimestampType()),
> new RowType.RowField(
> "f4", new TimestampType(false, TimestampKind.ROWTIME, 3))));
> protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
> new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF); 
> @Test
> public void test() throws Exception {
>     final SliceAssigner innerAssigner =
>             SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
>     final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
>     final SlicingSumAndCountAggsFunction aggsFunction =
>             new SlicingSumAndCountAggsFunction(assigner);
> SlicingWindowOperator<RowData, ?> operator =
> (SlicingWindowOperator<RowData, ?>)
>                     WindowAggOperatorBuilder.builder()
>                             .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
>                             .shiftTimeZone(shiftTimeZone)
>                             .keySerializer(KEY_SER)
>                             .assigner(assigner)
>                             .aggregate(wrapGenerated(aggsFunction), ACC_SER)
>                             .build();
> OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createTestHarness(operator);
>     testHarness.setup(OUT_SERIALIZER);
>     testHarness.open();
>     // process elements
>     ConcurrentLinkedQueue expectedOutput 

[jira] [Created] (FLINK-34175) When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer

2024-01-21 Thread xuyang (Jira)
xuyang created FLINK-34175:
--

 Summary: When meeting WindowedSliceAssigner, slice window agg 
registers a wrong timestamp timer 
 Key: FLINK-34175
 URL: https://issues.apache.org/jira/browse/FLINK-34175
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33450) Implement JdbcAutoScalerStateStore

2024-01-21 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809223#comment-17809223
 ] 

Rui Fan commented on FLINK-33450:
-

Merged to main (1.8.0) via:
 * c34fe4f41a2f684da01d84ab0d761ec669af83f6
 * 293067ab3fa227eb6a38ef5d1255fa4641da142c
 * 96b2aa0b6684d0b06602c128a45f8886adf9324a
 * 6e78221e6abc72ce8daeb92be1cf58fd2506b591

> Implement JdbcAutoScalerStateStore
> --
>
> Key: FLINK-33450
> URL: https://issues.apache.org/jira/browse/FLINK-33450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Design doc: 
> https://docs.google.com/document/d/1lE4s3ZAyCfzYT4dNOVarz5dIctQ8UUAVqjPKlQ1XU1c/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33450) Implement JdbcAutoScalerStateStore

2024-01-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33450.
-
Resolution: Fixed

> Implement JdbcAutoScalerStateStore
> --
>
> Key: FLINK-33450
> URL: https://issues.apache.org/jira/browse/FLINK-33450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Design doc: 
> https://docs.google.com/document/d/1lE4s3ZAyCfzYT4dNOVarz5dIctQ8UUAVqjPKlQ1XU1c/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34174][runtime] Remove SlotMatchingStrategy related logic [flink]

2024-01-21 Thread via GitHub


RocMarshal commented on PR #24158:
URL: https://github.com/apache/flink/pull/24158#issuecomment-1903021276

   Hi @1996fanrui, thanks for your review.
   The main reason for initiating this PR is 
   https://issues.apache.org/jira/browse/FLINK-31449, 
   as (IIUC) the current classes are no longer being used.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33971]Specifies whether to use HBase table that supports dynamic columns. [flink-connector-hbase]

2024-01-21 Thread via GitHub


MOBIN-F closed pull request #36: [FLINK-33971]Specifies whether to use HBase 
table that supports dynamic columns.
URL: https://github.com/apache/flink-connector-hbase/pull/36


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


1996fanrui merged PR #741:
URL: https://github.com/apache/flink-kubernetes-operator/pull/741


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] E2E for minibatch join [flink]

2024-01-21 Thread via GitHub


flinkbot commented on PR #24161:
URL: https://github.com/apache/flink/pull/24161#issuecomment-1903008823

   
   ## CI report:
   
   * cc0d27ae8047710e53107d9fd9b50b8578c9ff25 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Introduce operator for minibatch join [flink]

2024-01-21 Thread via GitHub


flinkbot commented on PR #24160:
URL: https://github.com/apache/flink/pull/24160#issuecomment-1903006337

   
   ## CI report:
   
   * d487b086c0cccb48244e46b106bcd26143e6a272 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft][bugfix] Move the deserialization of shuffleDescriptor to a separate … [flink]

2024-01-21 Thread via GitHub


caodizhou commented on PR #24115:
URL: https://github.com/apache/flink/pull/24115#issuecomment-1903002055

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


luoyuxia commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461300877


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##
@@ -129,40 +129,55 @@ public void processElement(
 
 // do lookup for acc msg
 if (RowDataUtil.isAccumulateMsg(in)) {
-// clear local state first
-deleteState();
+if (lookupJoinRunner.preFilter(in)) {
+// clear local state first
+deleteState();
 
-// fetcher has copied the input field when object reuse is enabled
-lookupJoinRunner.doFetch(in);
+// fetcher has copied the input field when object reuse is 
enabled
+lookupJoinRunner.doFetch(in);
 
-// update state with empty row if lookup miss or pre-filtered
-if (!collectListener.collected) {
-updateState(emptyRow);
+// update state with empty row if lookup miss or pre-filtered
+if (!collectListener.collected) {
+updateState(emptyRow);
+}
 }
-
 lookupJoinRunner.padNullForLeftJoin(in, out);
 } else {
-// do state access for non-acc msg
-if (lookupKeyContainsPrimaryKey) {
-RowData rightRow = uniqueState.value();
-// should distinguish null from empty(lookup miss)
-if (null == rightRow) {
-stateStaledErrorHandle(in, out);
-} else {
-collectDeleteRow(in, rightRow, out);
-}
-} else {
-List rightRows = state.value();
-if (null == rightRows) {
-stateStaledErrorHandle(in, out);
+boolean collected = false;
+if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   Got it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] introduce BufferBundle for minibatch join [flink]

2024-01-21 Thread via GitHub


flinkbot commented on PR #24159:
URL: https://github.com/apache/flink/pull/24159#issuecomment-1902987990

   
   ## CI report:
   
   * 719a6c42f3b0f7adfb7047f9ac2a29d6a62b3afb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] E2E for minibatch join [flink]

2024-01-21 Thread via GitHub


xishuaidelin opened a new pull request, #24161:
URL: https://github.com/apache/flink/pull/24161

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Introduce operator for minibatch join [flink]

2024-01-21 Thread via GitHub


xishuaidelin opened a new pull request, #24160:
URL: https://github.com/apache/flink/pull/24160

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] introduce BufferBundle for minibatch join [flink]

2024-01-21 Thread via GitHub


xishuaidelin opened a new pull request, #24159:
URL: https://github.com/apache/flink/pull/24159

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


lincoln-lil commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461293471


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##
@@ -129,40 +129,55 @@ public void processElement(
 
 // do lookup for acc msg
 if (RowDataUtil.isAccumulateMsg(in)) {
-// clear local state first
-deleteState();
+if (lookupJoinRunner.preFilter(in)) {
+// clear local state first
+deleteState();
 
-// fetcher has copied the input field when object reuse is enabled
-lookupJoinRunner.doFetch(in);
+// fetcher has copied the input field when object reuse is 
enabled
+lookupJoinRunner.doFetch(in);
 
-// update state with empty row if lookup miss or pre-filtered
-if (!collectListener.collected) {
-updateState(emptyRow);
+// update state with empty row if lookup miss or pre-filtered
+if (!collectListener.collected) {
+updateState(emptyRow);
+}
 }
-
 lookupJoinRunner.padNullForLeftJoin(in, out);
 } else {
-// do state access for non-acc msg
-if (lookupKeyContainsPrimaryKey) {
-RowData rightRow = uniqueState.value();
-// should distinguish null from empty(lookup miss)
-if (null == rightRow) {
-stateStaledErrorHandle(in, out);
-} else {
-collectDeleteRow(in, rightRow, out);
-}
-} else {
-List rightRows = state.value();
-if (null == rightRows) {
-stateStaledErrorHandle(in, out);
+boolean collected = false;
+if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   The preFilter here only relates to the left local filter conditions, which can 
short-cut the lookup path (introduced by FLINK-18445), so the filter on the right 
table `b.name is not null` will not affect the preFilter logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-34049) Refactor classes related to window TVF aggregation to prepare for non-aligned windows

2024-01-21 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-34049.
-
Resolution: Implemented

> Refactor classes related to window TVF aggregation to prepare for non-aligned 
> windows
> -
>
> Key: FLINK-34049
> URL: https://issues.apache.org/jira/browse/FLINK-34049
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Refactor classes related to window TVF aggregation such as 
> AbstractWindowAggProcessor to prepare for the implementation of non-aligned 
> windows like session window



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


1996fanrui commented on PR #741:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/741#issuecomment-1902917759

   Thanks for the patient review, merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34049) Refactor classes related to window TVF aggregation to prepare for non-aligned windows

2024-01-21 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809213#comment-17809213
 ] 

Shengkai Fang commented on FLINK-34049:
---

Merged into master: e345ffb453ac482d0250736b687cf88b85c606b0

> Refactor classes related to window TVF aggregation to prepare for non-aligned 
> windows
> -
>
> Key: FLINK-34049
> URL: https://issues.apache.org/jira/browse/FLINK-34049
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Refactor classes related to window TVF aggregation such as 
> AbstractWindowAggProcessor to prepare for the implementation of non-aligned 
> windows like session window



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]

2024-01-21 Thread via GitHub


fsk119 merged PR #24068:
URL: https://github.com/apache/flink/pull/24068


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-21 Thread via GitHub


libenchao commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1461286671


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java:
##
@@ -70,6 +98,51 @@ public void testFilterPushdown() {
 "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
 }
 
+/**
+ * Note the join condition is not present in the optimized plan, as it is 
handled in the JDBC

Review Comment:
   > just for my understanding: are we thinking that the rowdata sent in on the 
lookup would contain the pushdown predicates, so the code could call the following, and this 
would handle the predicates and the keys:
   >  statement = lookupKeyRowConverter.toExternal(keyRow, statement);
   > we could then remove the need for
   > statement = setPredicateParams(statement);
   
   What I was proposing is showing the predicates in the digest of the lookup join 
node. That way, we can see them in the test XML files, and they can also be shown 
in the Flink Web UI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34029][runtime-web] Support different profiling mode on Flink WEB [flink]

2024-01-21 Thread via GitHub


yuchen-ecnu commented on PR #24130:
URL: https://github.com/apache/flink/pull/24130#issuecomment-1902905798

   Hi @Myasuka, do you have time to help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]

2024-01-21 Thread via GitHub


luoyuxia commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461275950


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##
@@ -129,40 +129,55 @@ public void processElement(
 
 // do lookup for acc msg
 if (RowDataUtil.isAccumulateMsg(in)) {
-// clear local state first
-deleteState();
+if (lookupJoinRunner.preFilter(in)) {
+// clear local state first
+deleteState();
 
-// fetcher has copied the input field when object reuse is enabled
-lookupJoinRunner.doFetch(in);
+// fetcher has copied the input field when object reuse is 
enabled
+lookupJoinRunner.doFetch(in);
 
-// update state with empty row if lookup miss or pre-filtered
-if (!collectListener.collected) {
-updateState(emptyRow);
+// update state with empty row if lookup miss or pre-filtered
+if (!collectListener.collected) {
+updateState(emptyRow);
+}
 }
-
 lookupJoinRunner.padNullForLeftJoin(in, out);
 } else {
-// do state access for non-acc msg
-if (lookupKeyContainsPrimaryKey) {
-RowData rightRow = uniqueState.value();
-// should distinguish null from empty(lookup miss)
-if (null == rightRow) {
-stateStaledErrorHandle(in, out);
-} else {
-collectDeleteRow(in, rightRow, out);
-}
-} else {
-List rightRows = state.value();
-if (null == rightRows) {
-stateStaledErrorHandle(in, out);
+boolean collected = false;
+if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   If the filter condition is on the right table, will 
`lookupJoinRunner.preFilter(in)` be true?
   For example:
   ```sql
   from a join b as FOR SYSTEM_TIME AS OF PROCTIME() 
   on a.id = b.id and b.name is not null
   ```
   It seems the filter `b.name is not null` will be code-generated into 
`generatedFetcher` instead of `preFilterCondition` in `LookupJoinRunner`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32417) DynamicKafkaSource User Documentation

2024-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32417:
---
Labels: pull-request-available  (was: )

> DynamicKafkaSource User Documentation
> -
>
> Key: FLINK-32417
> URL: https://issues.apache.org/jira/browse/FLINK-32417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> Add user documentation for DynamicKafkaSource



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32417] Add DynamicKafkaSource documentation for setter methods… [flink-connector-kafka]

2024-01-21 Thread via GitHub


mas-chen commented on PR #80:
URL: 
https://github.com/apache/flink-connector-kafka/pull/80#issuecomment-1902871885

   FYI: @mxm @tzulitai 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-32417] Add DynamicKafkaSource documentation for setter methods… [flink-connector-kafka]

2024-01-21 Thread via GitHub


mas-chen opened a new pull request, #80:
URL: https://github.com/apache/flink-connector-kafka/pull/80

   …, metrics, and config options
   
   I verified locally that the documentation comes up properly and that all the 
links work.
   Screenshot: https://github.com/apache/flink-connector-kafka/assets/6834335/13bce63e-0b61-401a-b70e-906aac64e067
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33803] Set observedGeneration at end of reconciliation [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


gyfora commented on code in PR #755:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/755#discussion_r1461096099


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##
@@ -126,6 +126,9 @@ private static  void 
updateStatusForSpecReconcil
 status.setError(null);
 
reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());
 
+// Set observedGeneration
+status.setObservedGeneration(target.getMetadata().getGeneration());

Review Comment:
   I still don't think that this is consistent with the current content of the 
lastReconciledSpec metadata. It's not only about setting it to the same value 
(source of truth); it's also about setting it at the exact same time.
   
   Also, we should remove internal usages of 
lastReconciledSpec.meta.generation and replace them with this one. We have tests 
for the behaviour, so that would validate it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-34131) Checkpoint check window should take in account checkpoint job configuration

2024-01-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-34131:
--

Assignee: Nicolas Fraison

> Checkpoint check window should take in account checkpoint job configuration
> ---
>
> Key: FLINK-34131
> URL: https://issues.apache.org/jira/browse/FLINK-34131
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Nicolas Fraison
>Assignee: Nicolas Fraison
>Priority: Minor
>  Labels: pull-request-available
>
> When enabling the checkpoint progress check 
> (kubernetes.operator.cluster.health-check.checkpoint-progress.enabled) to 
> determine cluster health, the operator detects whether a checkpoint has been 
> performed during the 
> kubernetes.operator.cluster.health-check.checkpoint-progress.window.
> As indicated in the docs, the window must be bigger than the checkpointing interval.
> But this is a manual configuration, which can lead to misconfiguration and 
> unwanted restarts of the Flink cluster if the checkpointing interval is bigger 
> than the window.
> The operator must check that the config is sound before relying on this 
> check. If it is not well set, it should not execute the check (return true on 
> [evaluateCheckpoints|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java#L197C1-L199C50])
>  and log a WARN message.
> Also, Flink jobs have other checkpointing parameters that should be taken into 
> account for this window configuration, namely 
> execution.checkpointing.timeout and 
> execution.checkpointing.tolerable-failed-checkpoints.
> The idea would be to check that 
> kubernetes.operator.cluster.health-check.checkpoint-progress.window >= 
> max(execution.checkpointing.interval * 
> execution.checkpointing.tolerable-failed-checkpoints, 
> execution.checkpointing.timeout * 
> execution.checkpointing.tolerable-failed-checkpoints)
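
For illustration, a minimal sketch of the proposed sanity check (a hypothetical helper, not the operator's actual API):
{code:java}
import java.time.Duration;

final class CheckpointWindowSanityCheck {

    // Proposed rule: window >= max(interval, timeout) * tolerableFailedCheckpoints.
    static boolean isWindowLargeEnough(
            Duration window, Duration interval, Duration timeout, int tolerableFailed) {
        Duration longest = interval.compareTo(timeout) >= 0 ? interval : timeout;
        Duration minWindow = longest.multipliedBy(Math.max(1, tolerableFailed));
        return window.compareTo(minWindow) >= 0;
    }
}
{code}
If the check fails, the operator would skip the evaluation (treat the cluster as healthy) and log a WARN message, as described above.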



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34131) Checkpoint check window should take in account checkpoint job configuration

2024-01-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34131.
--
Resolution: Fixed

merged to main 775bc5f41df09accefca6590112509c6023b2fb4

> Checkpoint check window should take in account checkpoint job configuration
> ---
>
> Key: FLINK-34131
> URL: https://issues.apache.org/jira/browse/FLINK-34131
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Nicolas Fraison
>Priority: Minor
>  Labels: pull-request-available
>
> When enabling the checkpoint progress check 
> (kubernetes.operator.cluster.health-check.checkpoint-progress.enabled) to 
> determine cluster health, the operator detects whether a checkpoint has been 
> performed during the 
> kubernetes.operator.cluster.health-check.checkpoint-progress.window.
> As indicated in the docs, the window must be bigger than the checkpointing interval.
> But this is a manual configuration, which can lead to misconfiguration and 
> unwanted restarts of the Flink cluster if the checkpointing interval is bigger 
> than the window.
> The operator must check that the config is sound before relying on this 
> check. If it is not well set, it should not execute the check (return true on 
> [evaluateCheckpoints|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java#L197C1-L199C50])
>  and log a WARN message.
> Also, Flink jobs have other checkpointing parameters that should be taken into 
> account for this window configuration, namely 
> execution.checkpointing.timeout and 
> execution.checkpointing.tolerable-failed-checkpoints.
> The idea would be to check that 
> kubernetes.operator.cluster.health-check.checkpoint-progress.window >= 
> max(execution.checkpointing.interval * 
> execution.checkpointing.tolerable-failed-checkpoints, 
> execution.checkpointing.timeout * 
> execution.checkpointing.tolerable-failed-checkpoints)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34131) Checkpoint check window should take in account checkpoint job configuration

2024-01-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-34131:
---
Fix Version/s: kubernetes-operator-1.8.0

> Checkpoint check window should take in account checkpoint job configuration
> ---
>
> Key: FLINK-34131
> URL: https://issues.apache.org/jira/browse/FLINK-34131
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Nicolas Fraison
>Assignee: Nicolas Fraison
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> When enabling the checkpoint progress check 
> (kubernetes.operator.cluster.health-check.checkpoint-progress.enabled) to 
> determine cluster health, the operator detects whether a checkpoint has been 
> performed during the 
> kubernetes.operator.cluster.health-check.checkpoint-progress.window.
> As indicated in the docs, the window must be bigger than the checkpointing interval.
> But this is a manual configuration, which can lead to misconfiguration and 
> unwanted restarts of the Flink cluster if the checkpointing interval is bigger 
> than the window.
> The operator must check that the config is sound before relying on this 
> check. If it is not well set, it should not execute the check (return true on 
> [evaluateCheckpoints|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java#L197C1-L199C50])
>  and log a WARN message.
> Also, Flink jobs have other checkpointing parameters that should be taken into 
> account for this window configuration, namely 
> execution.checkpointing.timeout and 
> execution.checkpointing.tolerable-failed-checkpoints.
> The idea would be to check that 
> kubernetes.operator.cluster.health-check.checkpoint-progress.window >= 
> max(execution.checkpointing.interval * 
> execution.checkpointing.tolerable-failed-checkpoints, 
> execution.checkpointing.timeout * 
> execution.checkpointing.tolerable-failed-checkpoints)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34131] Ensure cluster.health-check.checkpoint-progress.window is well configure [flink-kubernetes-operator]

2024-01-21 Thread via GitHub


gyfora merged PR #758:
URL: https://github.com/apache/flink-kubernetes-operator/pull/758


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2024-01-21 Thread Anurag Kyal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809152#comment-17809152
 ] 

Anurag Kyal commented on FLINK-32212:
-

I am running into this issue as well: after my job restarts (I am still trying to 
figure out the reason for the random restarts), these errors show up. Most of the 
time it stops after 8-10 exceptions, but occasionally it gets into an infinite loop.

I am using a Pub/Sub connector to read notifications as the first step in my job, 
and it seems these exceptions are related to this step - at least that is 
what the UI says.

Was anyone able to find a solution other than manual intervention, which I want to 
avoid in the middle of the night?

> Job restarting indefinitely after an IllegalStateException from 
> BlobLibraryCacheManager
> ---
>
> Key: FLINK-32212
> URL: https://issues.apache.org/jira/browse/FLINK-32212
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.1
> Environment: Apache Flink Kubernetes Operator 1.4
>Reporter: Matheus Felisberto
>Priority: Major
>
> After running for a few hours, the job starts to throw IllegalStateException, 
> and I can't figure out why. To restore the job, I need to manually delete the 
> FlinkDeployment so it is recreated, and redeploy everything.
> The jar is built into the Docker image, hence it is defined in accordance with 
> the operator's documentation:
> {code:java}
> // jarURI: local:///opt/flink/usrlib/my-job.jar {code}
> I've tried moving it to /opt/flink/lib/my-job.jar, but that didn't work 
> either. 
>  
> {code:java}
> // Source: my-topic (1/2)#30587 
> (b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
> switched from DEPLOYING to FAILED with failure cause: 
> java.lang.IllegalStateException: The library registration references a 
> different set of library BLOBs than previous registrations for this job:
> old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
> new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
>     at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Unknown Source) {code}
> If there is any other information that can help to identify the problem, 
> please let me know.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]

2024-01-21 Thread via GitHub


leonardBang commented on code in PR #23511:
URL: https://github.com/apache/flink/pull/23511#discussion_r1460846335


##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:
##
@@ -121,6 +128,12 @@ private static AvroToRowDataConverter 
createConverter(LogicalType type) {
 return AvroToRowDataConverters::convertToTime;
 case TIMESTAMP_WITHOUT_TIME_ZONE:
 return AvroToRowDataConverters::convertToTimestamp;

Review Comment:
   IIUC, we should eventually convert the Flink SQL TIMESTAMP_NTZ type to the Avro 
local timestamp type; this is the correct behavior.



##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java:
##
@@ -71,5 +70,12 @@ public InlineElement getDescription() {
 }
 }
 
+@Deprecated
+public static final ConfigOption AVRO_TIMESTAMP_LEGACY_MAPPING =

Review Comment:
   Introducing a **new** config option that is already `Deprecated` is a little 
confusing to users; I think we can mark it as `Deprecated` when we decide to 
deprecate it.



##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java:
##
@@ -86,6 +86,27 @@ public AvroRowDataDeserializationSchema(
 typeInfo);
 }
 
+/**
+ * Creates an Avro deserialization schema for the given logical type.
+ *
+ * @param rowType The logical type used to deserialize the data.
+ * @param typeInfo The TypeInformation to be used by {@link
+ * AvroRowDataDeserializationSchema#getProducedType()}.
+ * @param encoding The serialization approach used to deserialize the data.
+ * @param legacyMapping Whether to use legacy mapping.

Review Comment:
    the scope of `legacyMapping` is too broad for our case; what we 
want to express is only the timestamp types' legacy behavior, but this name looks 
like we have a legacy behavior for all types. How about 
`legacyTimestampMapping`?



##
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java:
##
@@ -352,17 +457,55 @@ public static Schema convertToSchema(LogicalType 
logicalType, String rowName) {
 final TimestampType timestampType = (TimestampType) 
logicalType;
 precision = timestampType.getPrecision();
 org.apache.avro.LogicalType avroLogicalType;
-if (precision <= 3) {
-avroLogicalType = LogicalTypes.timestampMillis();
+if (legacyMapping) {
+if (precision <= 3) {
+avroLogicalType = LogicalTypes.timestampMillis();
+} else if (precision <= 6) {
+avroLogicalType = LogicalTypes.timestampMicros();
+} else {

Review Comment:
   I'm happy that you added precision support here, but I suggest opening a new 
PR or commit to finish this work; it will be clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org