[jira] [Created] (FLINK-34191) Remove RestoreMode#LEGACY
Zakelly Lan created FLINK-34191: --- Summary: Remove RestoreMode#LEGACY Key: FLINK-34191 URL: https://issues.apache.org/jira/browse/FLINK-34191 Project: Flink Issue Type: Sub-task Reporter: Zakelly Lan Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34190) Deprecate RestoreMode#LEGACY
Zakelly Lan created FLINK-34190: --- Summary: Deprecate RestoreMode#LEGACY Key: FLINK-34190 URL: https://issues.apache.org/jira/browse/FLINK-34190 Project: Flink Issue Type: Sub-task Reporter: Zakelly Lan Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34189) FLIP-416: Deprecate and remove the RestoreMode#LEGACY
Zakelly Lan created FLINK-34189: --- Summary: FLIP-416: Deprecate and remove the RestoreMode#LEGACY Key: FLINK-34189 URL: https://issues.apache.org/jira/browse/FLINK-34189 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Zakelly Lan Assignee: Zakelly Lan Fix For: 2.0.0, 1.19.0 [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=287607202] The [FLIP-193|https://cwiki.apache.org/confluence/x/bIyqCw] introduced two modes of state file ownership during checkpoint restoration: RestoreMode#CLAIM and RestoreMode#NO_CLAIM. The LEGACY mode, which was how Flink worked until 1.15, has been superseded by NO_CLAIM as the default mode. The main drawback of LEGACY mode is that the new job relies on artifacts from the old job without cleaning them up, leaving users uncertain about when it is safe to delete the old checkpoint directories. This leads to the accumulation of unnecessary checkpoint files that are never cleaned up. Considering cluster availability and job maintenance, it is not recommended to use LEGACY mode. Users can choose either of the other two modes to get clear semantics for state file ownership. This FLIP proposes to deprecate the LEGACY mode and remove it completely in the upcoming Flink 2.0. This will make the semantics clear, eliminate many bugs caused by mode transitions involving LEGACY mode (e.g. FLINK-27114 - On JM restart, the information about the initial checkpoints can be lost), and enhance code maintainability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
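For context, a minimal sketch of how a job would opt into one of the two remaining ownership modes when resuming from a savepoint or retained checkpoint; the savepoint path below is hypothetical, and the snippet is only an illustration of the existing configuration keys, not part of the FLIP:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreModeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical savepoint path; replace with a real one.
        conf.setString("execution.savepoint.path", "s3://my-bucket/savepoints/savepoint-abc123");
        // NO_CLAIM (the default since 1.15) or CLAIM; LEGACY is the mode this FLIP removes.
        conf.setString("execution.savepoint-restore-mode", "NO_CLAIM");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job ...
        env.fromSequence(0, 10).print();
        env.execute("restore-mode-example");
    }
}

The same choice can be made on the CLI when submitting the job (the restore-mode flag of `flink run`); the exact flag spelling is assumed here rather than quoted from the FLIP.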
[jira] [Closed] (FLINK-34177) Not able to create FlinkSessionJob in different namespace than flink deployment
[ https://issues.apache.org/jira/browse/FLINK-34177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-34177. -- Resolution: Not A Problem Closing this for now as the behaviour is intended. If you would like to change the behaviour, please consider creating a FLIP / mailing list discussion about it. > Not able to create FlinkSessionJob in different namespace than flink > deployment > --- > > Key: FLINK-34177 > URL: https://issues.apache.org/jira/browse/FLINK-34177 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 > Environment: *AWS EKS K8 version 1.25* > {*}Kubernetes Flink Operator 1.7.0{*}{*}{*} > *Flink 1.17/1.18* >Reporter: Pramod >Priority: Major > > Here is my use case: > 1) I created a namespace "flink" in aws cluster with k8 version 1.25 > 2) Deployed flink kubernetes operator in "flink" namespace using helm chart > 3) deployed FlinkDeployment similar to > [flink-kubernetes-operator/examples/basic-session-deployment-only.yaml at > main · apache/flink-kubernetes-operator · > GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml] > in flink namespace > 4) Then deployed FlinkSession job similar to > [flink-kubernetes-operator/examples/basic-session-job-only.yaml at main · > apache/flink-kubernetes-operator · > GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-job-only.yaml] > but in different namespace "tss" > 5) Already added both the namespaces to watchednamespaces > watchNamespaces: ["tss", "flink"] > > Expected: > FlinkSessionJob will start in tss namespace > > Actual: > Job is not starting and throwing the error "*Flink version null is not > supported by this operator version*" > > > > > My suspect is it seems FlinkDeployment and FlinkSessionJob should be in the > namespace. However i am not sure. So i am raising this bug. > > {color:#FF}*Can someone confirm if Flink kubernetes operator supports > Flink cluster(FlinkDeployment) to be in one namespace and then > FlinkSessionJob in another namespace.??*{color} > Supporting multiple namespaces make my life a lot easier. > > Note: Deploying FlinkSessionJob in the same namespace "Flink" works fine. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster
[ https://issues.apache.org/jira/browse/FLINK-34179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809278#comment-17809278 ] Gyula Fora commented on FLINK-34179: I am not aware of the bitnami helm chart, and I definitely don't think that it is supported by the community. Did you try the Flink Kubernetes operator? > More than one taskmanager is not coming up in flink (session mode) in aws eks > cluster > - > > Key: FLINK-34179 > URL: https://issues.apache.org/jira/browse/FLINK-34179 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.18.0 >Reporter: Nikhil_D >Priority: Blocker > Labels: flink-k8s > Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, > flink-values.yaml > > > Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink > 1.18). After deployment, noticing that out of 5 taskmanager pods only one > taskmanager pod is able to connect to resourcemanager which is present in > jobmanager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34177) Not able to create FlinkSessionJob in different namespace than flink deployment
[ https://issues.apache.org/jira/browse/FLINK-34177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809280#comment-17809280 ] Gyula Fora commented on FLINK-34177: Flink session jobs need to be created in the same namespace as the corresponding FlinkDeployment. This is an intentional limitation at the moment. > Not able to create FlinkSessionJob in different namespace than flink > deployment > --- > > Key: FLINK-34177 > URL: https://issues.apache.org/jira/browse/FLINK-34177 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 > Environment: *AWS EKS K8 version 1.25* > {*}Kubernetes Flink Operator 1.7.0{*}{*}{*} > *Flink 1.17/1.18* >Reporter: Pramod >Priority: Major > > Here is my use case: > 1) I created a namespace "flink" in aws cluster with k8 version 1.25 > 2) Deployed flink kubernetes operator in "flink" namespace using helm chart > 3) deployed FlinkDeployment similar to > [flink-kubernetes-operator/examples/basic-session-deployment-only.yaml at > main · apache/flink-kubernetes-operator · > GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml] > in flink namespace > 4) Then deployed FlinkSession job similar to > [flink-kubernetes-operator/examples/basic-session-job-only.yaml at main · > apache/flink-kubernetes-operator · > GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-job-only.yaml] > but in different namespace "tss" > 5) Already added both the namespaces to watchednamespaces > watchNamespaces: ["tss", "flink"] > > Expected: > FlinkSessionJob will start in tss namespace > > Actual: > Job is not starting and throwing the error "*Flink version null is not > supported by this operator version*" > > > > > My suspect is it seems FlinkDeployment and FlinkSessionJob should be in the > namespace. However i am not sure. So i am raising this bug. > > {color:#FF}*Can someone confirm if Flink kubernetes operator supports > Flink cluster(FlinkDeployment) to be in one namespace and then > FlinkSessionJob in another namespace.??*{color} > Supporting multiple namespaces make my life a lot easier. > > Note: Deploying FlinkSessionJob in the same namespace "Flink" works fine. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34180) Accept Flink CDC project as part of Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-34180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-34180: -- Assignee: Leonard Xu > Accept Flink CDC project as part of Apache Flink > > > Key: FLINK-34180 > URL: https://issues.apache.org/jira/browse/FLINK-34180 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: Leonard Xu >Assignee: Leonard Xu >Priority: Major > > As discussed in Flink dev mailing list[1][2], we have accepted the Flink > CDC project contribution, we should finished the repo and doc migration as > soon as possible. > [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w > [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster
[ https://issues.apache.org/jira/browse/FLINK-34179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-34179: --- Priority: Minor (was: Blocker) > More than one taskmanager is not coming up in flink (session mode) in aws eks > cluster > - > > Key: FLINK-34179 > URL: https://issues.apache.org/jira/browse/FLINK-34179 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.18.0 >Reporter: Nikhil_D >Priority: Minor > Labels: flink-k8s > Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, > flink-values.yaml > > > Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink > 1.18). After deployment, noticing that out of 5 taskmanager pods only one > taskmanager pod is able to connect to resourcemanager which is present in > jobmanager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34188) Setup release infrastructure for Flink CDC project
Leonard Xu created FLINK-34188: -- Summary: Setup release infrastructure for Flink CDC project Key: FLINK-34188 URL: https://issues.apache.org/jira/browse/FLINK-34188 Project: Flink Issue Type: Sub-task Components: Flink CDC Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34181) Migrate repo from ververica to apache
[ https://issues.apache.org/jira/browse/FLINK-34181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34181: --- Component/s: Flink CDC > Migrate repo from ververica to apche > > > Key: FLINK-34181 > URL: https://issues.apache.org/jira/browse/FLINK-34181 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: Leonard Xu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34182) Migrate doc website from ververica to flink
[ https://issues.apache.org/jira/browse/FLINK-34182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-34182: --- Component/s: Flink CDC > Migrate doc website from ververica to flink > > > Key: FLINK-34182 > URL: https://issues.apache.org/jira/browse/FLINK-34182 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: Leonard Xu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34187) Setup CI for Flink CDC project
Leonard Xu created FLINK-34187: -- Summary: Setup CI for Flink CDC project Key: FLINK-34187 URL: https://issues.apache.org/jira/browse/FLINK-34187 Project: Flink Issue Type: Sub-task Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34186) Migrate issue management from Github to JIRA
Leonard Xu created FLINK-34186: -- Summary: Migrate issue management from Github to JIRA Key: FLINK-34186 URL: https://issues.apache.org/jira/browse/FLINK-34186 Project: Flink Issue Type: Sub-task Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34185) Remove unwanted bundle dependencies
Leonard Xu created FLINK-34185: -- Summary: Remove unwanted bundle dependencies Key: FLINK-34185 URL: https://issues.apache.org/jira/browse/FLINK-34185 Project: Flink Issue Type: Sub-task Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34184) Update copyright and license file
Leonard Xu created FLINK-34184: -- Summary: Update copyright and license file Key: FLINK-34184 URL: https://issues.apache.org/jira/browse/FLINK-34184 Project: Flink Issue Type: Sub-task Components: Flink CDC Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34182) Migrate doc website from ververica to flink
Leonard Xu created FLINK-34182: -- Summary: Migrate doc website from ververica to flink Key: FLINK-34182 URL: https://issues.apache.org/jira/browse/FLINK-34182 Project: Flink Issue Type: Sub-task Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34183) Add NOTICE files for Flink CDC project
Leonard Xu created FLINK-34183: -- Summary: Add NOTICE files for Flink CDC project Key: FLINK-34183 URL: https://issues.apache.org/jira/browse/FLINK-34183 Project: Flink Issue Type: Sub-task Components: Flink CDC Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34181) Migrate repo from ververica to apache
Leonard Xu created FLINK-34181: -- Summary: Migrate repo from ververica to apache Key: FLINK-34181 URL: https://issues.apache.org/jira/browse/FLINK-34181 Project: Flink Issue Type: Sub-task Reporter: Leonard Xu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34180) Accept Flink CDC project as part of Apache Flink
Leonard Xu created FLINK-34180: -- Summary: Accept Flink CDC project as part of Apache Flink Key: FLINK-34180 URL: https://issues.apache.org/jira/browse/FLINK-34180 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: Leonard Xu As discussed on the Flink dev mailing list [1][2], we have accepted the Flink CDC project contribution; we should finish the repo and doc migration as soon as possible. [1] https://lists.apache.org/thread/sq5w21tcomrmb025tl820cxty9l0z26w [2] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval` [flink-kubernetes-operator]
1996fanrui commented on code in PR #759: URL: https://github.com/apache/flink-kubernetes-operator/pull/759#discussion_r1461443852 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ## @@ -160,14 +160,23 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore); var jobTopology = collectedMetrics.getJobTopology(); +var now = clock.instant(); Review Comment: Hi @afedulov , We are using `now` as the `endTime` of ScalingRecord. The `kubernetes.operator.reconcile.interval` is 1 min by default and `scaling` is called on each reconcile. So, the restartTime will be greater than `1 min` even if the scaling is very quick. I'm not sure whether we should get the endTime from the `JobUpdateTs` instead? Currently, `stabilization.interval` checks the `JobUpdateTs`. Looking forward to your feedback, thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
leonardBang commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r146161 ## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java: ## @@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding encoding) throws Exception .hasStackTraceContaining("Fail to serialize at field: f1"); } +@Test +void testTimestampTypeLegacyMapping() throws Exception { +final Tuple4, SpecificRecord, GenericRecord, Row> testData = +AvroTestUtils.getTimestampTestData(); + +SpecificDatumWriter datumWriter = new SpecificDatumWriter<>(Timestamps.class); +ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); +Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); +datumWriter.write((Timestamps) testData.f1, encoder); +encoder.flush(); + +DataType dataType = +AvroSchemaConverter.convertToDataType( + SpecificData.get().getSchema(Timestamps.class).toString()); + +// Timestamp with local timezone is converted to BigIntType +assertThat(dataType.getChildren().get(2)) +.isEqualTo(new AtomicDataType(new BigIntType(false))); +assertThat(dataType.getChildren().get(3)) +.isEqualTo(new AtomicDataType(new BigIntType(false))); + +assertThatThrownBy(() -> createSerializationSchema(dataType, AvroEncoding.BINARY, true)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage( +"Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + +assertThatThrownBy(() -> createDeserializationSchema(dataType, AvroEncoding.BINARY, true)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage( +"Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); Review Comment: Plz ignore my stupid comment, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34178) The ScalingTracking of autoscaler is wrong
[ https://issues.apache.org/jira/browse/FLINK-34178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34178: --- Labels: pull-request-available (was: ) > The ScalingTracking of autoscaler is wrong > -- > > Key: FLINK-34178 > URL: https://issues.apache.org/jira/browse/FLINK-34178 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Affects Versions: kubernetes-operator-1.8.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > > The ScalingTracking of autoscaler is wrong, it's always greater than > AutoScalerOptions#STABILIZATION_INTERVAL. > h2. Reason: > When flink job isStabilizing, ScalingMetricCollector#updateMetrics will > return a empty metric history. In the JobAutoScalerImpl#runScalingLogic > method, if `collectedMetrics.getMetricHistory().isEmpty()` , we don't update > the ScalingTracking. > > The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so > the restartTime is always greater than 5 min. > However, it's quick when we use rescale api. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34178][autoscaler] Fix the bug that observed scaling restart time is always greater than `stabilization.interval` [flink-kubernetes-operator]
1996fanrui opened a new pull request, #759: URL: https://github.com/apache/flink-kubernetes-operator/pull/759 ## What is the purpose of the change The ScalingTracking of the autoscaler is wrong: the tracked restart time is always greater than AutoScalerOptions#STABILIZATION_INTERVAL. ### Reason When the Flink job is stabilizing, ScalingMetricCollector#updateMetrics will return an empty metric history. In the JobAutoScalerImpl#runScalingLogic method, if `collectedMetrics.getMetricHistory().isEmpty()`, we don't update the ScalingTracking. The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so the restartTime is always greater than 5 min. However, the actual restart is quick when we use the rescale API. ## Brief change log Update the scaling tracking even if the job is stabilizing. ## Verifying this change Tests will be added later. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
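As a self-contained illustration of the measurement problem described above (plain Java, not operator code; all timestamps are made up): if the restart end time is only recorded at the first reconcile after the stabilization interval has passed, the tracked duration is inflated to at least that interval, whereas taking the end time from the moment the job is observed RUNNING again gives the real restart time.

import java.time.Duration;
import java.time.Instant;

public class RestartTimeInflationSketch {
    public static void main(String[] args) {
        // Hypothetical timeline: the job came back RUNNING 15 seconds after the rescale,
        // but the end time is only recorded at the first reconcile after the 5-minute
        // stabilization interval.
        Instant restartStart = Instant.parse("2024-01-22T10:00:00Z");
        Instant jobRunningAgain = restartStart.plusSeconds(15);
        Instant firstReconcileAfterStabilization = restartStart.plus(Duration.ofMinutes(5)).plusSeconds(30);

        Duration inflated = Duration.between(restartStart, firstReconcileAfterStabilization);
        Duration actual = Duration.between(restartStart, jobRunningAgain);

        System.out.println("recorded (inflated) restart time = " + inflated); // > 5 minutes
        System.out.println("actual restart time              = " + actual);  // 15 seconds
    }
}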
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461426662 ## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java: ## @@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding encoding) throws Exception .hasStackTraceContaining("Fail to serialize at field: f1"); } +@Test +void testTimestampTypeLegacyMapping() throws Exception { +final Tuple4, SpecificRecord, GenericRecord, Row> testData = +AvroTestUtils.getTimestampTestData(); + +SpecificDatumWriter datumWriter = new SpecificDatumWriter<>(Timestamps.class); +ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); +Encoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); +datumWriter.write((Timestamps) testData.f1, encoder); +encoder.flush(); + +DataType dataType = +AvroSchemaConverter.convertToDataType( + SpecificData.get().getSchema(Timestamps.class).toString()); + +// Timestamp with local timezone is converted to BigIntType +assertThat(dataType.getChildren().get(2)) +.isEqualTo(new AtomicDataType(new BigIntType(false))); +assertThat(dataType.getChildren().get(3)) +.isEqualTo(new AtomicDataType(new BigIntType(false))); + +assertThatThrownBy(() -> createSerializationSchema(dataType, AvroEncoding.BINARY, true)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage( +"Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + +assertThatThrownBy(() -> createDeserializationSchema(dataType, AvroEncoding.BINARY, true)) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage( +"Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); Review Comment: It is for testing createSerializationSchema and createDeserializationSchema. Two different scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
leonardBang commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461418192 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java: ## @@ -317,6 +408,21 @@ public static Schema convertToSchema(LogicalType schema) { * @return Avro's {@link Schema} matching this logical type. */ public static Schema convertToSchema(LogicalType logicalType, String rowName) { +return convertToSchema(logicalType, rowName, true); +} + +/** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + * The "{rowName}_" is used as the nested row type name prefix in order to generate the right + * schema. Nested record type that only differs with type name is still compatible. + * + * @param logicalType logical type + * @param rowName the record name Review Comment: also add annotation for param `legacyTimestampMapping` ? ## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java: ## @@ -87,16 +104,83 @@ void testSeDeSchema(AvroEncoding encoding) { assertThat(actualSer).isEqualTo(expectedSer); } +@Test +void testOldSeDeNewSchema() { +assertThatThrownBy( +() -> { +new AvroRowDataDeserializationSchema( +NEW_ROW_TYPE, InternalTypeInfo.of(NEW_ROW_TYPE)); +}) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage( +"Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); + +assertThatThrownBy( +() -> { +new AvroRowDataSerializationSchema(NEW_ROW_TYPE); +}) +.isInstanceOf(IllegalArgumentException.class) +.hasMessage( +"Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3."); +} + +@Test +void testNewSeDeNewSchema() { +testSeDeSchema(NEW_ROW_TYPE, NEW_SCHEMA, false); +} + +@ParameterizedTest +@ValueSource(booleans = {true, false}) +void testSeDeSchema(boolean legacyMapping) { +testSeDeSchema(ROW_TYPE, SCHEMA, legacyMapping); +} + +void testSeDeSchema(RowType rowType, ResolvedSchema schema, boolean legacyMapping) { +final AvroRowDataDeserializationSchema expectedDeser = +new AvroRowDataDeserializationSchema( +rowType, InternalTypeInfo.of(rowType), AvroEncoding.BINARY, legacyMapping); + +final Map options = getAllOptions(legacyMapping); + +final DynamicTableSource actualSource = FactoryMocks.createTableSource(schema, options); + assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class); +TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = +(TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + +DeserializationSchema actualDeser = +scanSourceMock.valueFormat.createRuntimeDecoder( +ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType()); + +assertThat(actualDeser).isEqualTo(expectedDeser); + +final AvroRowDataSerializationSchema expectedSer = +new AvroRowDataSerializationSchema(rowType, AvroEncoding.BINARY, legacyMapping); + +final DynamicTableSink actualSink = FactoryMocks.createTableSink(schema, options); + assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class); +TestDynamicTableFactory.DynamicTableSinkMock sinkMock = +(TestDynamicTableFactory.DynamicTableSinkMock) actualSink; + +SerializationSchema actualSer = +sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType()); + +assertThat(actualSer).isEqualTo(expectedSer); +} + // // Utilities // -private Map getAllOptions() { +private Map getAllOptions(boolean legacyMapping) { Review Comment: legacyTimestampMapping 
? ## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java: ## @@ -330,23 +338,87 @@ void testSerializationWithTypesMismatch(AvroEncoding encoding) throws Exception .hasStackTraceContaining("Fail to serialize at field: f1"); } +@Test +void testTimestampTypeLegacyMapping() throws Exception { +final Tuple4, SpecificRecord, GenericRecord, Row> testData = +
[jira] [Updated] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster
[ https://issues.apache.org/jira/browse/FLINK-34179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikhil_D updated FLINK-34179: - Description: Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink 1.18). After deployment, noticing that out of 5 taskmanager pods only one taskmanager pod is able to connect to resourcemanager which is present in jobmanager. (was: Deployed flink in aws eks cluster using bitnami helm chart. After deployment, noticing that out of 5 taskmanager pods only one taskmanager pod is able to connect to resourcemanager which is present in jobmanager.) > More than one taskmanager is not coming up in flink (session mode) in aws eks > cluster > - > > Key: FLINK-34179 > URL: https://issues.apache.org/jira/browse/FLINK-34179 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.18.0 >Reporter: Nikhil_D >Priority: Blocker > Labels: flink-k8s > Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, > flink-values.yaml > > > Deployed flink in aws eks cluster using bitnami helm chart (bitnami/flink > 1.18). After deployment, noticing that out of 5 taskmanager pods only one > taskmanager pod is able to connect to resourcemanager which is present in > jobmanager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34179) More than one taskmanager is not coming up in flink (session mode) in aws eks cluster
Nikhil_D created FLINK-34179: Summary: More than one taskmanager is not coming up in flink (session mode) in aws eks cluster Key: FLINK-34179 URL: https://issues.apache.org/jira/browse/FLINK-34179 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.18.0 Reporter: Nikhil_D Attachments: flink-uxxx-taskmanager-8b69df9d7-xxnbn.log, flink-values.yaml Deployed flink in aws eks cluster using bitnami helm chart. After deployment, noticing that out of 5 taskmanager pods only one taskmanager pod is able to connect to resourcemanager which is present in jobmanager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
leonardBang commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461404314 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java: ## @@ -71,5 +70,11 @@ public InlineElement getDescription() { } } +public static final ConfigOption AVRO_TIMESTAMP_LEGACY_MAPPING = +ConfigOptions.key("timestamp_mapping.legacy") +.booleanType() +.defaultValue(true) +.withDescription("Use the legacy mapping of timestamp in avro"); Review Comment: Could you add more description here, something like: `Before Flink 1.19, the legacy behavior wrongly mapped both the SQL TIMESTAMP and TIMESTAMP_LTZ types to the Avro TIMESTAMP type. The correct behavior is that Flink SQL TIMESTAMP maps to Avro LOCAL TIMESTAMP and Flink SQL TIMESTAMP_LTZ maps to Avro TIMESTAMP; you can obtain the correct mapping by disabling this legacy mapping. The legacy behavior is kept as the default for compatibility.` You can polish this based on my poor English. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
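For concreteness, one possible shape of the option from the diff with the fuller description requested above. This is only a sketch of the wording; the final text is up to the PR author, the enclosing class name is a placeholder, and the <Boolean> type parameter is restored here because the mail rendering dropped it.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class AvroTimestampOptionSketch {
    // Sketch of the option with a more detailed description, as requested in the review.
    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING =
            ConfigOptions.key("timestamp_mapping.legacy")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Use the legacy mapping of Flink timestamp types to Avro. Before Flink 1.19 both SQL"
                                    + " TIMESTAMP and TIMESTAMP_LTZ were mapped to the Avro TIMESTAMP type; the"
                                    + " corrected mapping is SQL TIMESTAMP <-> Avro LOCAL TIMESTAMP and SQL"
                                    + " TIMESTAMP_LTZ <-> Avro TIMESTAMP. Disable this option to get the corrected"
                                    + " mapping; it defaults to true for backwards compatibility.");
}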
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461398939 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: In the FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-378%3A+Support+Avro+timestamp+with+local+timezone, we have proposed the new mapping. Does it cover both the change in AvroToRowDataConverters and RowDataToAvroConverters? additionally support Avro LocalTimestamp <-> Flink TIMESTAMP_WITHOUT_TIME_ZONE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33006] add e2e for Flink operator HA [flink-kubernetes-operator]
HuangZhenQiu commented on PR #756: URL: https://github.com/apache/flink-kubernetes-operator/pull/756#issuecomment-1903350455 @gyfora Thanks for all of the suggestions. I have changed the scripts accordingly. Please approve the CI build workflow for further verification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461398939 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: In the FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-378%3A+Support+Avro+timestamp+with+local+timezone, we have proposed the new mapping. Does it cover both the change in AvroToRowDataConverters and RowDataToAvroConverters? Avro Timestamp <-> Flink TIMESTAMP_WITH_LOCAL_TIME_ZONE additionally support Avro LocalTimestamp <-> Flink TIMESTAMP_WITHOUT_TIME_ZONE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461398939 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: In the FLIP, we have proposed the new mapping. Does it cover both the change in AvroToRowDataConverters and RowDataToAvroConverters? Avro Timestamp <-> Flink TIMESTAMP_WITH_LOCAL_TIME_ZONE, and additionally Avro LocalTimestamp <-> Flink TIMESTAMP_WITHOUT_TIME_ZONE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
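Spelling out the mapping under discussion, assuming "Avro Timestamp" and "Avro LocalTimestamp" refer to the timestamp-millis/timestamp-micros and local-timestamp-millis/local-timestamp-micros logical types (the authoritative table is in FLIP-378):

  Non-legacy mapping:
    Avro timestamp-millis / timestamp-micros              <->  Flink TIMESTAMP_LTZ (TIMESTAMP WITH LOCAL TIME ZONE)
    Avro local-timestamp-millis / local-timestamp-micros  <->  Flink TIMESTAMP (TIMESTAMP WITHOUT TIME ZONE)
  Legacy mapping (timestamp_mapping.legacy = true):
    both Flink TIMESTAMP and TIMESTAMP_LTZ map to the Avro timestamp types.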
[jira] [Commented] (FLINK-34166) KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
[ https://issues.apache.org/jira/browse/FLINK-34166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809254#comment-17809254 ] lincoln lee commented on FLINK-34166: - fixed in master: 27e6ac836171c5c5539ceeb234a806be661cc30a > KeyedLookupJoinWrapper incorrectly process delete message for inner join when > previous lookup result is empty > - > > Key: FLINK-34166 > URL: https://issues.apache.org/jira/browse/FLINK-34166 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.2, 1.18.1 >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0, 1.18.2 > > > KeyedLookupJoinWrapper(when 'table.optimizer.non-deterministic-update.strategy > ' is set to 'TRY_RESOLVE' and the lookup join exists NDU problemns) > incorrectly process delete message for inner join when previous lookup result > is empty > The intermediate delete result > {code} > expectedOutput.add(deleteRecord(3, "c", null, null)); > {code} > in current case > KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk > is incorrect: > {code} > @Test > public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws > Exception { > OneInputStreamOperatorTestHarness testHarness = > createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, > true); > testHarness.open(); > testHarness.processElement(insertRecord(1, "a")); > testHarness.processElement(insertRecord(2, "b")); > testHarness.processElement(insertRecord(3, "c")); > testHarness.processElement(insertRecord(4, "d")); > testHarness.processElement(insertRecord(5, "e")); > testHarness.processElement(updateBeforeRecord(3, "c")); > testHarness.processElement(updateAfterRecord(3, "c2")); > testHarness.processElement(deleteRecord(3, "c2")); > testHarness.processElement(insertRecord(3, "c3")); > List expectedOutput = new ArrayList<>(); > expectedOutput.add(insertRecord(1, "a", 1, "Julian")); > expectedOutput.add(insertRecord(4, "d", 4, "Fabian")); > expectedOutput.add(deleteRecord(3, "c", null, null)); > expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2")); > expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2")); > expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3")); > assertor.assertOutputEquals("output wrong.", expectedOutput, > testHarness.getOutput()); > testHarness.close(); > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]
lincoln-lil merged PR #24152: URL: https://github.com/apache/flink/pull/24152 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34178) The ScalingTracking of autoscaler is wrong
Rui Fan created FLINK-34178: --- Summary: The ScalingTracking of autoscaler is wrong Key: FLINK-34178 URL: https://issues.apache.org/jira/browse/FLINK-34178 Project: Flink Issue Type: Bug Components: Autoscaler Affects Versions: kubernetes-operator-1.8.0 Reporter: Rui Fan Assignee: Rui Fan The ScalingTracking of the autoscaler is wrong: the tracked restart time is always greater than AutoScalerOptions#STABILIZATION_INTERVAL. h2. Reason: When the Flink job is stabilizing, ScalingMetricCollector#updateMetrics will return an empty metric history. In the JobAutoScalerImpl#runScalingLogic method, if `collectedMetrics.getMetricHistory().isEmpty()`, we don't update the ScalingTracking. The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so the restartTime is always greater than 5 min. However, the actual restart is quick when we use the rescale API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
leonardBang commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461377732 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: This is a behavior change that we didn't discuss on the dev mailing list. I think it needs a discussion there; maybe we should go back to the mailing list and start a quick discussion, WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on PR #23511: URL: https://github.com/apache/flink/pull/23511#issuecomment-1903283601 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on PR #23511: URL: https://github.com/apache/flink/pull/23511#issuecomment-1903283177 @leonardBang Thanks for providing valuable comments. I have refactored the code to resolve your comments. A follow-up PR will be created to change the precision. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging [flink]
1996fanrui commented on code in PR #24003: URL: https://github.com/apache/flink/pull/24003#discussion_r1461348956 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java: ## @@ -61,8 +61,9 @@ public long getBackoffTime() { } @Override -public void notifyFailure(Throwable cause) { +public boolean notifyFailure(Throwable cause) { currentRestartAttempt++; +return true; Review Comment: > I guess, you meant [FLIP-364](https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+exponential-delay+restart-strategy) here? Exactly, sorry for my typo. > What about deprecating that class as part of this PR and coming up with a follow-up Jira issue that replaces the strategy? They are internal classes; IIUC, we can refactor them directly without the `@Deprecated` annotation. - Your suggestion is that we still keep the failure-rate and fixed-delay restart-strategies, but we can reuse the `ExponentialDelayRestartBackoffTimeStrategy` class, right? I'm wondering whether we could deprecate the failure-rate and fixed-delay restart-strategies directly? Users can configure them directly. Keeping the failure-rate and fixed-delay restart-strategies and reusing the `ExponentialDelayRestartBackoffTimeStrategy` class still has the semantic problem that I mentioned in the last comment [1]. [1] https://github.com/apache/flink/pull/24003#discussion_r1456807889 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34176) Remove unnecessary failure-rate and fixed-delay restart-strategies
[ https://issues.apache.org/jira/browse/FLINK-34176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-34176: Summary: Remove unnecessary failure-rate and fixed-delay restart-strategies (was: Remove unnecessary implemetations of RestartBackoffTimeStrategy) > Remove unnecessary failure-rate and fixed-delay restart-strategies > --- > > Key: FLINK-34176 > URL: https://issues.apache.org/jira/browse/FLINK-34176 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > h2. Could we deprecate the failure-rate and fixed-delay restart-strategies > directly? > After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is > already very feature-rich. It can replace fix-delay and failure rate restart > strategies directly. > h2. How to replace fix-delay restart strategy? > * Set backoffMultiplier = 1 and jitterFactor = 0 > * resetBackoffThresholdMS = Interget.Max > * initialBackoffMS and maxBackoffMS are the backoffTimeMS of > FixedDelayRestartBackoffTimeStrategy > * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of > FixedDelayRestartBackoffTimeStrategy > h2. How to replace failure-rate restart strategy? > * Set backoffMultiplier = 1 and jitterFactor = 0 > * resetBackoffThresholdMS is the failuresIntervalMS of > FailureRateRestartBackoffTimeStrategy > * initialBackoffMS and maxBackoffMS are the backoffTimeMS of > FailureRateRestartBackoffTimeStrategy > * attemptsBeforeResetBackoff is the maxFailuresPerInterval of > FailureRateRestartBackoffTimeStrategy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34176) Remove unnecessary implementations of RestartBackoffTimeStrategy
[ https://issues.apache.org/jira/browse/FLINK-34176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-34176: Description: h2. Could we deprecate the failure-rate and fixed-delay restart-strategies directly? After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already very feature-rich. It can replace fix-delay and failure rate restart strategies directly. h2. How to replace fix-delay restart strategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS = Interget.Max * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FixedDelayRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of FixedDelayRestartBackoffTimeStrategy h2. How to replace failure-rate restart strategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS is the failuresIntervalMS of FailureRateRestartBackoffTimeStrategy * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FailureRateRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxFailuresPerInterval of FailureRateRestartBackoffTimeStrategy was: h2. Could we deprecate the failure-rate and fixed-delay restart-strategies directly? After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already very feature-rich. It can replace fix-delay and failure rate restart strategies directly. h2. How to replace FixedDelayRestartBackoffTimeStrategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS = Interget.Max * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FixedDelayRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of FixedDelayRestartBackoffTimeStrategy h2. How to replace FailureRateRestartBackoffTimeStrategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS is the failuresIntervalMS of FailureRateRestartBackoffTimeStrategy * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FailureRateRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxFailuresPerInterval of FailureRateRestartBackoffTimeStrategy > Remove unnecessary implemetations of RestartBackoffTimeStrategy > --- > > Key: FLINK-34176 > URL: https://issues.apache.org/jira/browse/FLINK-34176 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > h2. Could we deprecate the failure-rate and fixed-delay restart-strategies > directly? > After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is > already very feature-rich. It can replace fix-delay and failure rate restart > strategies directly. > h2. How to replace fix-delay restart strategy? > * Set backoffMultiplier = 1 and jitterFactor = 0 > * resetBackoffThresholdMS = Interget.Max > * initialBackoffMS and maxBackoffMS are the backoffTimeMS of > FixedDelayRestartBackoffTimeStrategy > * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of > FixedDelayRestartBackoffTimeStrategy > h2. How to replace failure-rate restart strategy? > * Set backoffMultiplier = 1 and jitterFactor = 0 > * resetBackoffThresholdMS is the failuresIntervalMS of > FailureRateRestartBackoffTimeStrategy > * initialBackoffMS and maxBackoffMS are the backoffTimeMS of > FailureRateRestartBackoffTimeStrategy > * attemptsBeforeResetBackoff is the maxFailuresPerInterval of > FailureRateRestartBackoffTimeStrategy -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1461340507 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: Correct. The incorrect part for the Flink SQL TIMESTAMP_NTZ type is in RowDataToAvroConverters. I just added TIMESTAMP_WITH_LOCAL_TIME_ZONE handling in the new mapping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34177) Not able to create FlinkSessionJob in different namespace than flink deployment
Pramod created FLINK-34177: -- Summary: Not able to create FlinkSessionJob in different namespace than flink deployment Key: FLINK-34177 URL: https://issues.apache.org/jira/browse/FLINK-34177 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Environment: *AWS EKS K8 version 1.25* *Kubernetes Flink Operator 1.7.0* *Flink 1.17/1.18* Reporter: Pramod Here is my use case: 1) I created a namespace "flink" in an AWS cluster with K8s version 1.25 2) Deployed the Flink Kubernetes operator in the "flink" namespace using the Helm chart 3) Deployed a FlinkDeployment similar to [flink-kubernetes-operator/examples/basic-session-deployment-only.yaml at main · apache/flink-kubernetes-operator · GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml] in the flink namespace 4) Then deployed a FlinkSessionJob similar to [flink-kubernetes-operator/examples/basic-session-job-only.yaml at main · apache/flink-kubernetes-operator · GitHub|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-job-only.yaml] but in a different namespace "tss" 5) Already added both namespaces to watchNamespaces: watchNamespaces: ["tss", "flink"] Expected: The FlinkSessionJob will start in the tss namespace Actual: The job is not starting and throws the error "*Flink version null is not supported by this operator version*" My suspicion is that the FlinkDeployment and FlinkSessionJob need to be in the same namespace. However, I am not sure, so I am raising this bug. *Can someone confirm whether the Flink Kubernetes operator supports the Flink cluster (FlinkDeployment) being in one namespace and the FlinkSessionJob in another namespace?* Supporting multiple namespaces would make my life a lot easier. Note: Deploying the FlinkSessionJob in the same namespace "flink" works fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34176) Remove unnecessary implementations of RestartBackoffTimeStrategy
Rui Fan created FLINK-34176: --- Summary: Remove unnecessary implementations of RestartBackoffTimeStrategy Key: FLINK-34176 URL: https://issues.apache.org/jira/browse/FLINK-34176 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan h2. Could we deprecate the failure-rate and fixed-delay restart-strategies directly? After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already very feature-rich. It can replace the fixed-delay and failure-rate restart strategies directly. h2. How to replace FixedDelayRestartBackoffTimeStrategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS = Integer.MAX_VALUE * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FixedDelayRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of FixedDelayRestartBackoffTimeStrategy h2. How to replace FailureRateRestartBackoffTimeStrategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS is the failuresIntervalMS of FailureRateRestartBackoffTimeStrategy * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FailureRateRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxFailuresPerInterval of FailureRateRestartBackoffTimeStrategy -- This message was sent by Atlassian Jira (v8.20.10#820010)
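To make the parameter mapping above concrete, a minimal sketch of an exponential-delay configuration that mimics a fixed-delay strategy with a 10 s delay and at most 3 restarts. The `attempts-before-reset-backoff` key is taken from the FLIP-364 proposal and is an assumption about the final option name; the other keys are existing exponential-delay options.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedDelayViaExponentialDelay {
    public static void main(String[] args) throws Exception {
        // Sketch: exponential-delay configured so it behaves like fixed-delay(10 s, 3 attempts).
        Configuration conf = new Configuration();
        conf.setString("restart-strategy.type", "exponential-delay");
        conf.setString("restart-strategy.exponential-delay.initial-backoff", "10 s");
        conf.setString("restart-strategy.exponential-delay.max-backoff", "10 s");        // same as initial => constant delay
        conf.setString("restart-strategy.exponential-delay.backoff-multiplier", "1.0");  // no growth
        conf.setString("restart-strategy.exponential-delay.jitter-factor", "0");         // no jitter
        conf.setString("restart-strategy.exponential-delay.reset-backoff-threshold", "365 d"); // effectively never reset
        // Key proposed in FLIP-364 (assumption: the released name may differ):
        conf.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "3");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromSequence(0, 10).print();
        env.execute("fixed-delay-via-exponential-delay");
    }
}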
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
flinkbot commented on PR #24162: URL: https://github.com/apache/flink/pull/24162#issuecomment-1903131997 ## CI report: * 5b90be3d7e080cfaea62a1711406a21319b415a2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34100) Support session window table function without pulling up with window agg
[ https://issues.apache.org/jira/browse/FLINK-34100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34100: --- Labels: pull-request-available (was: ) > Support session window table function without pulling up with window agg > - > > Key: FLINK-34100 > URL: https://issues.apache.org/jira/browse/FLINK-34100 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: xuyang >Priority: Major > Labels: pull-request-available > > This subtask resolves the session support in ExecWindowTableFunction. And > then the session window can support window agg with > WindowAttachedWindowingStrategy. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong opened a new pull request, #24162: URL: https://github.com/apache/flink/pull/24162 ## What is the purpose of the change Introduce an unslice window assigner, and support the session window table function without pulling it up with the window agg. Note: this is a draft PR, and only the commits whose messages start with '[FLINK-34100]' are new commits related to this PR. ## Brief change log - *add function getDescription for internal interface WindowAssigner* - *extract a WindowTableFunctionOperatorBase from WindowTableFunctionOperator to prepare for introducing unaligned window table function* - *introduce UnalignedWindowTableFunctionOperator for unaligned window* - *fix missing exchange before window table function node* - *support session window table function without pulling up with window agg* ## Verifying this change Some harness tests and ITCases are added. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? It will be documented in a separate PR later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-33221) Add config options for administrator JVM options
[ https://issues.apache.org/jira/browse/FLINK-33221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-33221. - Resolution: Fixed > Add config options for administrator JVM options > > > Key: FLINK-33221 > URL: https://issues.apache.org/jira/browse/FLINK-33221 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > We encounter similar issues described in SPARK-23472. Users may need to add > JVM options to their Flink applications (e.g. to tune GC options). They > typically use {{env.java.opts.x}} series of options to do so. We also have a > set of administrator JVM options to apply by default, e.g. to enable GC log, > tune GC options, etc. Both use cases will need to set the same series of > options and will clobber one another. > In the past, we generated and prepended the administrator JVM options in > the Java code for generating the starting command for JM/TM. However, this > has been proven to be difficult to maintain. > Therefore, I propose to also add a set of default JVM options for > administrator use that prepends the user-set extra JVM options. We can mark > the existing {{env.java.opts.x}} series as user-set extra JVM options and add > a set of new {{env.java.opts.x.default}} options for administrator use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33221) Add config options for administrator JVM options
[ https://issues.apache.org/jira/browse/FLINK-33221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809233#comment-17809233 ] Rui Fan commented on FLINK-33221: - Merged to master (1.19.0) via: b4eb8ac503f41fd793db1ac662fbedc46af92fd5 > Add config options for administrator JVM options > > > Key: FLINK-33221 > URL: https://issues.apache.org/jira/browse/FLINK-33221 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > We encounter similar issues described in SPARK-23472. Users may need to add > JVM options to their Flink applications (e.g. to tune GC options). They > typically use {{env.java.opts.x}} series of options to do so. We also have a > set of administrator JVM options to apply by default, e.g. to enable GC log, > tune GC options, etc. Both use cases will need to set the same series of > options and will clobber one another. > In the past, we generated and prepended the administrator JVM options in > the Java code for generating the starting command for JM/TM. However, this > has been proven to be difficult to maintain. > Therefore, I propose to also add a set of default JVM options for > administrator use that prepends the user-set extra JVM options. We can mark > the existing {{env.java.opts.x}} series as user-set extra JVM options and add > a set of new {{env.java.opts.x.default}} options for administrator use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
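As a toy illustration of the prepending semantics described in this ticket: administrator defaults go first and user-set extra options last, so for JVM flags where the last occurrence wins (e.g. -Xmx) users can still override the defaults. The option names and assembly logic below are illustrative only, not Flink's actual startup code, and the final option keys introduced by the change may differ from `env.java.opts.x.default`.

```java
public class JvmOptsAssembly {

    // Sketch: administrator defaults come first, user-set extra options last.
    static String assembleJvmOpts(String adminDefaultOpts, String userExtraOpts) {
        return (adminDefaultOpts + " " + userExtraOpts).trim();
    }

    public static void main(String[] args) {
        String opts = assembleJvmOpts(
                "-Xlog:gc*:file=/var/log/gc.log -XX:+UseG1GC", // hypothetical admin defaults
                "-Xmx4g -XX:MaxGCPauseMillis=200");            // hypothetical user options
        System.out.println(opts);
        // -Xlog:gc*:file=/var/log/gc.log -XX:+UseG1GC -Xmx4g -XX:MaxGCPauseMillis=200
    }
}
```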
Re: [PR] [FLINK-32732][Connectors/Kafka] auto offset reset should be exposed t… [flink-connector-kafka]
zhougit86 commented on PR #43: URL: https://github.com/apache/flink-connector-kafka/pull/43#issuecomment-1903098823 > @zhougit86 Can you please rebase your PR? @MartijnVisser Could you please tell me which branch I should rebase this onto? And could you please share your thoughts on this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33221][core][config] Add config options for administrator JVM options [flink]
1996fanrui merged PR #24098: URL: https://github.com/apache/flink/pull/24098 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34174) Remove SlotMatchingStrategy related logic
[ https://issues.apache.org/jira/browse/FLINK-34174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809232#comment-17809232 ] RocMarshal commented on FLINK-34174: The main reason for initiating this ticket is https://issues.apache.org/jira/browse/FLINK-31449 (IIUC) as the current related logic is no longer being used. > Remove SlotMatchingStrategy related logic > - > > Key: FLINK-34174 > URL: https://issues.apache.org/jira/browse/FLINK-34174 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Task >Reporter: RocMarshal >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34175) When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer
[ https://issues.apache.org/jira/browse/FLINK-34175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34175: --- Description: The following test added to SlicingWindowAggOperatorTest can re-produce this problem. {code:java} private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF = new RowType( Arrays.asList( new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)), new RowType.RowField("f1", new IntType()), new RowType.RowField("f2", new TimestampType()), new RowType.RowField("f3", new TimestampType()), new RowType.RowField( "f4", new TimestampType(false, TimestampKind.ROWTIME, 3; protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF = new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF); @Test public void test() throws Exception { final SliceAssigner innerAssigner = SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3)); final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner); final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); SlicingWindowOperator operator = (SlicingWindowOperator) WindowAggOperatorBuilder.builder() .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF) .shiftTimeZone(shiftTimeZone) .keySerializer(KEY_SER) .assigner(assigner) .aggregate(wrapGenerated(aggsFunction), ACC_SER) .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); testHarness.setup(OUT_SERIALIZER); testHarness.open(); // process elements ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order testHarness.processElement( insertRecord( "key2", 1, fromEpochMillis(999L), fromEpochMillis(3999L), fromEpochMillis(3998L))); testHarness.processWatermark(new Watermark(999)); expectedOutput.add(new Watermark(999)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.close(); }{code} > When meeting WindowedSliceAssigner, slice window agg registers an wrong > timestamp timer > > > Key: FLINK-34175 > URL: https://issues.apache.org/jira/browse/FLINK-34175 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: xuyang >Priority: Major > > The following test added to SlicingWindowAggOperatorTest can re-produce this > problem. 
[jira] [Created] (FLINK-34175) When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer
xuyang created FLINK-34175: -- Summary: When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer Key: FLINK-34175 URL: https://issues.apache.org/jira/browse/FLINK-34175 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: xuyang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33450) Implement JdbcAutoScalerStateStore
[ https://issues.apache.org/jira/browse/FLINK-33450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809223#comment-17809223 ] Rui Fan commented on FLINK-33450: - Merged to main(1.8.0) via : * c34fe4f41a2f684da01d84ab0d761ec669af83f6 * 293067ab3fa227eb6a38ef5d1255fa4641da142c * 96b2aa0b6684d0b06602c128a45f8886adf9324a * 6e78221e6abc72ce8daeb92be1cf58fd2506b591 > Implement JdbcAutoScalerStateStore > -- > > Key: FLINK-33450 > URL: https://issues.apache.org/jira/browse/FLINK-33450 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Design doc: > https://docs.google.com/document/d/1lE4s3ZAyCfzYT4dNOVarz5dIctQ8UUAVqjPKlQ1XU1c/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33450) Implement JdbcAutoScalerStateStore
[ https://issues.apache.org/jira/browse/FLINK-33450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-33450. - Resolution: Fixed > Implement JdbcAutoScalerStateStore > -- > > Key: FLINK-33450 > URL: https://issues.apache.org/jira/browse/FLINK-33450 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Design doc: > https://docs.google.com/document/d/1lE4s3ZAyCfzYT4dNOVarz5dIctQ8UUAVqjPKlQ1XU1c/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34174][runtime] Remove SlotMatchingStrategy related logic [flink]
RocMarshal commented on PR #24158: URL: https://github.com/apache/flink/pull/24158#issuecomment-1903021276 Hi, @1996fanrui Thanks for your review. The main reason for initiating this pr is https://issues.apache.org/jira/browse/FLINK-31449 (IIUC) as the current classes are no longer being used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33971]Specifies whether to use HBase table that supports dynamic columns. [flink-connector-hbase]
MOBIN-F closed pull request #36: [FLINK-33971]Specifies whether to use HBase table that supports dynamic columns. URL: https://github.com/apache/flink-connector-hbase/pull/36 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
1996fanrui merged PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] E2E for minibatch join [flink]
flinkbot commented on PR #24161: URL: https://github.com/apache/flink/pull/24161#issuecomment-1903008823 ## CI report: * cc0d27ae8047710e53107d9fd9b50b8578c9ff25 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Introduce operator for minibatch join [flink]
flinkbot commented on PR #24160: URL: https://github.com/apache/flink/pull/24160#issuecomment-1903006337 ## CI report: * d487b086c0cccb48244e46b106bcd26143e6a272 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Draft][bugfix] Move the deserialization of shuffleDescriptor to a separate … [flink]
caodizhou commented on PR #24115: URL: https://github.com/apache/flink/pull/24115#issuecomment-1903002055 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]
luoyuxia commented on code in PR #24152: URL: https://github.com/apache/flink/pull/24152#discussion_r1461300877 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java: ## @@ -129,40 +129,55 @@ public void processElement( // do lookup for acc msg if (RowDataUtil.isAccumulateMsg(in)) { -// clear local state first -deleteState(); +if (lookupJoinRunner.preFilter(in)) { +// clear local state first +deleteState(); -// fetcher has copied the input field when object reuse is enabled -lookupJoinRunner.doFetch(in); +// fetcher has copied the input field when object reuse is enabled +lookupJoinRunner.doFetch(in); -// update state with empty row if lookup miss or pre-filtered -if (!collectListener.collected) { -updateState(emptyRow); +// update state with empty row if lookup miss or pre-filtered +if (!collectListener.collected) { +updateState(emptyRow); +} } - lookupJoinRunner.padNullForLeftJoin(in, out); } else { -// do state access for non-acc msg -if (lookupKeyContainsPrimaryKey) { -RowData rightRow = uniqueState.value(); -// should distinguish null from empty(lookup miss) -if (null == rightRow) { -stateStaledErrorHandle(in, out); -} else { -collectDeleteRow(in, rightRow, out); -} -} else { -List rightRows = state.value(); -if (null == rightRows) { -stateStaledErrorHandle(in, out); +boolean collected = false; +if (lookupJoinRunner.preFilter(in)) { Review Comment: Got it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] introduce BufferBundle for minibatch join [flink]
flinkbot commented on PR #24159: URL: https://github.com/apache/flink/pull/24159#issuecomment-1902987990 ## CI report: * 719a6c42f3b0f7adfb7047f9ac2a29d6a62b3afb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] E2E for minibatch join [flink]
xishuaidelin opened a new pull request, #24161: URL: https://github.com/apache/flink/pull/24161 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Introduce operator for minibatch join [flink]
xishuaidelin opened a new pull request, #24160: URL: https://github.com/apache/flink/pull/24160 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] introduce BufferBundle for minibatch join [flink]
xishuaidelin opened a new pull request, #24159: URL: https://github.com/apache/flink/pull/24159 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]
lincoln-lil commented on code in PR #24152: URL: https://github.com/apache/flink/pull/24152#discussion_r1461293471 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java: ## @@ -129,40 +129,55 @@ public void processElement( // do lookup for acc msg if (RowDataUtil.isAccumulateMsg(in)) { -// clear local state first -deleteState(); +if (lookupJoinRunner.preFilter(in)) { +// clear local state first +deleteState(); -// fetcher has copied the input field when object reuse is enabled -lookupJoinRunner.doFetch(in); +// fetcher has copied the input field when object reuse is enabled +lookupJoinRunner.doFetch(in); -// update state with empty row if lookup miss or pre-filtered -if (!collectListener.collected) { -updateState(emptyRow); +// update state with empty row if lookup miss or pre-filtered +if (!collectListener.collected) { +updateState(emptyRow); +} } - lookupJoinRunner.padNullForLeftJoin(in, out); } else { -// do state access for non-acc msg -if (lookupKeyContainsPrimaryKey) { -RowData rightRow = uniqueState.value(); -// should distinguish null from empty(lookup miss) -if (null == rightRow) { -stateStaledErrorHandle(in, out); -} else { -collectDeleteRow(in, rightRow, out); -} -} else { -List rightRows = state.value(); -if (null == rightRows) { -stateStaledErrorHandle(in, out); +boolean collected = false; +if (lookupJoinRunner.preFilter(in)) { Review Comment: The preFilter here only related to left local filter conditions which can short-cut the lookup path(introduced by FLINK-18445), so the filter on right tale `b.name is not null` will not affect the the preFilter logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-34049) Refactor classes related to window TVF aggregation to prepare for non-aligned windows
[ https://issues.apache.org/jira/browse/FLINK-34049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-34049. - Resolution: Implemented > Refactor classes related to window TVF aggregation to prepare for non-aligned > windows > - > > Key: FLINK-34049 > URL: https://issues.apache.org/jira/browse/FLINK-34049 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > Refactor classes related to window TVF aggregation such as > AbstractWindowAggProcessor to prepare for the implementation of non-aligned > windows like session window -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore [flink-kubernetes-operator]
1996fanrui commented on PR #741: URL: https://github.com/apache/flink-kubernetes-operator/pull/741#issuecomment-1902917759 Thanks for the patient review, merging~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34049) Refactor classes related to window TVF aggregation to prepare for non-aligned windows
[ https://issues.apache.org/jira/browse/FLINK-34049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809213#comment-17809213 ] Shengkai Fang commented on FLINK-34049: --- Merged into master: e345ffb453ac482d0250736b687cf88b85c606b0 > Refactor classes related to window TVF aggregation to prepare for non-aligned > windows > - > > Key: FLINK-34049 > URL: https://issues.apache.org/jira/browse/FLINK-34049 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > Refactor classes related to window TVF aggregation such as > AbstractWindowAggProcessor to prepare for the implementation of non-aligned > windows like session window -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]
fsk119 merged PR #24068: URL: https://github.com/apache/flink/pull/24068 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
libenchao commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1461286671 ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java: ## @@ -70,6 +98,51 @@ public void testFilterPushdown() { "SELECT id, time_col, real_col FROM jdbc WHERE id = 91 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } +/** + * Note the join condition is not present in the optimized plan, as it is handled in the JDBC Review Comment: > just for my understanding ar we thinking that the rowdata sent in on the lookup would contain the pushdown predicates so the code could call and this would handle the predicates and the keys: > statement = lookupKeyRowConverter.toExternal(keyRow, statement); > we could then remove the need for? > statement = setPredicateParams(statement); What I was proposing is showing the predicates in the digest of lookup join node. That way, we can see it in the test xml files, and also it can be shown in Flink Web UI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34029][runtime-web] Support different profiling mode on Flink WEB [flink]
yuchen-ecnu commented on PR #24130: URL: https://github.com/apache/flink/pull/24130#issuecomment-1902905798 Hi @Myasuka, do you have time to help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty [flink]
luoyuxia commented on code in PR #24152: URL: https://github.com/apache/flink/pull/24152#discussion_r1461275950 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java: ## @@ -129,40 +129,55 @@ public void processElement( // do lookup for acc msg if (RowDataUtil.isAccumulateMsg(in)) { -// clear local state first -deleteState(); +if (lookupJoinRunner.preFilter(in)) { +// clear local state first +deleteState(); -// fetcher has copied the input field when object reuse is enabled -lookupJoinRunner.doFetch(in); +// fetcher has copied the input field when object reuse is enabled +lookupJoinRunner.doFetch(in); -// update state with empty row if lookup miss or pre-filtered -if (!collectListener.collected) { -updateState(emptyRow); +// update state with empty row if lookup miss or pre-filtered +if (!collectListener.collected) { +updateState(emptyRow); +} } - lookupJoinRunner.padNullForLeftJoin(in, out); } else { -// do state access for non-acc msg -if (lookupKeyContainsPrimaryKey) { -RowData rightRow = uniqueState.value(); -// should distinguish null from empty(lookup miss) -if (null == rightRow) { -stateStaledErrorHandle(in, out); -} else { -collectDeleteRow(in, rightRow, out); -} -} else { -List rightRows = state.value(); -if (null == rightRows) { -stateStaledErrorHandle(in, out); +boolean collected = false; +if (lookupJoinRunner.preFilter(in)) { Review Comment: If the filter condition is on the right table,will `lookupJoinRunner.preFilter(in)` will be true? like ```sql from a join b as FOR SYSTEM_TIME AS OF PROCTIME() on a.id = b.id and b.name is not null ``` It seems the filter ` b.name is not null` will be code gen into `generatedFetcher` instead of `preFilterCondition` in `LookupJoinRunner`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32417) DynamicKafkaSource User Documentation
[ https://issues.apache.org/jira/browse/FLINK-32417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32417: --- Labels: pull-request-available (was: ) > DynamicKafkaSource User Documentation > - > > Key: FLINK-32417 > URL: https://issues.apache.org/jira/browse/FLINK-32417 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: kafka-3.1.0 >Reporter: Mason Chen >Assignee: Mason Chen >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.1.0 > > > Add user documentation for DynamicKafkaSource -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32417] Add DynamicKafkaSource documentation for setter methods… [flink-connector-kafka]
mas-chen commented on PR #80: URL: https://github.com/apache/flink-connector-kafka/pull/80#issuecomment-1902871885 FYI: @mxm @tzulitai -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-32417] Add DynamicKafkaSource documentation for setter methods… [flink-connector-kafka]
mas-chen opened a new pull request, #80: URL: https://github.com/apache/flink-connector-kafka/pull/80 …, metrics, and config options I verified locally that the documentation comes up properly and that all the links work. Screenshot: https://github.com/apache/flink-connector-kafka/assets/6834335/13bce63e-0b61-401a-b70e-906aac64e067 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33803] Set observedGeneration at end of reconciliation [flink-kubernetes-operator]
gyfora commented on code in PR #755: URL: https://github.com/apache/flink-kubernetes-operator/pull/755#discussion_r1461096099 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java: ## @@ -126,6 +126,9 @@ private static void updateStatusForSpecReconcil status.setError(null); reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli()); +// Set observedGeneration +status.setObservedGeneration(target.getMetadata().getGeneration()); Review Comment: I still don't think that this is consistent with the current content of the lastReconciledSpec metadata. It's not only about setting it to the same value (source of truth); it's also about setting it at the exact same time. Also we should remove internal usages of the lastReconciledSpec.meta.generation and replace them with this one. We have tests for the behaviour so that would validate it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-34131) Checkpoint check window should take into account checkpoint job configuration
[ https://issues.apache.org/jira/browse/FLINK-34131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-34131: -- Assignee: Nicolas Fraison > Checkpoint check window should take into account checkpoint job configuration > --- > > Key: FLINK-34131 > URL: https://issues.apache.org/jira/browse/FLINK-34131 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Nicolas Fraison >Assignee: Nicolas Fraison >Priority: Minor > Labels: pull-request-available > > When enabling the checkpoint progress check > (kubernetes.operator.cluster.health-check.checkpoint-progress.enabled) to > determine cluster health, the operator relies on detecting whether a checkpoint has been > performed during the > kubernetes.operator.cluster.health-check.checkpoint-progress.window. > As indicated in the docs, this window must be bigger than the checkpointing interval. > But this is a manual configuration, which can lead to misconfiguration and > unwanted restarts of the Flink cluster if the checkpointing interval is bigger > than the window. > The operator must check that the config is healthy before relying on this > check. If it is not well set, it should not execute the check (return true in > [evaluateCheckpoints|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java#L197C1-L199C50]) > and log a WARN message. > Flink jobs also have other checkpointing parameters that should be taken into > account for this window configuration, namely > execution.checkpointing.timeout and > execution.checkpointing.tolerable-failed-checkpoints. > The idea would be to check that > kubernetes.operator.cluster.health-check.checkpoint-progress.window >= > max(execution.checkpointing.interval * > execution.checkpointing.tolerable-failed-checkpoints, > execution.checkpointing.timeout * > execution.checkpointing.tolerable-failed-checkpoints) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34131) Checkpoint check window should take into account checkpoint job configuration
[ https://issues.apache.org/jira/browse/FLINK-34131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-34131. -- Resolution: Fixed Merged to main via 775bc5f41df09accefca6590112509c6023b2fb4 > Checkpoint check window should take into account checkpoint job configuration > --- > > Key: FLINK-34131 > URL: https://issues.apache.org/jira/browse/FLINK-34131 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Nicolas Fraison >Priority: Minor > Labels: pull-request-available > > When enabling the checkpoint progress check > (kubernetes.operator.cluster.health-check.checkpoint-progress.enabled) to > determine cluster health, the operator relies on detecting whether a checkpoint has been > performed during the > kubernetes.operator.cluster.health-check.checkpoint-progress.window. > As indicated in the docs, this window must be bigger than the checkpointing interval. > But this is a manual configuration, which can lead to misconfiguration and > unwanted restarts of the Flink cluster if the checkpointing interval is bigger > than the window. > The operator must check that the config is healthy before relying on this > check. If it is not well set, it should not execute the check (return true in > [evaluateCheckpoints|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java#L197C1-L199C50]) > and log a WARN message. > Flink jobs also have other checkpointing parameters that should be taken into > account for this window configuration, namely > execution.checkpointing.timeout and > execution.checkpointing.tolerable-failed-checkpoints. > The idea would be to check that > kubernetes.operator.cluster.health-check.checkpoint-progress.window >= > max(execution.checkpointing.interval * > execution.checkpointing.tolerable-failed-checkpoints, > execution.checkpointing.timeout * > execution.checkpointing.tolerable-failed-checkpoints) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34131) Checkpoint check window should take into account checkpoint job configuration
[ https://issues.apache.org/jira/browse/FLINK-34131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-34131: --- Fix Version/s: kubernetes-operator-1.8.0 > Checkpoint check window should take into account checkpoint job configuration > --- > > Key: FLINK-34131 > URL: https://issues.apache.org/jira/browse/FLINK-34131 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Nicolas Fraison >Assignee: Nicolas Fraison >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > When enabling the checkpoint progress check > (kubernetes.operator.cluster.health-check.checkpoint-progress.enabled) to > determine cluster health, the operator relies on detecting whether a checkpoint has been > performed during the > kubernetes.operator.cluster.health-check.checkpoint-progress.window. > As indicated in the docs, this window must be bigger than the checkpointing interval. > But this is a manual configuration, which can lead to misconfiguration and > unwanted restarts of the Flink cluster if the checkpointing interval is bigger > than the window. > The operator must check that the config is healthy before relying on this > check. If it is not well set, it should not execute the check (return true in > [evaluateCheckpoints|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java#L197C1-L199C50]) > and log a WARN message. > Flink jobs also have other checkpointing parameters that should be taken into > account for this window configuration, namely > execution.checkpointing.timeout and > execution.checkpointing.tolerable-failed-checkpoints. > The idea would be to check that > kubernetes.operator.cluster.health-check.checkpoint-progress.window >= > max(execution.checkpointing.interval * > execution.checkpointing.tolerable-failed-checkpoints, > execution.checkpointing.timeout * > execution.checkpointing.tolerable-failed-checkpoints) -- This message was sent by Atlassian Jira (v8.20.10#820010)
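A small sketch of the sanity check proposed in FLINK-34131, written as assumed helper code rather than the operator's actual implementation; the guard against a zero tolerable-failed-checkpoints value is an added assumption not spelled out in the ticket.

```java
import java.time.Duration;

public class CheckpointWindowSanityCheck {

    /**
     * True if window >= max(checkpointInterval, checkpointTimeout) * tolerableFailedCheckpoints,
     * which is equivalent to the max(...) expression proposed in the ticket.
     */
    static boolean isWindowLargeEnough(
            Duration window,
            Duration checkpointInterval,
            Duration checkpointTimeout,
            int tolerableFailedCheckpoints) {
        Duration slowestCycle =
                checkpointInterval.compareTo(checkpointTimeout) >= 0
                        ? checkpointInterval
                        : checkpointTimeout;
        // Assume at least one checkpoint cycle even if tolerable-failed-checkpoints is 0.
        Duration required = slowestCycle.multipliedBy(Math.max(1, tolerableFailedCheckpoints));
        return window.compareTo(required) >= 0;
    }

    public static void main(String[] args) {
        // 5 min window, 1 min interval, 10 min timeout, 2 tolerable failed checkpoints
        boolean ok = isWindowLargeEnough(
                Duration.ofMinutes(5), Duration.ofMinutes(1), Duration.ofMinutes(10), 2);
        System.out.println(ok); // false -> log a WARN and skip the checkpoint-progress check
    }
}
```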
Re: [PR] [FLINK-34131] Ensure cluster.health-check.checkpoint-progress.window is well configure [flink-kubernetes-operator]
gyfora merged PR #758: URL: https://github.com/apache/flink-kubernetes-operator/pull/758 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager
[ https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809152#comment-17809152 ] Anurag Kyal commented on FLINK-32212: - I am running into this issue as well after my job restarts (trying to figure out the reason for random restarts) but these errors show up. Most of the times it stops after 8-10 exceptions but occasionally it gets into an infinite loop. I am using a pubsub connector to read notifications as the first step in my job and seems like these exceptions are related to this step - at least that is what the UI says. Was anyone able to find other solutions rather than manual intervention which I want to avoid in the middle of night. > Job restarting indefinitely after an IllegalStateException from > BlobLibraryCacheManager > --- > > Key: FLINK-32212 > URL: https://issues.apache.org/jira/browse/FLINK-32212 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.16.1 > Environment: Apache Flink Kubernetes Operator 1.4 >Reporter: Matheus Felisberto >Priority: Major > > After running for a few hours the job starts to throw IllegalStateException > and I can't figure out why. To restore the job, I need to manually delete the > FlinkDeployment to be recreated and redeploy everything. > The jar is built-in into the docker image, hence is defined accordingly with > the Operator's documentation: > {code:java} > // jarURI: local:///opt/flink/usrlib/my-job.jar {code} > I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work > either. > > {code:java} > // Source: my-topic (1/2)#30587 > (b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) > switched from DEPLOYING to FAILED with failure cause: > java.lang.IllegalStateException: The library registration references a > different set of library BLOBs than previous registrations for this job: > old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396] > new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2] > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202) > at > org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336) > at > org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.base/java.lang.Thread.run(Unknown Source) {code} > If there is any other information that can help to identify the problem, > please let me know. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
leonardBang commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1460846335 ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java: ## @@ -121,6 +128,12 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTimestamp; Review Comment: IIUC, we should convert Flink SQL TIMESTAMP_NTZ type to AVRO Local Timestamp finally, this is the correct behavior ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java: ## @@ -71,5 +70,12 @@ public InlineElement getDescription() { } } +@Deprecated +public static final ConfigOption AVRO_TIMESTAMP_LEGACY_MAPPING = Review Comment: Introducing a **new** config option with `Deprecated` is a little confused to users, I think we can mark it as `Deprecated` when we decide to to deprecate it. ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java: ## @@ -86,6 +86,27 @@ public AvroRowDataDeserializationSchema( typeInfo); } +/** + * Creates an Avro deserialization schema for the given logical type. + * + * @param rowType The logical type used to deserialize the data. + * @param typeInfo The TypeInformation to be used by {@link + * AvroRowDataDeserializationSchema#getProducedType()}. + * @param encoding The serialization approach used to deserialize the data. + * @param legacyMapping Whether to use legacy mapping. Review Comment: the scope meaning of `legacyMapping` is too large for our case, what we want to express is only timestamp types' legacy behavior, but this name looks like we have a legacy behavior for all types. How about `legacyTimestampMapping`? ## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java: ## @@ -352,17 +457,55 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { final TimestampType timestampType = (TimestampType) logicalType; precision = timestampType.getPrecision(); org.apache.avro.LogicalType avroLogicalType; -if (precision <= 3) { -avroLogicalType = LogicalTypes.timestampMillis(); +if (legacyMapping) { +if (precision <= 3) { +avroLogicalType = LogicalTypes.timestampMillis(); +} else if (precision <= 6) { +avroLogicalType = LogicalTypes.timestampMicros(); +} else { Review Comment: I'm happy that you can add precision support here, but I suggest we can open a new PR or commit to finish this work, it will be more clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
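For readers following the mapping discussion above, here is a standalone sketch of the two Avro logical types involved, using the plain Avro API (assuming Avro >= 1.10); this is not the connector code under review. `timestamp-millis` denotes an instant and maps naturally to TIMESTAMP_LTZ, while `local-timestamp-millis` has no zone and maps naturally to TIMESTAMP (the TIMESTAMP_NTZ type discussed in the review).

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class AvroTimestampKinds {
    public static void main(String[] args) {
        // An instant on the timeline (UTC-based), millisecond precision.
        Schema instantMillis =
                LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        // A wall-clock timestamp without a time zone, millisecond precision.
        Schema localMillis =
                LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        System.out.println(instantMillis); // {"type":"long","logicalType":"timestamp-millis"}
        System.out.println(localMillis);   // {"type":"long","logicalType":"local-timestamp-millis"}
    }
}
```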