[jira] [Assigned] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang reassigned FLINK-33934:

    Assignee: Yuan Kui

> Flink SQL Source use raw format maybe lead to data lost
> ---
>
>                 Key: FLINK-33934
>                 URL: https://issues.apache.org/jira/browse/FLINK-33934
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
>    Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 1.19.0
>            Reporter: Cai Liuyang
>            Assignee: Yuan Kui
>            Priority: Major
>
> In our product we encountered a case that leads to data loss. The job:
> 1. uses Flink SQL to read data from a message queue (our internal MQ) and write it to Hive (it selects only the value field, no metadata field)
> 2. uses the raw format for the source table
>
> If we select the value field and a metadata field at the same time, the data loss does not occur.
>
> After reviewing the code, we found that the cause is the object reuse in the raw format (see [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Taking Kafka as an example, object reuse causes the problem as follows:
> 1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator. The fetcher thread reads and deserializes data from the Kafka partition, then puts it into the ElementQueue (see [SourceOperator FetcherTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
> 2. The SourceOperator's main thread pulls data from the ElementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
> 3. The deserialize function of RawFormatDeserializationSchema returns the same object on every call ([reuse rowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
> 4. So, if the ElementQueue holds an element that has not been consumed yet, the fetcher thread can change the field of the reused RowData object that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
>
> Selecting the value field and a metadata field at the same time does not lose data for this reason:
> if a metadata field is selected, a new RowData object is returned (see [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's DeserializationSchema is reused (see [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
>
> To solve this problem, I think we should remove the object reuse from RawFormatDeserializationSchema.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
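The failure mode reported in FLINK-33934 can be reproduced in miniature without Flink. The sketch below is not Flink code: `Row`, `ReusingDeserializer`, `CopyingDeserializer`, and `produceThenConsume` are hypothetical names made up for illustration, and the two threads are simulated sequentially (the producer enqueues every element before the consumer drains the queue) so that the result is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

public class ObjectReuseDemo {

    /** Hypothetical one-field row, standing in for a RowData with a single value field. */
    public static final class Row {
        String value;
    }

    /** Returns the same mutable instance on every call (the object-reuse pattern). */
    public static final class ReusingDeserializer {
        private final Row reuse = new Row();

        public Row deserialize(String message) {
            reuse.value = message;
            return reuse;
        }
    }

    /** Returns a fresh instance on every call. */
    public static final class CopyingDeserializer {
        public Row deserialize(String message) {
            Row row = new Row();
            row.value = message;
            return row;
        }
    }

    /**
     * Simulates a producer that enqueues all deserialized messages before the
     * consumer reads any of them, and returns the values the consumer observes.
     */
    public static List<String> produceThenConsume(
            Function<String, Row> deserialize, String... messages) {
        Queue<Row> elementQueue = new ArrayDeque<>();
        for (String message : messages) {
            elementQueue.add(deserialize.apply(message));
        }
        List<String> seen = new ArrayList<>();
        while (!elementQueue.isEmpty()) {
            seen.add(elementQueue.poll().value);
        }
        return seen;
    }

    public static void main(String[] args) {
        // With reuse, both queue entries point at one object: "a" is overwritten by "b".
        System.out.println(produceThenConsume(new ReusingDeserializer()::deserialize, "a", "b"));
        // With a fresh object per message, both values survive.
        System.out.println(produceThenConsume(new CopyingDeserializer()::deserialize, "a", "b"));
    }
}
```

With the reusing deserializer the consumer sees `[b, b]`; with the copying one it sees `[a, b]`. In the real source the overwrite depends on thread timing, which is presumably why the loss shows up only when the queue holds unconsumed elements.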
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555253454

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param Type of partitioned key.
+ * @param Type of input of this request.
+ * @param Type of value that request will return.
+ */
+public class StateRequest {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,
+        /** Get from one {@link State}. */
+        GET,
+        /** Put to one {@link State}. */
+        PUT,
+        /** Merge value to an exist key in {@link State}. Mainly used for listState. */
+        MERGE,
+        /** Delete from one {@link State}. */
+        DELETE
+    }
+
+    /** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */
+    @Nullable private final State state;
+
+    /** The type of this request. */
+    private final RequestType type;
+
+    /** The payload(input) of this request. */
+    @Nullable private final IN payload;
+
+    /** The future to collect the result of the request. */
+    private InternalStateFuture stateFuture;
+
+    /** The record context of this request. */
+    private RecordContext context;
+
+    StateRequest(@Nullable State state, RequestType type, @Nullable IN payload) {
+        this.state = state;
+        this.type = type;
+        this.payload = payload;
+    }
+
+    RequestType getRequestType() {
+        return type;
+    }
+
+    @Nullable
+    IN getPayload() {
+        return payload;
+    }
+
+    @Nullable
+    State getState() {
+        return state;
+    }
+
+    InternalStateFuture getFuture() {
+        return stateFuture;
+    }
+
+    void setFuture(InternalStateFuture future) {
+        stateFuture = future;
+    }
+
+    RecordContext getRecordContext() {
+        return context;
+    }
+
+    void setRecordContext(RecordContext context) {
+        this.context = context;
+    }

Review Comment:
@masteryhx After trying, I think we still cannot avoid lazily setting some contexts or creating some temporary instances. How about making `AEC#handleRequest` receive the information from outside and build the `StateRequest` internally?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
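One way to read the proposal above is that the controller takes the raw pieces (request type and payload) and assembles the request itself, so the future and the record context can be constructor arguments instead of being set later. The sketch below is only an illustration of that idea, not the code in PR #24614: `Controller`, `pending`, and the simplified `RecordContext` and `StateRequest` are hypothetical, `CompletableFuture` stands in for `InternalStateFuture`, and the `State` field is left out to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class HandleRequestSketch {

    public enum RequestType { SYNC, GET, PUT, MERGE, DELETE }

    /** Hypothetical, simplified record context: just the record and its key. */
    public static final class RecordContext<K> {
        public final Object record;
        public final K key;

        public RecordContext(Object record, K key) {
            this.record = record;
            this.key = key;
        }
    }

    /** All fields are final: nothing has to be set lazily after construction. */
    public static final class StateRequest<K, IN, OUT> {
        public final RequestType type;
        public final IN payload;
        public final CompletableFuture<OUT> future;
        public final RecordContext<K> context;

        StateRequest(
                RequestType type,
                IN payload,
                CompletableFuture<OUT> future,
                RecordContext<K> context) {
            this.type = type;
            this.payload = payload;
            this.future = future;
            this.context = context;
        }
    }

    /** Hypothetical stand-in for the AEC; it owns request construction. */
    public static final class Controller<K> {
        public final List<StateRequest<K, ?, ?>> pending = new ArrayList<>();
        private RecordContext<K> currentContext;

        public void setCurrentContext(RecordContext<K> context) {
            this.currentContext = context;
        }

        /** Receives the information from outside and builds the request internally. */
        public <IN, OUT> CompletableFuture<OUT> handleRequest(RequestType type, IN payload) {
            CompletableFuture<OUT> future = new CompletableFuture<>();
            pending.add(new StateRequest<K, IN, OUT>(type, payload, future, currentContext));
            return future;
        }
    }

    public static void main(String[] args) {
        Controller<String> aec = new Controller<>();
        aec.setCurrentContext(new RecordContext<>("key1-r1", "key1"));
        CompletableFuture<Integer> result = aec.handleRequest(RequestType.PUT, 1);
        StateRequest<String, ?, ?> request = aec.pending.get(0);
        System.out.println(request.type + " for key " + request.context.key);
        System.out.println("future wired: " + (request.future == result));
    }
}
```

The trade-off in this sketch is that callers no longer see a half-initialized request, at the cost of the controller knowing how to construct one.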
[jira] [Updated] (FLINK-35042) Streaming File Sink s3 end-to-end test failed as TM lost
[ https://issues.apache.org/jira/browse/FLINK-35042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-35042:
---
    Description:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=14344

FAIL 'Streaming File Sink s3 end-to-end test' failed after 15 minutes and 20 seconds! Test exited with exit code 1

I have checked the JM log, it seems that a taskmanager is no longer reachable:

{code:java}
2024-04-08T01:12:04.3922210Z Apr 08 01:12:04 2024-04-08 00:58:15,517 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Unnamed (4/4) (14b44f534745ffb2f1ef03fca34f7f0d_0a448493b4782967b150582570326227_3_0) switched from RUNNING to FAILED on localhost:44987-47f5af @ localhost (dataPort=34489).
2024-04-08T01:12:04.3924522Z Apr 08 01:12:04 org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id localhost:44987-47f5af is no longer reachable.
2024-04-08T01:12:04.3925421Z Apr 08 01:12:04 at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1511) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3926185Z Apr 08 01:12:04 at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3926925Z Apr 08 01:12:04 at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3929898Z Apr 08 01:12:04 at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3930692Z Apr 08 01:12:04 at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3931442Z Apr 08 01:12:04 at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3931917Z Apr 08 01:12:04 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_402]
2024-04-08T01:12:04.3934759Z Apr 08 01:12:04 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_402]
2024-04-08T01:12:04.3935252Z Apr 08 01:12:04 at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_402]
2024-04-08T01:12:04.3935989Z Apr 08 01:12:04 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460) ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3936731Z Apr 08 01:12:04 at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3938103Z Apr 08 01:12:04 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460) ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3942549Z Apr 08 01:12:04 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225) ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3945371Z Apr 08 01:12:04 at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3946244Z Apr 08 01:12:04 at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3946960Z Apr 08 01:12:04 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3947664Z Apr 08 01:12:04 at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3950764Z Apr 08 01:12:04 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3952816Z Apr 08 01:12:04 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3953526Z Apr 08 01:12:04 at
[jira] [Created] (FLINK-35042) Streaming File Sink s3 end-to-end test failed as TM lost
Weijie Guo created FLINK-35042:
--
             Summary: Streaming File Sink s3 end-to-end test failed as TM lost
                 Key: FLINK-35042
                 URL: https://issues.apache.org/jira/browse/FLINK-35042
             Project: Flink
          Issue Type: Bug
          Components: Build System / CI
    Affects Versions: 1.20.0
            Reporter: Weijie Guo
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555247481

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ##
@@ -0,0 +1,99 @@

Review Comment:
Well, this is a good suggestion. I'll try this.
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555245552

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Async Execution Controller (AEC) receives processing requests from operators, and put them
+ * into execution according to some strategies.
+ *
+ * It is responsible for:
+ * Preserving the sequence of elements bearing the same key by delaying subsequent requests
+ * until the processing of preceding ones is finalized.
+ * Tracking the in-flight data(records) and blocking the input if too much data in flight
+ * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations,
+ * allowing for the execution of callbacks (mails in Mailbox).
+ *
+ * @param the type of the record
+ * @param the type of the key
+ */
+public class AsyncExecutionController {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
+
+    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+
+    /** The max allow number of in-flight records. */
+    private final int maxInFlightRecordNum;
+
+    /** The key accounting unit which is used to detect the key conflict. */
+    final KeyAccountingUnit keyAccountingUnit;
+
+    /**
+     * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto
+     * wire the created future with mailbox executor. Also conducting the context switch.
+     */
+    private final StateFutureFactory stateFutureFactory;
+
+    /** The state executor where the {@link StateRequest} is actually executed. */
+    final StateExecutor stateExecutor;
+
+    /** The corresponding context that currently runs in task thread. */
+    RecordContext currentContext;
+
+    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
+        this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+    }
+
+    public AsyncExecutionController(
+            MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
+        this.keyAccountingUnit = new KeyAccountingUnit<>();
+        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+        this.stateExecutor = stateExecutor;
+        this.maxInFlightRecordNum = maxInFlightRecords;
+        LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords);
+    }
+
+    /**
+     * Build a new context based on record and key. Also wired with internal {@link
+     * KeyAccountingUnit}.
+     *
+     * @param record the given record.
+     * @param key the given key.
+     * @return the built record context.
+     */
+    public RecordContext buildContext(R record, K key) {

Review Comment:
Well, building/initializing is a special use case, distinct from simply setting. We could change this method to behave like something such as `buildAndSet`, but I lean slightly toward having each interface method serve a single, specialized function rather than several.
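The first responsibility named in the AEC Javadoc above, keeping records with the same key in order by delaying later ones until the earlier one finishes, can be sketched in a few lines. This is a minimal illustration and not the `KeyAccountingUnit` from the PR: the class `KeyOrderingSketch` and its methods `submit`, `finish`, and `started` are hypothetical names, keys and records are plain strings, and the in-flight limit (back-pressure) is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class KeyOrderingSketch {

    /** Keys that currently have a record in flight. */
    private final Set<String> occupied = new HashSet<>();

    /** Records waiting for an earlier record with the same key to finish. */
    private final Map<String, Queue<String>> blocked = new HashMap<>();

    /** Records in the order in which they were allowed to start. */
    private final List<String> started = new ArrayList<>();

    /** Starts the record immediately if its key is free, otherwise parks it. */
    public void submit(String key, String record) {
        if (occupied.add(key)) {
            started.add(record);
        } else {
            blocked.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
        }
    }

    /** Marks the in-flight record of this key as done and starts the next one, if any. */
    public void finish(String key) {
        Queue<String> waiting = blocked.get(key);
        if (waiting == null || waiting.isEmpty()) {
            occupied.remove(key);
            blocked.remove(key);
        } else {
            // The key stays occupied: the next parked record takes over.
            started.add(waiting.poll());
        }
    }

    public List<String> started() {
        return started;
    }

    public static void main(String[] args) {
        KeyOrderingSketch unit = new KeyOrderingSketch();
        unit.submit("key1", "key1-r1");
        unit.submit("key1", "key1-r2"); // parked behind key1-r1
        unit.submit("key2", "key2-r1"); // different key, starts right away
        System.out.println(unit.started());
        unit.finish("key1");
        System.out.println(unit.started());
    }
}
```

Records with different keys may overtake each other here; only per-key order is preserved, which matches the guarantee the Javadoc describes.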
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555244839

## flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java: ##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.StateRequest.RequestType;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link AsyncExecutionController}. */
+class AsyncExecutionControllerTest {
+
+    // TODO: this test is not well completed, cause buffering in AEC is not implemented.
+    // Yet, just for illustrating the interaction between AEC and Async state API.
+    @Test
+    void testBasicRun() {
+        TestAsyncExecutionController aec =
+                new TestAsyncExecutionController<>(
+                        new SyncMailboxExecutor(), new TestStateExecutor());
+        TestUnderlyingState underlyingState = new TestUnderlyingState();
+        TestValueState valueState = new TestValueState(aec, underlyingState);
+        AtomicInteger output = new AtomicInteger();
+        Runnable userCode =
+                () -> {
+                    valueState
+                            .asyncValue()
+                            .thenCompose(
+                                    val -> {
+                                        int updated = (val == null ? 1 : (val + 1));
+                                        return valueState
+                                                .asyncUpdate(updated)
+                                                .thenCompose(
+                                                        o ->
+                                                                StateFutureUtils.completedFuture(
+                                                                        updated));
+                                    })
+                            .thenAccept(val -> output.set(val));
+                };
+
+        // element1
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the
+        // record and key with RecordContext.
+        RecordContext recordContext1 = aec.buildContext(record1, key1);
+        aec.setCurrentContext(recordContext1);
+        // execute user code
+        userCode.run();
+
+        // Single-step run.
+        // Firstly, the user code generates value get in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update is in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update finishes.
+        assertThat(aec.activeBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(1);
+        assertThat(recordContext1.getReferenceCount()).isEqualTo(0);
+
+        // element 2 & 3
+        String record2 = "key1-r2";
+        String key2 = "key1";
+        RecordContext recordContext2 = aec.buildContext(record2, key2);
+        aec.setCurrentContext(recordContext2);
+        // execute user code
+
[jira] [Updated] (FLINK-34273) git fetch fails
[ https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-34273:
---
    Affects Version/s: 1.20.0

> git fetch fails
> ---
>
>                 Key: FLINK-34273
>                 URL: https://issues.apache.org/jira/browse/FLINK-34273
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System / CI, Test Infrastructure
>    Affects Versions: 1.19.0, 1.18.1, 1.20.0
>            Reporter: Matthias Pohl
>            Priority: Major
>              Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=5d6dc3d3-393d-5111-3a40-c6a5a36202e6=667
[jira] [Commented] (FLINK-35023) YARNApplicationITCase failed on Azure
[ https://issues.apache.org/jira/browse/FLINK-35023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834783#comment-17834783 ]

Weijie Guo commented on FLINK-35023:

jdk17
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=28689

> YARNApplicationITCase failed on Azure
> -
>
>                 Key: FLINK-35023
>                 URL: https://issues.apache.org/jira/browse/FLINK-35023
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System / CI
>    Affects Versions: 1.20.0
>            Reporter: Weijie Guo
>            Priority: Major
>
> 1. YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
> {code:java}
> Apr 06 02:19:44 02:19:44.063 [ERROR] org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion -- Time elapsed: 9.727 s <<< FAILURE!
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED while expecting FINISHED
> Apr 06 02:19:44 at org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72)
> Apr 06 02:19:44 at org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70)
> Apr 06 02:19:44 at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44 at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 2. YARNApplicationITCase.testApplicationClusterWithRemoteUserJar
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED while expecting FINISHED
> Apr 06 02:19:44 at org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86)
> Apr 06 02:19:44 at org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84)
> Apr 06 02:19:44 at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44 at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44 at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 3. YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED while expecting FINISHED
> Apr 06 02:19:44 at org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44 at org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion$0(YARNApplicationITCase.java:62)
> Apr 06 02:19:44 at org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44
[jira] [Commented] (FLINK-34273) git fetch fails
[ https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834784#comment-17834784 ]

Weijie Guo commented on FLINK-34273:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=59a2b95a-736b-5c46-b3e0-cee6e587fd86=c5b19363-f3ba-59e0-bfbf-73c4b9bde45c=264

> git fetch fails
> ---
>
>                 Key: FLINK-34273
>                 URL: https://issues.apache.org/jira/browse/FLINK-34273
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System / CI, Test Infrastructure
>    Affects Versions: 1.19.0, 1.18.1
>            Reporter: Matthias Pohl
>            Priority: Major
>              Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=5d6dc3d3-393d-5111-3a40-c6a5a36202e6=667
[jira] [Updated] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weijie Guo updated FLINK-35041:
---
    Description:
{code:java}
Apr 08 03:22:45 03:22:45.450 [ERROR] org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration -- Time elapsed: 0.034 s <<< FAILURE!
Apr 08 03:22:45 org.opentest4j.AssertionFailedError:
Apr 08 03:22:45
Apr 08 03:22:45 expected: false
Apr 08 03:22:45 but was: true
Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Apr 08 03:22:45 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Apr 08 03:22:45 at org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
Apr 08 03:22:45 at org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 08 03:22:45 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Apr 08 03:22:45 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Apr 08 03:22:45 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
>                 Key: FLINK-35041
>                 URL: https://issues.apache.org/jira/browse/FLINK-35041
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System / CI
>    Affects Versions: 1.20.0
>            Reporter: Weijie Guo
>            Priority: Major
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError:
> Apr 08 03:22:45
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45 but was: true
> Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45 at org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45 at org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238
[jira] [Created] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
Weijie Guo created FLINK-35041: -- Summary: IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed Key: FLINK-35041 URL: https://issues.apache.org/jira/browse/FLINK-35041 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1555241719 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Key accounting unit holds the current in-flight key and tracks the corresponding ongoing records, + * which is used to preserve the ordering of independent chained {@link + * org.apache.flink.api.common.state.v2.StateFuture}. + * + * @param the type of record + * @param the type of key + */ +public class KeyAccountingUnit { Review Comment: I'm not a big fan of interface abstraction. Two questions: Is there a need for two or more behaviors? And how could we/user choose one of them? It is just a class for internal usage so I'd keep this normal class until we meet a concrete use case for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
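For readers following the KeyAccountingUnit thread, here is a minimal sketch of what "holding the in-flight key and tracking the corresponding ongoing record" could look like. It is not the code from PR #24614: the class name `KeyAccountingSketch`, the methods `occupy`, `release` and `inFlightKeys`, and the type parameters `R` (record) and `K` (key) are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: remembers which record currently owns each in-flight key. */
final class KeyAccountingSketch<R, K> {
    // One entry per key that has an ongoing record.
    private final Map<K, R> inFlight = new ConcurrentHashMap<>();

    /** Returns true if the key was free, or is already owned by the same record. */
    boolean occupy(R record, K key) {
        R owner = inFlight.putIfAbsent(key, record);
        return owner == null || owner.equals(record);
    }

    /** Frees the key, but only if this record is the one that owns it. */
    void release(R record, K key) {
        inFlight.remove(key, record);
    }

    /** Number of keys that currently have an ongoing record. */
    int inFlightKeys() {
        return inFlight.size();
    }
}
```

A second record with the same key is refused until the first one releases the key, which is one way to preserve per-key ordering.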
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1555239594 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.Internal; + +import java.util.concurrent.CompletableFuture; + +/** Executor for executing batch {@link StateRequest}s. */ +@Internal +public interface StateExecutor { +/** + * Execute a batch of state requests. + * + * @param processingRequests the given batch of processing requests + * @return A future can determine whether execution has completed. + */ +CompletableFuture executeBatchRequests( +Iterable> processingRequests); Review Comment: Sorting and batching into groups is done by the `AEC`, so there are no mixed request types here. We are just following the description in the FLIPs; if we need to change the signature, it would be fine to change it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
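As a rough illustration of the contract discussed in this comment (a batch of requests goes in, and a future signals completion), here is a small sketch. It is not the `StateExecutor` from the PR: `BatchExecutorSketch`, `Kind`, `Request` and the in-memory `store` are hypothetical, and the `CompletableFuture<Boolean>` return type is an assumption, since the generic parameters are not visible in the quoted diff.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of an executor that runs a pre-grouped batch of requests. */
final class BatchExecutorSketch {
    enum Kind { GET, PUT }

    /** One request plus the future through which its result is delivered. */
    static final class Request {
        final Kind kind;
        final String key;
        final String value; // only used by PUT
        final CompletableFuture<String> result = new CompletableFuture<>();

        Request(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for a real state backend.
    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Runs the batch asynchronously; the returned future completes once all requests are done. */
    CompletableFuture<Boolean> executeBatch(Iterable<Request> batch) {
        return CompletableFuture.supplyAsync(() -> {
            for (Request r : batch) {
                if (r.kind == Kind.PUT) {
                    store.put(r.key, r.value);
                    r.result.complete(null);
                } else {
                    r.result.complete(store.get(r.key));
                }
            }
            return true;
        });
    }
}
```

In this sketch the caller is responsible for ordering and grouping the batch, mirroring the statement that the `AEC` does that work before the executor is invoked.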
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1555237399 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param Type of partitioned key. + * @param Type of input of this request. + * @param Type of value that request will return. + */ +public class StateRequest { + +/** The type of processing request. */ +public enum RequestType { +/** Process one record without state access. */ +SYNC, Review Comment: It actually means sync with the framework to check if there is another record on-going with a same key. It is something like `SYNC_POINT`. I'll change the doc here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834779#comment-17834779 ] Jiabao Sun commented on FLINK-34955: I have rechecked the dependency of `commons-codec` in `commons-compress` and it is no longer optional. Even if upgraded to 1.26.1, `commons-codec` will still be a transitive dependency. Sorry for the disturbance. > Upgrade commons-compress to 1.26.0 > -- > > Key: FLINK-34955 > URL: https://issues.apache.org/jira/browse/FLINK-34955 > Project: Flink > Issue Type: Improvement >Reporter: Shilun Fan >Assignee: Shilun Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can > refer to the maven link > https://mvnrepository.com/artifact/org.apache.commons/commons-compress -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35010) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector
[ https://issues.apache.org/jira/browse/FLINK-35010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834778#comment-17834778 ] Jiabao Sun commented on FLINK-35010: I have rechecked the dependency of `commons-codec` in `commons-compress` and it is no longer optional. Even if upgraded to 1.26.1, `commons-codec` will still be a transitive dependency. Please ignore the previous noise, sorry for the disturbance. > Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink > Mongodb connector > -- > > Key: FLINK-35010 > URL: https://issues.apache.org/jira/browse/FLINK-35010 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / MongoDB >Reporter: Zhongqiang Gong >Assignee: Zhongqiang Gong >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834777#comment-17834777 ] Jiabao Sun commented on FLINK-35008: I have rechecked the dependency of `commons-codec` in `commons-compress` and it is no longer optional. Even if upgraded to 1.26.1, `commons-codec` will still be a transitive dependency. Please ignore the previous noise, sorry for the disturbance. > Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink > Kafka connector > > > Key: FLINK-35008 > URL: https://issues.apache.org/jira/browse/FLINK-35008 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]
Jiabao-Sun commented on code in PR #24580: URL: https://github.com/apache/flink/pull/24580#discussion_r1555174874 ## flink-end-to-end-tests/flink-sql-client-test/pom.xml: ## @@ -69,6 +69,13 @@ under the License. kafka test + + + commons-codec + commons-codec + test + + Review Comment: Hi @mbalassi, @slfan1989, ~I think we needn't this dependency, the `commons-codec`'s dependency is because `commons-compress` incorrectly depended on `commons-codec`'s Charsets in version 1.26.0.~ ~This issue has been fixed in version 1.26.1, so perhaps we should bump `commons-compress` version to 1.26.1.~ ~see: https://issues.apache.org/jira/browse/COMPRESS-659~ ## flink-end-to-end-tests/flink-sql-client-test/pom.xml: ## @@ -69,6 +69,13 @@ under the License. kafka test + + + commons-codec + commons-codec + test + + Review Comment: I have rechecked the dependency of `commons-codec` in `commons-compress` and it is no longer optional. Even if upgraded to 1.26.1, `commons-codec` will still be a transitive dependency. Please ignore the previous noise, sorry for the disturbance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
fredia commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1555208732 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param Type of partitioned key. + * @param Type of input of this request. + * @param Type of value that request will return. + */ +public class StateRequest { Review Comment: I think this question is somewhat the same as https://github.com/apache/flink/pull/24614#discussion_r1554806613. In the future, an `internal` `key-value` layer will be introduced, we could make it internal kv interface or class parameter in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
masteryhx commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1555175989 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param Type of partitioned key. + * @param Type of input of this request. + * @param Type of value that request will return. + */ +public class StateRequest { + +/** The type of processing request. */ +public enum RequestType { +/** Process one record without state access. */ +SYNC, Review Comment: The naming is a bit confusing. I might have read it as accessing state synchronously if I had missed the comment above. How about `NOP`, or another name that at least says `without state access`?
## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.Internal; + +import java.util.concurrent.CompletableFuture; + +/** Executor for executing batch {@link StateRequest}s. */ +@Internal +public interface StateExecutor { +/** + * Execute a batch of state requests. + * + * @param processingRequests the given batch of processing requests + * @return A future can determine whether execution has completed. + */ +CompletableFuture executeBatchRequests( +Iterable> processingRequests); Review Comment: Different state types could be executed together, right ? How could we get specific state operations ? Current `RequestType` seems not enough. ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.state.InternalStateFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Async Execution Controller (AEC) receives processing requests from operators, and put them + * into execution according to some strategies. + * + * It is responsible for: + * Preserving the sequence of elements bearing the
[jira] [Updated] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-33934: - Affects Version/s: 1.19.0 1.18.0 1.17.0 1.16.0 1.15.0 1.14.0 1.13.0 1.12.0 > Flink SQL Source use raw format maybe lead to data lost > --- > > Key: FLINK-33934 > URL: https://issues.apache.org/jira/browse/FLINK-33934 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Runtime >Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, > 1.19.0 >Reporter: Cai Liuyang >Priority: Major > > In our product we encountered a case that leads to data loss. The job info: > 1. It uses Flink SQL to read data from a message queue (our internal MQ) and > write to Hive (selecting only the value field, without any metadata field) > 2. The format of the source table is the raw format > > If we select the value field and a metadata field at the same time, the > data loss does not occur. > > After reviewing the code, we found that the cause is the object reuse of the > raw format (see code > [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). > Why object reuse leads to this problem is explained below (taking Kafka as an example): > 1. RawFormatDeserializationSchema is used in the fetcher thread of > SourceOperator. The fetcher thread reads and deserializes data from the Kafka > partition, then puts the data into the ElementQueue (see code [SourceOperator > FetcherTask > |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]) > 2. 
SourceOperator's main thread pulls data from the > ElementQueue (which is shared with the fetcher thread) and processes it (see code > [SourceOperator main > thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]) > 3. For RawFormatDeserializationSchema, the deserialize function > returns the same object ([reuse rowData > object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]) > 4. So, if the ElementQueue holds elements that have not been consumed yet, the > fetcher thread can change the field of the reused RowData that > RawFormatDeserializationSchema::deserialize returned, which leads to data > loss. > > The reason that selecting the value field and a metadata field at the same time does not > cause data loss is: > if we select a metadata field, a new RowData object is returned, see > code: [DynamicKafkaDeserializationSchema deserialize with metadata field > |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249] > and if we only select the value field, the RowData object that the > formatDeserializationSchema returned is reused, see code > [DynamicKafkaDeserializationSchema deserialize only with value > field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113] > > To solve this problem, I think we should remove the object reuse in > RawFormatDeserializationSchema. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost
[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834765#comment-17834765 ] Yun Tang commented on FLINK-33934: -- I think it's easy to lose data when using the {{raw}} format with another customized connector. Moreover, the object-reuse semantics are not visible in the {{raw}} format description. From my point of view, this is a potential bug, cc [~jark]. > Flink SQL Source use raw format maybe lead to data lost > --- > > Key: FLINK-33934 > URL: https://issues.apache.org/jira/browse/FLINK-33934 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Runtime >Reporter: Cai Liuyang >Priority: Major > > In our product we encountered a case that leads to data loss. The job info: > 1. It uses Flink SQL to read data from a message queue (our internal MQ) and > write to Hive (selecting only the value field, without any metadata field) > 2. The format of the source table is the raw format > > If we select the value field and a metadata field at the same time, the > data loss does not occur. > > After reviewing the code, we found that the cause is the object reuse of the > raw format (see code > [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). > Why object reuse leads to this problem is explained below (taking Kafka as an example): > 1. RawFormatDeserializationSchema is used in the fetcher thread of > SourceOperator. The fetcher thread reads and deserializes data from the Kafka > partition, then puts the data into the ElementQueue (see code [SourceOperator > FetcherTask > |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]) > 2. 
SourceOperator's main thread pulls data from the > ElementQueue (which is shared with the fetcher thread) and processes it (see code > [SourceOperator main > thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]) > 3. For RawFormatDeserializationSchema, the deserialize function > returns the same object ([reuse rowData > object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]) > 4. So, if the ElementQueue holds elements that have not been consumed yet, the > fetcher thread can change the field of the reused RowData that > RawFormatDeserializationSchema::deserialize returned, which leads to data > loss. > > The reason that selecting the value field and a metadata field at the same time does not > cause data loss is: > if we select a metadata field, a new RowData object is returned, see > code: [DynamicKafkaDeserializationSchema deserialize with metadata field > |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249] > and if we only select the value field, the RowData object that the > formatDeserializationSchema returned is reused, see code > [DynamicKafkaDeserializationSchema deserialize only with value > field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113] > > To solve this problem, I think we should remove the object reuse in > RawFormatDeserializationSchema. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
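The failure mode described in FLINK-33934 can be reproduced in miniature without Flink. The sketch below is an assumption-laden illustration, not Flink code: `Row`, `Deserializer`, `ReusingDeserializer`, `CopyingDeserializer` and `run` are hypothetical names, and a plain single-threaded queue stands in for the ElementQueue shared between the fetcher thread and the main thread. The "fetcher" enqueues every record before the "main thread" consumes any of them, which is the situation in step 4 of the report.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch of how a reused row object overwrites queued elements. */
final class ObjectReuseSketch {
    /** Stand-in for a mutable row with a single value field. */
    static final class Row {
        String value;
    }

    interface Deserializer {
        Row deserialize(String message);
    }

    /** Returns the same Row instance on every call, as a reusing format would. */
    static final class ReusingDeserializer implements Deserializer {
        private final Row reuse = new Row();

        @Override
        public Row deserialize(String message) {
            reuse.value = message;
            return reuse;
        }
    }

    /** Allocates a fresh Row per message, so queued rows are never overwritten. */
    static final class CopyingDeserializer implements Deserializer {
        @Override
        public Row deserialize(String message) {
            Row row = new Row();
            row.value = message;
            return row;
        }
    }

    /** Enqueues all messages first, then drains the queue and reports what was seen. */
    static List<String> run(Deserializer deserializer, List<String> messages) {
        Queue<Row> queue = new ArrayDeque<>();
        for (String message : messages) {
            queue.add(deserializer.deserialize(message));
        }
        List<String> seen = new ArrayList<>();
        while (!queue.isEmpty()) {
            seen.add(queue.poll().value);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        System.out.println(run(new ReusingDeserializer(), input)); // prints [c, c, c]
        System.out.println(run(new CopyingDeserializer(), input)); // prints [a, b, c]
    }
}
```

With the reusing variant every queued reference points at the same object, so the earlier values "a" and "b" are lost. Allocating a new row per record, as the report proposes, avoids that at the cost of extra allocations.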
[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
[ https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-35040: Description: The performance of serializerHeavyString has regressed since April 3 and had not yet recovered as of April 8. It seems that only Java 11 regressed; Java 8 and Java 17 are fine. http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200 !screenshot-1.png! was: The performance of serializerHeavyString has regressed since April 3 and had not yet recovered as of April 8. http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200 !image-2024-04-08-10-51-07-403.png! > The performance of serializerHeavyString regresses since April 3 > > > Key: FLINK-35040 > URL: https://issues.apache.org/jira/browse/FLINK-35040 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Priority: Blocker > Attachments: image-2024-04-08-10-51-07-403.png, screenshot-1.png > > > The performance of serializerHeavyString has regressed since April 3 and had not > yet recovered as of April 8. > It seems that only Java 11 regressed; Java 8 and Java 17 are fine. > http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200 > !screenshot-1.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
[ https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-35040: Attachment: screenshot-1.png > The performance of serializerHeavyString regresses since April 3 > > > Key: FLINK-35040 > URL: https://issues.apache.org/jira/browse/FLINK-35040 > Project: Flink > Issue Type: Bug > Components: Benchmarks >Affects Versions: 1.20.0 >Reporter: Rui Fan >Priority: Blocker > Attachments: image-2024-04-08-10-51-07-403.png, screenshot-1.png > > > The performance of serializerHeavyString regresses since April 3, and had not > yet recovered on April 8th. > http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200 > !image-2024-04-08-10-51-07-403.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on PR #24426: URL: https://github.com/apache/flink/pull/24426#issuecomment-2041795538 Thanks @XComp, I am going to address your suggestions. Please have a look once you are back -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on code in PR #24426: URL: https://github.com/apache/flink/pull/24426#discussion_r1555171619 ## .github/workflows/template.python-wheels-ci.yml: ## @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Python Wheels CI + +on: + workflow_call: + +permissions: read-all + +jobs: + linux: +runs-on: ubuntu-latest +steps: + - name: Checkout the repository +uses: actions/checkout@v4 +with: + fetch-depth: 0 + persist-credentials: false + - name: Build wheels +run: | + cd flink-python + bash dev/build-wheels.sh + - name: Tar artifact +run: tar -cvf python-wheel.tar flink-python/dist Review Comment: Yes, good idea! ## .github/workflows/template.python-wheels-ci.yml: ## @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Python Wheels CI + +on: + workflow_call: + +permissions: read-all + +jobs: + linux: +runs-on: ubuntu-latest +steps: + - name: Checkout the repository +uses: actions/checkout@v4 +with: + fetch-depth: 0 + persist-credentials: false + - name: Build wheels +run: | + cd flink-python + bash dev/build-wheels.sh + - name: Tar artifact +run: tar -cvf python-wheel.tar flink-python/dist + - name: Upload artifact +uses: actions/upload-artifact@v4 +with: + name: wheel_${{ runner.os }}_${{ github.sha }}_${{ github.run_number }} + path: python-wheel.tar + macos: +runs-on: macos-latest +steps: + - name: Checkout the repository +uses: actions/checkout@v4 +with: + fetch-depth: 0 + persist-credentials: false + - name: Install python Review Comment: Yes good idea, I took the same steps from Azure pipelines. But I agree having same Python version on both Linux/MacOs jobs is good idea. I will change workflow. I didn't check which version is installed in the Linux VM ## .github/workflows/nightly.yml: ## @@ -94,3 +94,6 @@ jobs: s3_bucket: ${{ secrets.IT_CASE_S3_BUCKET }} s3_access_key: ${{ secrets.IT_CASE_S3_ACCESS_KEY }} s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }} + python-wheels: Review Comment: Yes, that should be fine, I would also prefer the inlined version. I wanted to keep the structure similar to other jobs, but python wheels are not used more than once. I will change it to the inlined version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]
Jiabao-Sun commented on code in PR #24580: URL: https://github.com/apache/flink/pull/24580#discussion_r1555174874 ## flink-end-to-end-tests/flink-sql-client-test/pom.xml: ## @@ -69,6 +69,13 @@ under the License. kafka test + + + commons-codec + commons-codec + test + + Review Comment: Hi @mbalassi, @slfan1989, I don't think we need this dependency. It was only required because `commons-compress` 1.26.0 incorrectly depended on `commons-codec`'s Charsets. This has been fixed in version 1.26.1, so perhaps we should bump `commons-compress` to 1.26.1 instead. See: https://issues.apache.org/jira/browse/COMPRESS-659
[jira] [Created] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
Rui Fan created FLINK-35040: --- Summary: The performance of serializerHeavyString regresses since April 3 Key: FLINK-35040 URL: https://issues.apache.org/jira/browse/FLINK-35040 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.20.0 Reporter: Rui Fan Attachments: image-2024-04-08-10-51-07-403.png The performance of serializerHeavyString has regressed since April 3 and had not recovered as of April 8. http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200 !image-2024-04-08-10-51-07-403.png!
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on code in PR #24426: URL: https://github.com/apache/flink/pull/24426#discussion_r1555166750 ## .github/workflows/template.python-wheels-ci.yml: ## @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Python Wheels CI + +on: + workflow_call: + +permissions: read-all + +jobs: + linux: +runs-on: ubuntu-latest +steps: + - name: Checkout the repository +uses: actions/checkout@v4 +with: + fetch-depth: 0 + persist-credentials: false + - name: Build wheels +run: | + cd flink-python + bash dev/build-wheels.sh + - name: Tar artifact +run: tar -cvf python-wheel.tar flink-python/dist + - name: Upload artifact +uses: actions/upload-artifact@v4 +with: + name: wheel_${{ runner.os }}_${{ github.sha }}_${{ github.run_number }} Review Comment: Hey @XComp, I meant to keep the python-wheels-ci GitHub workflow consistent with the Azure pipelines. The main concern here is to keep the Python Wheels artifact names similar. Since the Azure pipelines used `Agent.JobName`, it would be fine to use the stringified workflow name, prefixed with `wheel`. Would that be a good option?
Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]
loserwang1024 commented on code in PR #3134: URL: https://github.com/apache/flink-cdc/pull/3134#discussion_r1555165225 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java: ## @@ -298,22 +298,35 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize()); } final int requestMetaGroupId = requestEvent.getRequestMetaGroupId(); - -if (binlogSplitMeta.size() > requestMetaGroupId) { +final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize(); +final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size(); +if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) { Review Comment: done it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]
loserwang1024 commented on code in PR #3134: URL: https://github.com/apache/flink-cdc/pull/3134#discussion_r1555164983 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java: ## @@ -43,10 +43,14 @@ public class BinlogSplitMetaEvent implements SourceEvent { */ private final List metaGroup; -public BinlogSplitMetaEvent(String splitId, int metaGroupId, List metaGroup) { +private final int totalFinishedSplitSize; + +public BinlogSplitMetaEvent( +String splitId, int metaGroupId, List metaGroup, int totalFinishedSplitSize) { Review Comment: Done it. ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java: ## @@ -43,10 +43,14 @@ public class StreamSplitMetaEvent implements SourceEvent { */ private final List metaGroup; -public StreamSplitMetaEvent(String splitId, int metaGroupId, List metaGroup) { +private final int totalFinishedSplitSize; + +public StreamSplitMetaEvent( +String splitId, int metaGroupId, List metaGroup, int totalFinishedSplitSize) { Review Comment: Done it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35039) Create Profiling JobManager/TaskManager Instance failed
[ https://issues.apache.org/jira/browse/FLINK-35039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834758#comment-17834758 ] ude commented on FLINK-35039: - [~Yu Chen] Please have a look. If my proposed change looks fine, please assign the issue to me. > Create Profiling JobManager/TaskManager Instance failed > --- > > Key: FLINK-35039 > URL: https://issues.apache.org/jira/browse/FLINK-35039 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Affects Versions: 1.19.0 > Environment: Hadoop 3.2.2 > Flink 1.19 >Reporter: ude >Priority: Major > Attachments: image-2024-04-08-10-21-31-066.png, > image-2024-04-08-10-21-48-417.png, image-2024-04-08-10-30-16-683.png > > > I'm testing the "async-profiler" feature in version 1.19, but when I submit a > task in YARN per-job mode, I get an error when I click Create Profiling > Instance on the Flink Web UI page. > !image-2024-04-08-10-21-31-066.png! > !image-2024-04-08-10-21-48-417.png! > The error message indicates that the YARN proxy server does not support > *POST* calls. I checked the code of _*WebAppProxyServlet.java*_ and found > that the *POST* method is indeed not supported, so I changed the call to use the *PUT* > method and it succeeded. > !image-2024-04-08-10-30-16-683.png! >
[jira] [Created] (FLINK-35039) Create Profiling JobManager/TaskManager Instance failed
ude created FLINK-35039: --- Summary: Create Profiling JobManager/TaskManager Instance failed Key: FLINK-35039 URL: https://issues.apache.org/jira/browse/FLINK-35039 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.19.0 Environment: Hadoop 3.2.2 Flink 1.19 Reporter: ude Attachments: image-2024-04-08-10-21-31-066.png, image-2024-04-08-10-21-48-417.png, image-2024-04-08-10-30-16-683.png I'm testing the "async-profiler" feature in version 1.19, but when I submit a task in YARN per-job mode, I get an error when I click Create Profiling Instance on the Flink Web UI page. !image-2024-04-08-10-21-31-066.png! !image-2024-04-08-10-21-48-417.png! The error message indicates that the YARN proxy server does not support *POST* calls. I checked the code of _*WebAppProxyServlet.java*_ and found that the *POST* method is indeed not supported, so I changed the call to use the *PUT* method and it succeeded. !image-2024-04-08-10-30-16-683.png!
[jira] [Commented] (FLINK-34966) Support to read snapshot by table partitions in MySQL CDC Source
[ https://issues.apache.org/jira/browse/FLINK-34966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834756#comment-17834756 ] Qingsheng Ren commented on FLINK-34966: --- [~wanghe] Could you provide some context in the description? > Support to read snapshot by table partitions in MySQL CDC Source > > > Key: FLINK-34966 > URL: https://issues.apache.org/jira/browse/FLINK-34966 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: He Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34661) TaskExecutor supports retain partitions after JM crashed.
[ https://issues.apache.org/jira/browse/FLINK-34661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-34661: --- Assignee: Junrui Li > TaskExecutor supports retain partitions after JM crashed. > - > > Key: FLINK-34661 > URL: https://issues.apache.org/jira/browse/FLINK-34661 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33983) Introduce JobEvent and JobEventStore for Batch Job Recovery
[ https://issues.apache.org/jira/browse/FLINK-33983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-33983: --- Assignee: Junrui Li > Introduce JobEvent and JobEventStore for Batch Job Recovery > --- > > Key: FLINK-33983 > URL: https://issues.apache.org/jira/browse/FLINK-33983 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33986) Extend shuffleMaster to support batch snapshot.
[ https://issues.apache.org/jira/browse/FLINK-33986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-33986: --- Assignee: Junrui Li > Extend shuffleMaster to support batch snapshot. > --- > > Key: FLINK-33986 > URL: https://issues.apache.org/jira/browse/FLINK-33986 > Project: Flink > Issue Type: Sub-task >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > > Extend shuffleMaster to support batch snapshot as follows: > # Add method supportsBatchSnapshot to identify whether the shuffle master > supports taking snapshot in batch scenarios > # Add method snapshotState and restoreState to snapshot and restore the > shuffle master's state. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33984) Introduce SupportsBatchSnapshot for operator coordinator
[ https://issues.apache.org/jira/browse/FLINK-33984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-33984. --- Fix Version/s: 1.20.0 Resolution: Done master: 38255652406becbfbcb7cbec557aa5ba9a1ebbb3 558ca75da2fcec875d1e04a8d75a24fd0ad42ccc > Introduce SupportsBatchSnapshot for operator coordinator > > > Key: FLINK-33984 > URL: https://issues.apache.org/jira/browse/FLINK-33984 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35036) Flink CDC Job cancel with savepoint failed
[ https://issues.apache.org/jira/browse/FLINK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834755#comment-17834755 ] Qingsheng Ren commented on FLINK-35036: --- [~fly365] Thanks for reporting the issue! I agree with [~bgeng777]'s idea and maybe you can have a try. And could you rewrite the description in English considering we have contributors around the world? Thanks > Flink CDC Job cancel with savepoint failed > -- > > Key: FLINK-35036 > URL: https://issues.apache.org/jira/browse/FLINK-35036 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: Flink 1.15.2 > Flink CDC 2.4.2 > Oracle 19C > Doris 2.0.3 >Reporter: Fly365 >Priority: Major > Attachments: image-2024-04-07-17-35-23-136.png > > > With a Flink CDC job, I want to sync Oracle data to Doris. During the snapshot phase, cancelling > the Flink CDC job with a savepoint failed. > I use Flink CDC to synchronize Oracle 19C tables to Doris. During the initial snapshot phase, > after part of the data has been synchronized but before the incremental phase is reached, cancelling the CDC job with a Flink > savepoint fails. After the job has entered the incremental phase, cancelling it with a savepoint works. Why does the savepoint fail during the full (existing) data synchronization phase? > !image-2024-04-07-17-35-23-136.png! >
[jira] [Assigned] (FLINK-34945) Support recover shuffle descriptor and partition metrics from tiered storage
[ https://issues.apache.org/jira/browse/FLINK-34945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-34945: --- Assignee: Junrui Li > Support recover shuffle descriptor and partition metrics from tiered storage > > > Key: FLINK-34945 > URL: https://issues.apache.org/jira/browse/FLINK-34945 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834752#comment-17834752 ] Biao Geng commented on FLINK-35035: --- I am not very familiar with the adaptive scheduler, so maybe others can share more insights. I just want to ask a question to make sure we are on the same page. Do you mean that, instead of triggering onNewResourcesAvailable each time a new slot is found (in your example, 5 new slots and therefore 5 reschedulings), you expect the JM to wait for a configurable period, discover the 5 new slots together, and trigger only 1 rescheduling? > Reduce job pause time when cluster resources are expanded in adaptive mode > -- > > Key: FLINK-35035 > URL: https://issues.apache.org/jira/browse/FLINK-35035 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.19.0 >Reporter: yuanfenghu >Priority: Minor > > When 'jobmanager.scheduler = adaptive', job graph changes triggered by > cluster expansion can stall tasks for a long time. We should reduce this > impact. > As an example: > I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)] > When my cluster has 5 slots, the job is executed as [v1 p5]->[v2 p5] > When I add slots, the job triggers a jobgraph change via > org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable. > However, the five new slots I added are not discovered at the same time (for > convenience, I assume that a taskmanager has one slot), because in no > environment can we guarantee that the new slots are all added > at once. This causes onNewResourcesAvailable to be triggered repeatedly. > If the new slots arrive at intervals, the jobgraph will > keep changing during this period. What I hope for is a configurable > stabilization time: the jobgraph change is triggered only after the > number of cluster slots has been stable for a certain period of time, > which avoids this situation.
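The stabilization idea requested in FLINK-35035 can be sketched in a few lines. This is only an illustration under stated assumptions, not Flink's adaptive scheduler code: the class `StableResourceTrigger`, its methods, and the `stableWindowMillis` setting are hypothetical names invented for this sketch. Each newly discovered slot only restarts a quiet-period timer, and a single rescale is reported once no new slot has arrived for the whole window.

```java
// Hypothetical sketch of "rescale only after the slot count has been stable";
// none of these names exist in Flink.
class StableResourceTrigger {
    private final long stableWindowMillis; // hypothetical configurable quiet period
    private long lastChangeMillis = 0L;    // time of the most recent slot arrival
    private int pendingSlots = 0;          // slots seen since the last rescale

    StableResourceTrigger(long stableWindowMillis) {
        this.stableWindowMillis = stableWindowMillis;
    }

    /** Called for every newly discovered slot; only records the event. */
    void onNewResourceAvailable(long nowMillis) {
        pendingSlots++;
        lastChangeMillis = nowMillis;
    }

    /**
     * Returns the number of new slots to rescale with, or 0 while the slot
     * count is unchanged or has not yet been stable for the full window.
     */
    int pollRescale(long nowMillis) {
        if (pendingSlots == 0 || nowMillis - lastChangeMillis < stableWindowMillis) {
            return 0;
        }
        int slots = pendingSlots;
        pendingSlots = 0;
        return slots;
    }

    public static void main(String[] args) {
        StableResourceTrigger trigger = new StableResourceTrigger(1000);
        int rescales = 0;
        // Simulated clock: five slots arrive 200 ms apart, polling every 100 ms.
        for (long t = 0; t <= 3000; t += 100) {
            if (t % 200 == 0 && t < 1000) {
                trigger.onNewResourceAvailable(t);
            }
            int slots = trigger.pollRescale(t);
            if (slots > 0) {
                rescales++;
                System.out.println("t=" + t + "ms: rescale once with " + slots + " new slots");
            }
        }
        System.out.println("rescales=" + rescales);
    }
}
```

With the simulated arrivals above, the five slots lead to one rescale instead of five. The trade-off is that the job reacts to new resources one window later, so the window length would need to be configurable.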
Re: [PR] [FLINK-33984][runtime] Support batch snapshot for OperatorCoordinator. [flink]
zhuzhurk closed pull request #24415: [FLINK-33984][runtime] Support batch snapshot for OperatorCoordinator. URL: https://github.com/apache/flink/pull/24415
[jira] [Commented] (FLINK-35009) Change on getTransitivePredecessors breaks connectors
[ https://issues.apache.org/jira/browse/FLINK-35009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834751#comment-17834751 ] Weijie Guo commented on FLINK-35009: This change can be done in a compatible way. But one could argue that {{Transformation}} is not actually a public API(marked as {{@Internal}} indeed). The connector's depend on it somehow shouldn't be guaranteed. BTW: The {{MockTransformation}} in kafka-connector seems unused and can be removed safely. > Change on getTransitivePredecessors breaks connectors > - > > Key: FLINK-35009 > URL: https://issues.apache.org/jira/browse/FLINK-35009 > Project: Flink > Issue Type: Bug > Components: API / Core, Connectors / Kafka >Affects Versions: 1.18.2, 1.20.0, 1.19.1 >Reporter: Martijn Visser >Priority: Blocker > > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-kafka: Compilation failure: > Compilation failure: > Error: > /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[214,24] > > org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation > is not abstract and does not override abstract method > getTransitivePredecessorsInternal() in org.apache.flink.api.dag.Transformation > Error: > /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[220,44] > getTransitivePredecessors() in > org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation > cannot override getTransitivePredecessors() in > org.apache.flink.api.dag.Transformation > Error:overridden method is final > {code} > Example: > 
https://github.com/apache/flink-connector-kafka/actions/runs/8494349338/job/23269406762#step:15:167 -- This message was sent by Atlassian Jira (v8.20.10#820010)
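To make the two compile errors quoted above easier to read, here is a minimal, self-contained sketch of the pattern the messages imply: the public method is final, and subclasses must implement an abstract `...Internal()` hook instead. The classes below are simplified stand-ins, not the real `org.apache.flink.api.dag.Transformation`; the caching in the final method, the `protected` visibility, and the return type are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-ins; all class names here are hypothetical.
class TransitivePredecessorsSketch {

    abstract static class TransformationSketch {
        // Assumption: the final method exists so the base class can cache the result.
        private List<TransformationSketch> cache;

        // final: a subclass that still overrides this fails with
        // "overridden method is final".
        public final List<TransformationSketch> getTransitivePredecessors() {
            if (cache == null) {
                cache = getTransitivePredecessorsInternal();
            }
            return cache;
        }

        // abstract: a concrete subclass that does not implement this fails with
        // "is not abstract and does not override abstract method".
        protected abstract List<TransformationSketch> getTransitivePredecessorsInternal();
    }

    // A mock adapted to the new contract: it overrides only the internal hook.
    static class MockTransformationSketch extends TransformationSketch {
        int internalCalls = 0;

        @Override
        protected List<TransformationSketch> getTransitivePredecessorsInternal() {
            internalCalls++;
            return Collections.<TransformationSketch>singletonList(this);
        }
    }

    public static void main(String[] args) {
        MockTransformationSketch mock = new MockTransformationSketch();
        mock.getTransitivePredecessors();
        mock.getTransitivePredecessors();
        System.out.println("predecessors=" + mock.getTransitivePredecessors().size()
                + ", internalCalls=" + mock.internalCalls);
    }
}
```

Under this pattern, a subclass written against the old contract breaks at compile time in exactly the two ways shown in the log, and moving its logic into the internal hook is enough to compile again.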
[jira] [Comment Edited] (FLINK-35009) Change on getTransitivePredecessors breaks connectors
[ https://issues.apache.org/jira/browse/FLINK-35009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834751#comment-17834751 ] Weijie Guo edited comment on FLINK-35009 at 4/8/24 2:13 AM: This change can be done in a compatible way. But one could argue that {{Transformation}} is not actually a public API(marked as {{@Internal}} indeed). The connector depend on it somehow shouldn't be guaranteed. BTW: The {{MockTransformation}} in kafka-connector seems unused and can be removed safely. was (Author: weijie guo): This change can be done in a compatible way. But one could argue that {{Transformation}} is not actually a public API(marked as {{@Internal}} indeed). The connector's depend on it somehow shouldn't be guaranteed. BTW: The {{MockTransformation}} in kafka-connector seems unused and can be removed safely. > Change on getTransitivePredecessors breaks connectors > - > > Key: FLINK-35009 > URL: https://issues.apache.org/jira/browse/FLINK-35009 > Project: Flink > Issue Type: Bug > Components: API / Core, Connectors / Kafka >Affects Versions: 1.18.2, 1.20.0, 1.19.1 >Reporter: Martijn Visser >Priority: Blocker > > {code:java} > Error: Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile > (default-testCompile) on project flink-connector-kafka: Compilation failure: > Compilation failure: > Error: > /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[214,24] > > org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation > is not abstract and does not override abstract method > getTransitivePredecessorsInternal() in org.apache.flink.api.dag.Transformation > Error: > 
/home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[220,44] > getTransitivePredecessors() in > org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation > cannot override getTransitivePredecessors() in > org.apache.flink.api.dag.Transformation > Error:overridden method is final > {code} > Example: > https://github.com/apache/flink-connector-kafka/actions/runs/8494349338/job/23269406762#step:15:167 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834750#comment-17834750 ] Zhongqiang Gong commented on FLINK-34955: - Hi [~slfan1989] ,I apologize for the ambiguity. `{color:#c1c7d0}remove commons-codec dependence{color}` means `{color:#c1c7d0}we don't have to manually add a dependency to commons-codec.{color}` . > Upgrade commons-compress to 1.26.0 > -- > > Key: FLINK-34955 > URL: https://issues.apache.org/jira/browse/FLINK-34955 > Project: Flink > Issue Type: Improvement >Reporter: Shilun Fan >Assignee: Shilun Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can > refer to the maven link > https://mvnrepository.com/artifact/org.apache.commons/commons-compress -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35036) Flink CDC Job cancel with savepoint failed
[ https://issues.apache.org/jira/browse/FLINK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834749#comment-17834749 ] Biao Geng commented on FLINK-35036: --- Hi [~fly365], according to the attached screenshot, the failure is caused by a timeout on the Flink client side. IIUC, in the full-volume phase of a Flink CDC job, it needs to process lots of data and, typically due to back pressure, the state may be much larger than in the incremental phase (you can check the state size in Flink's web UI). As a result, it takes longer for Flink to complete the savepoint. The client's [default timeout is 60s|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#client-timeout], so maybe you can increase the value to see if the savepoint can succeed. > Flink CDC Job cancel with savepoint failed > -- > > Key: FLINK-35036 > URL: https://issues.apache.org/jira/browse/FLINK-35036 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: Flink 1.15.2 > Flink CDC 2.4.2 > Oracle 19C > Doris 2.0.3 >Reporter: Fly365 >Priority: Major > Attachments: image-2024-04-07-17-35-23-136.png > > > With a Flink CDC job, I want to sync Oracle data to Doris. During the snapshot phase, cancelling > the Flink CDC job with a savepoint failed. > I use Flink CDC to synchronize Oracle 19C tables to Doris. During the initial snapshot phase, > after part of the data has been synchronized but before the incremental phase is reached, cancelling the CDC job with a Flink > savepoint fails. After the job has entered the incremental phase, cancelling it with a savepoint works. Why does the savepoint fail during the full (existing) data synchronization phase? > !image-2024-04-07-17-35-23-136.png! >
[jira] [Closed] (FLINK-34573) the task is stuck on the high presure
[ https://issues.apache.org/jira/browse/FLINK-34573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-34573. -- Resolution: Won't Fix > the task is stuck on the high presure > - > > Key: FLINK-34573 > URL: https://issues.apache.org/jira/browse/FLINK-34573 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.14.3 >Reporter: LSZ >Priority: Blocker > Attachments: rate.PNG, stuck.PNG, tm-thread-dump-chk-0123[1].json, > tm-thread-dump-no-lock-0123[1].json > > Original Estimate: 120h > Remaining Estimate: 120h > > We have a Flink job with just one taskmanager; > when the source data arrives under high pressure, the job gets stuck. Sometimes it > runs for 1d, sometimes for 30min. > !stuck.PNG! > like this: (at 13:30 the taskmanager rebooted, then it ran for 30min and ended up stuck) > We tested 3 cases: > 1: low pressure (1200eps), it runs for 30 min or 1d. > 2: with checkpointing disabled, it ran for 3d under high pressure (1800eps) and did not get > stuck. > 3: with the original managermemory doubled, it still gets stuck; just the time until the problem appears > changed from 30mins to 3 days. > !rate.PNG! > > the thread dump info under high pressure, cpu 90%~100%: > [^tm-thread-dump-chk-0123[1].json] > this is the normal info, under low pressure: > [^tm-thread-dump-no-lock-0123[1].json] > >
Re: [PR] [hotfix] Correct the option key for sortPartition's java doc [flink]
reswqa merged PR #24627: URL: https://github.com/apache/flink/pull/24627
[jira] [Closed] (FLINK-35006) Use try with-resource for StandaloneAutoscalerExecutor
[ https://issues.apache.org/jira/browse/FLINK-35006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirill Plugatarev closed FLINK-35006. - Resolution: Not A Problem > Use try with-resource for StandaloneAutoscalerExecutor > -- > > Key: FLINK-35006 > URL: https://issues.apache.org/jira/browse/FLINK-35006 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Kirill Plugatarev >Assignee: Kirill Plugatarev >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35006] Use try with-resource for StandaloneAutoscalerExecutor [flink-kubernetes-operator]
plugatarev closed pull request #811: [FLINK-35006] Use try with-resource for StandaloneAutoscalerExecutor URL: https://github.com/apache/flink-kubernetes-operator/pull/811
Re: [PR] [FLINK-35006] Use try with-resource for StandaloneAutoscalerExecutor [flink-kubernetes-operator]
plugatarev commented on code in PR #811: URL: https://github.com/apache/flink-kubernetes-operator/pull/811#discussion_r1555135563 ## flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java: ## @@ -60,9 +60,11 @@ public static > void main(String[ var autoScaler = createJobAutoscaler(eventHandler, stateStore); -var autoscalerExecutor = -new StandaloneAutoscalerExecutor<>(conf, jobListFetcher, eventHandler, autoScaler); -autoscalerExecutor.start(); +try (var autoscalerExecutor = +new StandaloneAutoscalerExecutor<>( +conf, jobListFetcher, eventHandler, autoScaler)) { +autoscalerExecutor.start(); +} Review Comment: Yes, you are right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2
Ufuk Celebi created FLINK-35038: --- Summary: Bump test dependency org.yaml:snakeyaml to 2.2 Key: FLINK-35038 URL: https://issues.apache.org/jira/browse/FLINK-35038 Project: Flink Issue Type: Technical Debt Components: Connectors / Kafka Affects Versions: 3.1.0 Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 3.1.0 Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test scope dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193. This outdated version of SnakeYAML triggers security warnings. These should not be an actual issue given the test scope, but we should consider bumping the version for security hygiene purposes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2041527525

@LadyForest Hi, CI has passed now. Looking forward to your review. Thanks very much :)
[jira] [Commented] (FLINK-34898) Cannot create ARRAY of named STRUCTs
[ https://issues.apache.org/jira/browse/FLINK-34898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834685#comment-17834685 ]

Feng Jin commented on FLINK-34898:
--
[~chloehe] Thank you for the update, and sorry for the late reply.

Based on my testing, this seems to be an issue with the CAST function implementation. The specific cause may be related to this Jira ticket: https://issues.apache.org/jira/browse/FLINK-18673

FLINK-18673 added support for ROW() as a parameter for UDFs. During the validation of a SqlCall, validation of its operand nodes is skipped.

+org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator#validateColumnListParams+
{code:java}
// code placeholder
@Override
public void validateColumnListParams(
        SqlFunction function, List argTypes, List operands) {
    // we don't support column lists and translate them into the unknown type in the type
    // factory,
    // this makes it possible to ignore them in the validator and fall back to regular row types
    // see also SqlFunction#deriveType
}
{code}
However, SqlFunction::deriveType indirectly calls org.apache.calcite.sql.validate.SqlValidator#deriveType to infer the type of Row().

!截屏2024-04-07 22.05.40.png!

SqlCastFunction, in contrast, obtains the validated node type directly:
{code:java}
// code placeholder
@Override
public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
    final SqlNode left = callBinding.operand(0);
    final SqlNode right = callBinding.operand(1);
    if (SqlUtil.isNullLiteral(left, false) || left instanceof SqlDynamicParam) {
        return true;
    }
    RelDataType validatedNodeType = callBinding.getValidator().getValidatedNodeType(left);
    //RelDataType validatedNodeType = SqlTypeUtil.deriveType(callBinding, left);
    RelDataType returnType = SqlTypeUtil.deriveType(callBinding, right);
    if (!canCastFrom(returnType, validatedNodeType)) {
        if (throwOnFailure) {
            throw callBinding.newError(
                    RESOURCE.cannotCastValue(
                            validatedNodeType.toString(), returnType.toString()));
        }
        return false;
    }
{code}
The corresponding error message is as follows:
{code:java}
// code placeholder
.SqlBasicCall: ROW(1, 2)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
    at org.apache.flink.table.examples.java.basics.WordCountSQLExample.main(WordCountSQLExample.java:49)
Caused by: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlBasicCall: ROW(1, 2)
    at org.apache.calcite.util.Util.needToImplement(Util.java:1101)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.getValidatedNodeType(SqlValidatorImpl.java:1777)
    at org.apache.calcite.sql.fun.SqlCastFunction.checkOperandTypes(SqlCastFunction.java:138)
{code}
If we use SqlTypeUtil.deriveType instead, we get the correct type. Therefore, I think we can modify this section to fix the issue:
{code:java}
// code placeholder
RelDataType validatedNodeType = SqlTypeUtil.deriveType(callBinding, left);
{code}

> Cannot create ARRAY of named STRUCTs
> 
>
> Key: FLINK-34898
> URL: https://issues.apache.org/jira/browse/FLINK-34898
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.19.0
> Reporter: Chloe He
> Priority: Major
> Attachments: image-2024-03-21-12-00-00-183.png, 截屏2024-04-07 22.05.40.png
>
> I want to construct data that consists of arrays of named STRUCT. For example, one field may look like `[\{"a": 1}]`. I am able to construct this named STRUCT as
> {code:java}
> SELECT CAST(ROW(1) as ROW) AS row1; {code}
> but when I try to wrap this in an ARRAY, it fails:
> {code:java}
> SELECT ARRAY[CAST(ROW(1) as ROW)] AS row1;
> // error
> Caused by: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlBasicCall: ROW(1)
> {code}
> These are the workarounds that I found:
> {code:java}
> SELECT ROW(ROW(CAST(ROW(1) as ROW))) AS row1;
> // or
> SELECT cast(ARRAY[ROW(1)] as ARRAY>); {code}
> but I think this is a bug that we need to follow up and fix.
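The difference between looking up an already-validated type and deriving it on demand can be illustrated with a toy model. Everything below (`ToyValidator`, its string-based "types", the `ROW(` prefix check) is hypothetical and heavily simplified; it is not Calcite's `SqlValidatorImpl`, only a sketch of why a pure lookup fails for a node whose validation was skipped while on-demand derivation does not.

```java
import java.util.HashMap;
import java.util.Map;

public class ValidatedTypeSketch {

    /** Hypothetical, heavily simplified validator; not Calcite's SqlValidatorImpl. */
    static class ToyValidator {
        private final Map<String, String> validatedTypes = new HashMap<>();

        /** Records a type for a node, as full validation of that node would. */
        void validate(String node, String type) {
            validatedTypes.put(node, type);
        }

        /** Lookup only: fails for nodes whose validation was skipped. */
        String getValidatedNodeType(String node) {
            String type = validatedTypes.get(node);
            if (type == null) {
                throw new UnsupportedOperationException(node);
            }
            return type;
        }

        /** Derives the type on demand and caches it, so skipped nodes still resolve. */
        String deriveType(String node) {
            return validatedTypes.computeIfAbsent(
                    node, n -> n.startsWith("ROW(") ? "ROW" : "UNKNOWN");
        }
    }

    /** Returns true if a plain lookup fails for a node that was never validated. */
    static boolean lookupFailsForSkippedNode() {
        ToyValidator validator = new ToyValidator();
        try {
            validator.getValidatedNodeType("ROW(1, 2)");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    /** Returns the type obtained by deriving it for the same skipped node. */
    static String deriveForSkippedNode() {
        return new ToyValidator().deriveType("ROW(1, 2)");
    }

    public static void main(String[] args) {
        System.out.println(lookupFailsForSkippedNode());
        System.out.println(deriveForSkippedNode());
    }
}
```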
[jira] [Comment Edited] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834682#comment-17834682 ] Shilun Fan edited comment on FLINK-34955 at 4/7/24 2:17 PM: [~gongzhongqiang] Of course, if upgrading is possible, it would be a positive step forward. I think we should give it a try. I see that you have created the relevant JIRA ticket, so you can go ahead and attempt it. Hopefully, it will be successful. (However, my preference would be to stick with version 1.26 for now, and consider upgrading to 1.26.1 in the future. If other components of Flink need upgrading, I think it would be best to upgrade them to version 1.26 as well. Removing dependencies, in my opinion, is not a good option.) was (Author: slfan1989): [~gongzhongqiang] Of course, if upgrading is possible, it would be a positive step forward. I think we should give it a try. I see that you have created the relevant JIRA ticket, so you can go ahead and attempt it. Hopefully, it will be successful. > Upgrade commons-compress to 1.26.0 > -- > > Key: FLINK-34955 > URL: https://issues.apache.org/jira/browse/FLINK-34955 > Project: Flink > Issue Type: Improvement >Reporter: Shilun Fan >Assignee: Shilun Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can > refer to the maven link > https://mvnrepository.com/artifact/org.apache.commons/commons-compress -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34898) Cannot create ARRAY of named STRUCTs
[ https://issues.apache.org/jira/browse/FLINK-34898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jin updated FLINK-34898: - Attachment: 截屏2024-04-07 22.05.40.png > Cannot create ARRAY of named STRUCTs > > > Key: FLINK-34898 > URL: https://issues.apache.org/jira/browse/FLINK-34898 > Project: Flink > Issue Type: Bug >Affects Versions: 1.19.0 >Reporter: Chloe He >Priority: Major > Attachments: image-2024-03-21-12-00-00-183.png, 截屏2024-04-07 > 22.05.40.png > > > I want to construct data that consists of arrays of named STRUCT. For > example, one field may look like `[\{"a": 1}]`. I am able to construct this > named STRUCT as > {code:java} > SELECT CAST(ROW(1) as ROW) AS row1; {code} > but when I try to wrap this in an ARRAY, it fails: > {code:java} > SELECT ARRAY[CAST(ROW(1) as ROW)] AS row1; > // error > Caused by: java.lang.UnsupportedOperationException: class > org.apache.calcite.sql.SqlBasicCall: ROW(1) > {code} > These are the workarounds that I found: > {code:java} > SELECT ROW(ROW(CAST(ROW(1) as ROW))) AS row1; > // or > SELECT cast(ARRAY[ROW(1)] as ARRAY>); {code} > but I think this is a bug that we need to follow up and fix. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834682#comment-17834682 ] Shilun Fan commented on FLINK-34955: [~gongzhongqiang] Of course, if upgrading is possible, it would be a positive step forward. I think we should give it a try. I see that you have created the relevant JIRA ticket, so you can go ahead and attempt it. Hopefully, it will be successful. > Upgrade commons-compress to 1.26.0 > -- > > Key: FLINK-34955 > URL: https://issues.apache.org/jira/browse/FLINK-34955 > Project: Flink > Issue Type: Improvement >Reporter: Shilun Fan >Assignee: Shilun Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can > refer to the maven link > https://mvnrepository.com/artifact/org.apache.commons/commons-compress -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834681#comment-17834681 ] Shilun Fan commented on FLINK-34955: [~gongzhongqiang] From my personal perspective, I believe upgrading to version 1.26.0 should be sufficient as this version has already fixed the CVE issue. As for upgrading to 1.26.1, I think we can consider it after some time. Removing commons-codec might prove to be challenging because Flink has dependencies on Hadoop and HBase (both of which directly depend on commons-codec). If we remove commons-codec, it may result in the Hadoop and HBase modules being unable to compile successfully. > Upgrade commons-compress to 1.26.0 > -- > > Key: FLINK-34955 > URL: https://issues.apache.org/jira/browse/FLINK-34955 > Project: Flink > Issue Type: Improvement >Reporter: Shilun Fan >Assignee: Shilun Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can > refer to the maven link > https://mvnrepository.com/artifact/org.apache.commons/commons-compress -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35010][connectors/mongodb] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector [flink-connector-mongodb]
GOODBOY008 commented on PR #32:
URL: https://github.com/apache/flink-connector-mongodb/pull/32#issuecomment-2041478551

@Jiabao-Sun The PR is updated, and the version is now `1.26.1`.
[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0
[ https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834678#comment-17834678 ]

Zhongqiang Gong commented on FLINK-34955:
-
[~slfan1989] [~mbalassi] According to https://issues.apache.org/jira/browse/COMPRESS-659, [~jiabaosun] and I think it is better to bump the version to 1.26.1 and remove the `commons-codec` dependency.

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
> Issue Type: Improvement
> Reporter: Shilun Fan
> Assignee: Shilun Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
> commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can refer to the maven link
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
Zakelly commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1554965456 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param Type of partitioned key. + * @param Type of input of this request. + * @param Type of value that request will return. + */ +public class StateRequest { + +/** The type of processing request. */ +public enum RequestType { +/** Process one record without state access. */ +SYNC, +/** Get from one {@link State}. */ +GET, +/** Put to one {@link State}. */ +PUT, +/** Merge value to an exist key in {@link State}. Mainly used for listState. */ +MERGE, +/** Delete from one {@link State}. */ +DELETE +} + +/** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */ +@Nullable private final State state; + +/** The type of this request. 
*/ +private final RequestType type; + +/** The payload(input) of this request. */ +@Nullable private final IN payload; + +/** The future to collect the result of the request. */ +private InternalStateFuture stateFuture; + +/** The record context of this request. */ +private RecordContext context; + +StateRequest(@Nullable State state, RequestType type, @Nullable IN payload) { +this.state = state; +this.type = type; +this.payload = payload; +} + +RequestType getRequestType() { +return type; +} + +@Nullable +IN getPayload() { +return payload; +} + +@Nullable +State getState() { +return state; +} + +InternalStateFuture getFuture() { +return stateFuture; +} + +void setFuture(InternalStateFuture future) { +stateFuture = future; +} + +RecordContext getRecordContext() { +return context; +} + +void setRecordContext(RecordContext context) { +this.context = context; +} Review Comment: @ljz2051 The reason behind this is that the `RecordContext` is managed by the `AEC` and related components, while the `StateRequest` is created in direct implementation of State API (`internal key-value layer`), which lacks the knowledge of `RecordContext`. I tried to avoid introducing `getRecordContext` in `AEC` and call this in every implementation of the state interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]
ljz2051 commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1554960979 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param Type of partitioned key. + * @param Type of input of this request. + * @param Type of value that request will return. + */ +public class StateRequest { + +/** The type of processing request. */ +public enum RequestType { +/** Process one record without state access. */ +SYNC, +/** Get from one {@link State}. */ +GET, +/** Put to one {@link State}. */ +PUT, +/** Merge value to an exist key in {@link State}. Mainly used for listState. */ +MERGE, +/** Delete from one {@link State}. */ +DELETE +} + +/** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */ +@Nullable private final State state; + +/** The type of this request. 
*/ +private final RequestType type; + +/** The payload(input) of this request. */ +@Nullable private final IN payload; + +/** The future to collect the result of the request. */ +private InternalStateFuture stateFuture; + +/** The record context of this request. */ +private RecordContext context; + +StateRequest(@Nullable State state, RequestType type, @Nullable IN payload) { +this.state = state; +this.type = type; +this.payload = payload; +} + +RequestType getRequestType() { +return type; +} + +@Nullable +IN getPayload() { +return payload; +} + +@Nullable +State getState() { +return state; +} + +InternalStateFuture getFuture() { +return stateFuture; +} + +void setFuture(InternalStateFuture future) { +stateFuture = future; +} + +RecordContext getRecordContext() { +return context; +} + +void setRecordContext(RecordContext context) { +this.context = context; +} Review Comment: The key in RecordContext is essential for stateRequest. Without setting the RecordContext, the stateRequest is essentially "incomplete". Why don't we consider making RecordContext a final constructor variable for stateRequest? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
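The two designs debated in this thread (a context attached through a setter versus a context required at construction) can be contrasted in a simplified sketch. `Context`, `SetterRequest`, and `CtorRequest` are hypothetical stand-ins for `RecordContext` and `StateRequest`, reduced to what the trade-off needs; they are not the classes from the PR.

```java
import java.util.Objects;

public class RequestContextSketch {

    /** Hypothetical stand-in for RecordContext; only carries a key. */
    static final class Context {
        final String key;

        Context(String key) {
            this.key = key;
        }
    }

    /** Variant A: the context is attached after construction by another component. */
    static final class SetterRequest {
        private final String payload;
        private Context context; // stays null until setContext is called

        SetterRequest(String payload) {
            this.payload = payload;
        }

        void setContext(Context context) {
            this.context = context;
        }

        /** A request without a context cannot be routed to a key yet. */
        boolean isComplete() {
            return context != null;
        }
    }

    /** Variant B: the context is a final field, so every instance is complete. */
    static final class CtorRequest {
        private final String payload;
        private final Context context;

        CtorRequest(String payload, Context context) {
            this.payload = payload;
            // The creator must already know the context, which is the cost of this variant.
            this.context = Objects.requireNonNull(context, "context");
        }

        String key() {
            return context.key;
        }
    }

    public static void main(String[] args) {
        SetterRequest a = new SetterRequest("value");
        System.out.println(a.isComplete());
        a.setContext(new Context("k1"));
        System.out.println(a.isComplete());
        System.out.println(new CtorRequest("value", new Context("k1")).key());
    }
}
```

Variant B rules out incomplete requests by construction, while variant A lets the code that creates the request stay unaware of the context, which is the concern raised in the reply.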
[jira] [Updated] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
[ https://issues.apache.org/jira/browse/FLINK-35037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yisha zhou updated FLINK-35037: --- Description: In current Implementation, relNodes with Window type will only deliver upsert/unique keys of their inputs. However windows with ROW_NUMBER can also produce upsert/unique keys. For example: {code:java} select id, name, score, age, class, row_number() over(partition by class order by name) as rn, rank() over (partition by class order by score) as rk, dense_rank() over (partition by class order by score) as drk, avg(score) over (partition by class order by score) as avg_score, max(score) over (partition by age) as max_score, count(id) over (partition by age) as cnt from student {code} (class, rn) is a valid uniqueKeys/upsertKeys candidate. was: In current Implementation, relNodes with Window type will only deliver upsert/unique keys of their inputs if these keys contains the partition keys. However windows with ROW_NUMBER can also produce upsert/unique keys. For example: {code:java} select id, name, score, age, class, row_number() over(partition by class order by name) as rn, rank() over (partition by class order by score) as rk, dense_rank() over (partition by class order by score) as drk, avg(score) over (partition by class order by score) as avg_score, max(score) over (partition by age) as max_score, count(id) over (partition by age) as cnt from student {code} (class, rn) is a valid uniqueKeys candidate. > Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER > --- > > Key: FLINK-35037 > URL: https://issues.apache.org/jira/browse/FLINK-35037 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.20.0 >Reporter: yisha zhou >Priority: Major > > In current Implementation, relNodes with Window type will only deliver > upsert/unique keys of their inputs. > However windows with ROW_NUMBER can also produce upsert/unique keys. 
> For example: > {code:java} > select id, name, score, age, class, > row_number() over(partition by class order by name) as rn, > rank() over (partition by class order by score) as rk, > dense_rank() over (partition by class order by score) as drk, > avg(score) over (partition by class order by score) as avg_score, > max(score) over (partition by age) as max_score, > count(id) over (partition by age) as cnt > from student {code} > (class, rn) is a valid uniqueKeys/upsertKeys candidate. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
[ https://issues.apache.org/jira/browse/FLINK-35037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yisha zhou updated FLINK-35037: --- Description: In current Implementation, relNodes with Window type will only deliver upsert/unique keys of their inputs. However windows with ROW_NUMBER can also produce upsert/unique keys. For example: {code:java} select id, name, score, age, class, row_number() over(partition by class order by name) as rn, rank() over (partition by class order by score) as rk, dense_rank() over (partition by class order by score) as drk, avg(score) over (partition by class order by score) as avg_score, max(score) over (partition by age) as max_score, count(id) over (partition by age) as cnt from student {code} (class, rn) is a valid upsert/unique keys candidate. was: In current Implementation, relNodes with Window type will only deliver upsert/unique keys of their inputs. However windows with ROW_NUMBER can also produce upsert/unique keys. For example: {code:java} select id, name, score, age, class, row_number() over(partition by class order by name) as rn, rank() over (partition by class order by score) as rk, dense_rank() over (partition by class order by score) as drk, avg(score) over (partition by class order by score) as avg_score, max(score) over (partition by age) as max_score, count(id) over (partition by age) as cnt from student {code} (class, rn) is a valid uniqueKeys/upsertKeys candidate. > Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER > --- > > Key: FLINK-35037 > URL: https://issues.apache.org/jira/browse/FLINK-35037 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.20.0 >Reporter: yisha zhou >Priority: Major > > In current Implementation, relNodes with Window type will only deliver > upsert/unique keys of their inputs. > However windows with ROW_NUMBER can also produce upsert/unique keys. 
> For example: > {code:java} > select id, name, score, age, class, > row_number() over(partition by class order by name) as rn, > rank() over (partition by class order by score) as rk, > dense_rank() over (partition by class order by score) as drk, > avg(score) over (partition by class order by score) as avg_score, > max(score) over (partition by age) as max_score, > count(id) over (partition by age) as cnt > from student {code} > (class, rn) is a valid upsert/unique keys candidate. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
yisha zhou created FLINK-35037:
--
Summary: Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
Key: FLINK-35037
URL: https://issues.apache.org/jira/browse/FLINK-35037
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: yisha zhou

In the current implementation, relNodes of Window type only deliver the upsert/unique keys of their inputs, and only if these keys contain the partition keys. However, windows with ROW_NUMBER can also produce upsert/unique keys of their own. For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student
{code}
(class, rn) is a valid uniqueKeys candidate.
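The claim that (class, rn) is unique can be checked with a small emulation of `ROW_NUMBER() OVER (PARTITION BY class ORDER BY name)`. This is a plain-Java sketch over an in-memory list, not planner code; the row layout (`{class, name}`) and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RowNumberKeySketch {

    /** Emulates ROW_NUMBER() per partition and returns one "class#rn" key per input row. */
    static List<String> partitionAndNumber(List<String[]> rows) {
        // Group rows by the partition key (index 0 = class, index 1 = name).
        Map<String, List<String[]>> partitions = new TreeMap<>();
        for (String[] row : rows) {
            partitions.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> partition : partitions.entrySet()) {
            List<String[]> sorted = new ArrayList<>(partition.getValue());
            sorted.sort(Comparator.comparing((String[] r) -> r[1]));
            // Numbering restarts at 1 in every partition and never repeats inside it.
            for (int i = 0; i < sorted.size(); i++) {
                keys.add(partition.getKey() + "#" + (i + 1));
            }
        }
        return keys;
    }

    /** True if no key occurs twice. */
    static boolean allDistinct(List<String> keys) {
        return new HashSet<>(keys).size() == keys.size();
    }

    /** Made-up sample data; class B contains two rows with the same name. */
    static List<String[]> sampleRows() {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"A", "bob"});
        rows.add(new String[] {"A", "alice"});
        rows.add(new String[] {"B", "bob"});
        rows.add(new String[] {"B", "bob"});
        return rows;
    }

    public static void main(String[] args) {
        List<String> keys = partitionAndNumber(sampleRows());
        System.out.println(keys + " distinct=" + allDistinct(keys));
    }
}
```

The same does not hold for `rank()` or `dense_rank()`, which assign equal values to ties within a partition.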
Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]
fredia commented on PR #22973:
URL: https://github.com/apache/flink/pull/22973#issuecomment-2041450107

@Zakelly I have rebased this PR, could you please take a look if you're free? thanks!
Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]
PatrickRen commented on code in PR #3194: URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1554892220 ## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java: ## @@ -20,18 +20,33 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; import java.nio.file.Path; import java.util.List; /** Utilities for handling Flink configuration and environment. */ public class FlinkEnvironmentUtils { +private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class); private static final String FLINK_CONF_DIR = "conf"; -private static final String FLINK_CONF_FILENAME = "flink-conf.yaml"; +private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml"; +private static final String FLINK_CONF_FILENAME = "config.yaml"; public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception { Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME); -return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath); +try { +return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath); Review Comment: What about renaming the method to `ConfigurationUtils#loadConfigFile`? This method doesn't process map-formatted config (`flink-conf.yaml`) only. 
## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java: ## @@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep } ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); try { -Map configMap = +Map configMap = mapper.readValue( -configPath.toFile(), new TypeReference>() {}); -return Configuration.fromMap(configMap); +configPath.toFile(), new TypeReference>() {}); +return Configuration.fromMap(flattenConfigMap(configMap)); } catch (Exception e) { throw new IllegalStateException( String.format( "Failed to load config file \"%s\" to key-value pairs", configPath), e); } } + +private static Map flattenConfigMap(Map configMap) { +Map result = new HashMap<>(); +flattenConfigMapHelper(configMap, "", result); +return result; +} + +private static void flattenConfigMapHelper( +Map configMap, String currentPath, Map result) { +for (Map.Entry entry : configMap.entrySet()) { +String updatedPath = +currentPath.isEmpty() ? entry.getKey() : currentPath + "." 
+ entry.getKey(); +if (entry.getValue() instanceof Map) { +flattenConfigMapHelper((Map) entry.getValue(), updatedPath, result); +} else { Review Comment: There should be a case for handling `List` type, according to https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307 ## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java: ## @@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep } ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); try { -Map configMap = +Map configMap = mapper.readValue( -configPath.toFile(), new TypeReference>() {}); -return Configuration.fromMap(configMap); +configPath.toFile(), new TypeReference>() {}); +return Configuration.fromMap(flattenConfigMap(configMap)); } catch (Exception e) { throw new IllegalStateException( String.format( "Failed to load config file \"%s\" to key-value pairs", configPath), e); } } + +private static Map flattenConfigMap(Map configMap) { +Map result = new HashMap<>(); +flattenConfigMapHelper(configMap, "", result); +return result; +} Review Comment: This looks like just a very simple wrapper around `flattenConfigMapHelper`. We can just merge the logic into `flattenConfigMap` instead of having another helper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at:
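The two review suggestions above (handle `List` values, and fold the helper into a single method) could look roughly like the following sketch. It is not the PR's code: the class name is hypothetical, and rendering a list as a `;`-separated string is an assumption made for illustration; the linked `GlobalConfiguration` code may treat lists differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenConfigSketch {

    /** Flattens nested maps into dotted keys, e.g. {a: {b: 1}} becomes {"a.b": "1"}. */
    static Map<String, String> flatten(Map<String, Object> config) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                // Recurse, then prefix every nested key with the parent key.
                for (Map.Entry<String, String> child : flatten(nested).entrySet()) {
                    result.put(entry.getKey() + "." + child.getKey(), child.getValue());
                }
            } else if (value instanceof List) {
                // Assumption: a list value is rendered as a ';'-separated string.
                List<String> parts = new ArrayList<>();
                for (Object item : (List<?>) value) {
                    parts.add(String.valueOf(item));
                }
                result.put(entry.getKey(), String.join(";", parts));
            } else {
                result.put(entry.getKey(), String.valueOf(value));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> memory = new LinkedHashMap<>();
        memory.put("size", "1g");
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("taskmanager", memory);
        root.put("paths", List.of("a", "b"));
        System.out.println(flatten(root));
    }
}
```

Returning the flattened map from the recursive call avoids the separate accumulator-passing helper, at the cost of a few short-lived intermediate maps.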
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2041411609

@flinkbot run azure
Re: [PR] [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions [flink]
Zakelly commented on PR #24629:
URL: https://github.com/apache/flink/pull/24629#issuecomment-2041409660

@XComp Would you please take a look?
Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
gong commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2041406982

> I wonder if changing `DorisEventSerializer.DATE_TIME_FORMATTER` to `-MM-dd HH:mm:ss.SS` fixes this problem, too?

@yuxiqian I think that it can fix this problem, too.
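How a formatter pattern affects timestamp precision can be shown with `java.time` alone. The two patterns below are assumptions chosen for illustration; they are not the actual value of `DorisEventSerializer.DATE_TIME_FORMATTER`, and the pattern quoted above is reproduced as it appears in the archive.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampPrecisionSketch {

    // Assumed pattern without a fraction: sub-second digits are dropped.
    static final DateTimeFormatter SECONDS_ONLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumed pattern with six fraction digits: microseconds are kept.
    static final DateTimeFormatter WITH_MICROS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    /** Formats a fixed timestamp that carries 123456 microseconds. */
    static String format(DateTimeFormatter formatter) {
        LocalDateTime timestamp = LocalDateTime.of(2024, 4, 7, 12, 30, 45, 123_456_000);
        return timestamp.format(formatter);
    }

    public static void main(String[] args) {
        System.out.println(format(SECONDS_ONLY)); // 2024-04-07 12:30:45
        System.out.println(format(WITH_MICROS));  // 2024-04-07 12:30:45.123456
    }
}
```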
[jira] [Updated] (FLINK-34915) Complete `DESCRIBE CATALOG` syntax
[ https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-34915: - Attachment: image-2024-04-07-17-54-51-203.png Description: Describe the metadata of an existing catalog. The metadata information includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is specified, catalog properties are also returned. NOTICE: The parser part of this syntax has been implemented in FLIP-69 , and it is not actually available. we can complete the syntax in this FLIP. !image-2024-04-07-17-54-51-203.png|width=545,height=332! was: Describe the metadata of an existing catalog. The metadata information includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is specified, catalog properties are also returned. NOTICE: The parser part of this syntax has been implemented in FLIP-69 , and it is not actually available. we can complete the syntax in this FLIP. !image-2024-03-22-18-29-00-454.png|width=561,height=374! > Complete `DESCRIBE CATALOG` syntax > -- > > Key: FLINK-34915 > URL: https://issues.apache.org/jira/browse/FLINK-34915 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Labels: pull-request-available > Attachments: image-2024-03-22-18-29-00-454.png, > image-2024-04-07-17-54-51-203.png > > > Describe the metadata of an existing catalog. The metadata information > includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} > option is specified, catalog properties are also returned. > NOTICE: The parser part of this syntax has been implemented in FLIP-69 , and > it is not actually available. we can complete the syntax in this FLIP. > !image-2024-04-07-17-54-51-203.png|width=545,height=332! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
flinkbot commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2041394595 ## CI report: * a1146244ce8a490c2ca1557f4c2410b59effb333 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-34915) Complete `DESCRIBE CATALOG` syntax
[ https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34915: --- Labels: pull-request-available (was: ) > Complete `DESCRIBE CATALOG` syntax > -- > > Key: FLINK-34915 > URL: https://issues.apache.org/jira/browse/FLINK-34915 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Labels: pull-request-available > Attachments: image-2024-03-22-18-29-00-454.png > > > Describe the metadata of an existing catalog. The metadata information > includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} > option is specified, catalog properties are also returned. > NOTICE: The parser part of this syntax has been implemented in FLIP-69 , and > it is not actually available. we can complete the syntax in this FLIP. > !image-2024-03-22-18-29-00-454.png|width=561,height=374! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 opened a new pull request, #24630: URL: https://github.com/apache/flink/pull/24630

## What is the purpose of the change

Describe the metadata of an existing catalog. The metadata information includes the catalog’s name, type, and comment. If the optional EXTENDED option is specified, catalog properties are also returned.

## Brief change log

* { DESCRIBE | DESC } CATALOG [EXTENDED] catalog_name

## Verifying this change

This change added tests and can be verified as follows:

flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? yes
Re: [PR] [FLINK-34952][cdc-composer][sink] Flink CDC pipeline supports SinkFunction [flink-cdc]
loserwang1024 commented on code in PR #3204: URL: https://github.com/apache/flink-cdc/pull/3204#discussion_r1554863395 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java: ## @@ -35,4 +35,10 @@ public class ValuesDataSinkOptions { .booleanType() .defaultValue(true) .withDescription("True if the Event should be print to console."); + +public static final ConfigOption SINK_TYPE = +ConfigOptions.key("sinkType") Review Comment: It sounds better. Just do it.
Re: [PR] [hotfix] Fix YARN ContainerId.getId Deprecated Used. [flink]
slfan1989 commented on PR #24601: URL: https://github.com/apache/flink/pull/24601#issuecomment-2041389025 @1996fanrui Thank you very much for reviewing the code!
[jira] [Created] (FLINK-35036) Flink CDC Job cancel with savepoint failed
Fly365 created FLINK-35036: -- Summary: Flink CDC Job cancel with savepoint failed Key: FLINK-35036 URL: https://issues.apache.org/jira/browse/FLINK-35036 Project: Flink Issue Type: Bug Components: Flink CDC Environment: Flink 1.15.2 Flink CDC 2.4.2 Oracle 19C Doris 2.0.3 Reporter: Fly365 Attachments: image-2024-04-07-17-35-23-136.png I use a Flink CDC job to sync Oracle 19C tables to Doris. During the initial snapshot phase, after part of the data has been synced but before the job reaches the incremental phase, cancelling the CDC job with a Flink savepoint fails. Once the job has entered the incremental phase, cancelling it with a savepoint works. Why does the savepoint fail while the existing data is still being synced? !image-2024-04-07-17-35-23-136.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [cdc-connector][cdc-base] Shade guava31 to avoid dependency conflict with flink below 1.18 [flink-cdc]
PatrickRen commented on code in PR #3083: URL: https://github.com/apache/flink-cdc/pull/3083#discussion_r1554857536 ## pom.xml: ## @@ -462,8 +462,15 @@ under the License. submodules, ${flink.version} will be resolved as the actual Flink version. --> org.apache.flink:flink-shaded-force-shading + org.apache.flink:flink-shaded-guava + + +flink.shaded.guava Review Comment: Thanks for the update! You are correct. Depending on Guava directly is prohibited in checkstyle to avoid using non-relocated guava by mistake.
[jira] [Comment Edited] (FLINK-35023) YARNApplicationITCase failed on Azure
[ https://issues.apache.org/jira/browse/FLINK-35023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834639#comment-17834639 ] Weijie Guo edited comment on FLINK-35023 at 4/7/24 9:24 AM: jdk17: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=28322 jdk21: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759=logs=59a2b95a-736b-5c46-b3e0-cee6e587fd86=c301da75-e699-5c06-735f-778207c16f50=28612 was (Author: weijie guo): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=28322 > YARNApplicationITCase failed on Azure > - > > Key: FLINK-35023 > URL: https://issues.apache.org/jira/browse/FLINK-35023 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > > 1. > YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion > {code:java} > Apr 06 02:19:44 02:19:44.063 [ERROR] > org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion > -- Time elapsed: 9.727 s <<< FAILURE! 
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED > while expecting FINISHED > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72) > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70) > Apr 06 02:19:44 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Apr 06 02:19:44 at > java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > {code} > 2. 
YARNApplicationITCase.testApplicationClusterWithRemoteUserJar > {code:java} > Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED > while expecting FINISHED > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86) > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84) > Apr 06 02:19:44 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Apr 06 02:19:44 at > java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > {code} > 3. > YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion > {code:java} > Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED > while expecting FINISHED > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282) > Apr 06
[jira] [Commented] (FLINK-35023) YARNApplicationITCase failed on Azure
[ https://issues.apache.org/jira/browse/FLINK-35023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834639#comment-17834639 ] Weijie Guo commented on FLINK-35023: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759=logs=d871f0ce-7328-5d00-023b-e7391f5801c8=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6=28322 > YARNApplicationITCase failed on Azure > - > > Key: FLINK-35023 > URL: https://issues.apache.org/jira/browse/FLINK-35023 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > > 1. > YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion > {code:java} > Apr 06 02:19:44 02:19:44.063 [ERROR] > org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion > -- Time elapsed: 9.727 s <<< FAILURE! > Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED > while expecting FINISHED > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72) > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70) > Apr 06 02:19:44 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Apr 06 02:19:44 at > java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > Apr 06 02:19:44 at > 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > {code} > 2. YARNApplicationITCase.testApplicationClusterWithRemoteUserJar > {code:java} > Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED > while expecting FINISHED > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86) > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84) > Apr 06 02:19:44 at > java.base/java.lang.reflect.Method.invoke(Method.java:568) > Apr 06 02:19:44 at > java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) > Apr 06 02:19:44 at > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > {code} > 3. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion > {code:java} > Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED > while expecting FINISHED > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116) > Apr 06 02:19:44 at > org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion$0(YARNApplicationITCase.java:62) > Apr 06 02:19:44 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > Apr 06 02:19:44 at >
Re: [PR] [FLINK-34952][cdc-composer][sink] Flink CDC pipeline supports SinkFunction [flink-cdc]
PatrickRen commented on code in PR #3204: URL: https://github.com/apache/flink-cdc/pull/3204#discussion_r1554855153 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java: ## @@ -35,4 +35,10 @@ public class ValuesDataSinkOptions { .booleanType() .defaultValue(true) .withDescription("True if the Event should be print to console."); + +public static final ConfigOption SINK_TYPE = +ConfigOptions.key("sinkType") Review Comment: What about `sink.api`? We use the word "type" in YAML for specifying the category of connector, such as "mysql", "doris", "values", so I'm concerned about the potential conflict.
[jira] [Resolved] (FLINK-34959) Update old flink-cdc-connectors artifactId
[ https://issues.apache.org/jira/browse/FLINK-34959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren resolved FLINK-34959. --- Resolution: Fixed > Update old flink-cdc-connectors artifactId > -- > > Key: FLINK-34959 > URL: https://issues.apache.org/jira/browse/FLINK-34959 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: xleoken >Assignee: xleoken >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34959) Update old flink-cdc-connectors artifactId
[ https://issues.apache.org/jira/browse/FLINK-34959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834638#comment-17834638 ] Qingsheng Ren commented on FLINK-34959: --- cdc-master: 6510e670faa154d619937d97dc3f146eb162fac2 > Update old flink-cdc-connectors artifactId > -- > > Key: FLINK-34959 > URL: https://issues.apache.org/jira/browse/FLINK-34959 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: xleoken >Assignee: xleoken >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34959] Update old flink-cdc-connectors artifactId [flink-cdc]
PatrickRen merged PR #3200: URL: https://github.com/apache/flink-cdc/pull/3200
[jira] [Assigned] (FLINK-34959) Update old flink-cdc-connectors artifactId
[ https://issues.apache.org/jira/browse/FLINK-34959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-34959: - Assignee: xleoken > Update old flink-cdc-connectors artifactId > -- > > Key: FLINK-34959 > URL: https://issues.apache.org/jira/browse/FLINK-34959 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: xleoken >Assignee: xleoken >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834636#comment-17834636 ] xiaogang zhou commented on FLINK-32070: --- [~Zakelly] yes, sounds good, Let me take a look > FLIP-306 Unified File Merging Mechanism for Checkpoints > --- > > Key: FLINK-32070 > URL: https://issues.apache.org/jira/browse/FLINK-32070 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Fix For: 1.20.0 > > > The FLIP: > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints] > > The creation of multiple checkpoint files can lead to a 'file flood' problem, > in which a large number of files are written to the checkpoint storage in a > short amount of time. This can cause issues in large clusters with high > workloads, such as the creation and deletion of many files increasing the > amount of file meta modification on DFS, leading to single-machine hotspot > issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the > performance of object storage (e.g. Amazon S3 and Alibaba OSS) can > significantly decrease when listing objects, which is necessary for object > name de-duplication before creating an object, further affecting the > performance of directory manipulation in the file system's perspective of > view (See [hadoop-aws module > documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients], > section 'Warning #2: Directories are mimicked'). > While many solutions have been proposed for individual types of state files > (e.g. 
FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel > state), the file flood problems from each type of checkpoint file are similar > and lack a systematic view and solution. Therefore, the goal of this FLIP is to > establish a unified file merging mechanism to address the file flood problem > during checkpoint creation for all types of state files, including keyed, > non-keyed, channel, and changelog state. This will significantly improve the > stability of the system and the availability of fault tolerance in Flink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
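The core idea of file merging described above, many logical state files sharing one physical file, can be illustrated with a small in-memory sketch. This is not the FLIP-306 implementation: the class `FileMergingSketch`, its `Segment` record of offset and length, and the use of a byte buffer in place of a real file on DFS or object storage are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: one "physical file" holds many logical state files.
class FileMergingSketch {

    /** Location of one logical file inside the shared physical file. */
    static final class Segment {
        final int offset;
        final int length;

        Segment(int offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Stand-in for a single file on checkpoint storage.
    private final ByteArrayOutputStream physicalFile = new ByteArrayOutputStream();
    // Maps each logical file name to its byte range in the physical file.
    private final Map<String, Segment> index = new LinkedHashMap<>();

    /** Appends a logical file to the shared physical file and records its range. */
    public void append(String logicalName, byte[] data) {
        int offset = physicalFile.size();
        physicalFile.write(data, 0, data.length);
        index.put(logicalName, new Segment(offset, data.length));
    }

    /** Reads a logical file back by slicing its recorded range. */
    public byte[] read(String logicalName) {
        Segment segment = index.get(logicalName);
        if (segment == null) {
            throw new IllegalArgumentException("Unknown logical file: " + logicalName);
        }
        byte[] all = physicalFile.toByteArray();
        return Arrays.copyOfRange(all, segment.offset, segment.offset + segment.length);
    }

    public int logicalFileCount() {
        return index.size();
    }
}
```

With this layout the storage system sees one file creation regardless of how many logical state files are written, which is the kind of pressure reduction on metadata services that the description aims for.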
[jira] [Comment Edited] (FLINK-34712) Update reference data for Migration Tests
[ https://issues.apache.org/jira/browse/FLINK-34712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834634#comment-17834634 ] lincoln lee edited comment on FLINK-34712 at 4/7/24 8:53 AM: - fixed in 1.19: 4397c9f1d0f300bd8c0f9de4e59e1a5d7883ec2c fixed in master: 5bbcf8de79ce1979412879b919299ffa5a9b62fe was (Author: lincoln.86xy): fixed in 1.19: 4397c9f1d0f300bd8c0f9de4e59e1a5d7883ec2c > Update reference data for Migration Tests > - > > Key: FLINK-34712 > URL: https://issues.apache.org/jira/browse/FLINK-34712 > Project: Flink > Issue Type: Sub-task >Reporter: Lincoln Lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > Update migration tests in master to cover migration from new version. Since > 1.18, this step could be done automatically with the following steps. For > more information please refer to [this > page.|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/README.md] > # {*}On the published release tag (e.g., release-1.16.0){*}, run > {panel} > {panel} > |{{$ mvn clean }}{{package}} {{{}-Pgenerate-migration-test-data > -Dgenerate.version={}}}{{{}1.16{}}} {{-nsu -Dfast -DskipTests}}| > The version (1.16 in the command above) should be replaced with the target > one. > # Modify the content of the file > [apache/flink:flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version] > to the latest version (it would be "v1_16" if sticking to the example where > 1.16.0 was released). > # Commit the modification in step a and b with "{_}[release] Generate > reference data for state migration tests based on release-1.xx.0{_}" to the > corresponding release branch (e.g. 
{{release-1.16}} in our example), replace > "xx" with the actual version (in this example "16"). You should use the Jira > issue ID in case of [release] as the commit message's prefix if you have a > dedicated Jira issue for this task. > # Cherry-pick the commit to the master branch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34712) Update reference data for Migration Tests
[ https://issues.apache.org/jira/browse/FLINK-34712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-34712. --- Resolution: Fixed > Update reference data for Migration Tests > - > > Key: FLINK-34712 > URL: https://issues.apache.org/jira/browse/FLINK-34712 > Project: Flink > Issue Type: Sub-task >Reporter: Lincoln Lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > Update migration tests in master to cover migration from new version. Since > 1.18, this step could be done automatically with the following steps. For > more information please refer to [this > page.|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/README.md] > # {*}On the published release tag (e.g., release-1.16.0){*}, run > {panel} > {panel} > |{{$ mvn clean }}{{package}} {{{}-Pgenerate-migration-test-data > -Dgenerate.version={}}}{{{}1.16{}}} {{-nsu -Dfast -DskipTests}}| > The version (1.16 in the command above) should be replaced with the target > one. > # Modify the content of the file > [apache/flink:flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version] > to the latest version (it would be "v1_16" if sticking to the example where > 1.16.0 was released). > # Commit the modification in step a and b with "{_}[release] Generate > reference data for state migration tests based on release-1.xx.0{_}" to the > corresponding release branch (e.g. {{release-1.16}} in our example), replace > "xx" with the actual version (in this example "16"). You should use the Jira > issue ID in case of [release] as the commit message's prefix if you have a > dedicated Jira issue for this task. > # Cherry-pick the commit to the master branch. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34712][release] Generate reference data for state migration tests based on release-1.19.0 [flink]
lincoln-lil merged PR #24594: URL: https://github.com/apache/flink/pull/24594
Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
yuxiqian commented on PR #3207: URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2041372727 I wonder if changing `DorisEventSerializer.DATE_TIME_FORMATTER` to `-MM-dd HH:mm:ss.SS` would fix this problem, too?
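To illustrate why the formatter pattern matters for timestamp precision, here is a minimal sketch using `java.time.format.DateTimeFormatter`. The pattern quoted in the message above appears truncated in this archive, so both patterns below are assumptions chosen for illustration, and `TimestampPrecisionSketch` is a hypothetical class, not part of the Doris connector.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch; the patterns are assumed, not taken from the connector.
class TimestampPrecisionSketch {

    // Drops everything below one second.
    public static final DateTimeFormatter SECONDS_ONLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Keeps six fractional digits (microseconds).
    public static final DateTimeFormatter MICROS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    public static void main(String[] args) {
        // 17:35:23.123456 expressed with a nanosecond field of 123,456,000.
        LocalDateTime ts = LocalDateTime.of(2024, 4, 7, 17, 35, 23, 123_456_000);
        System.out.println(ts.format(SECONDS_ONLY)); // 2024-04-07 17:35:23
        System.out.println(ts.format(MICROS));       // 2024-04-07 17:35:23.123456
    }
}
```

A formatter without a fraction field silently discards sub-second digits, so a sink that serializes timestamps through it loses precision even when the source column carries it.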
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
liuyongvs commented on PR #24526: URL: https://github.com/apache/flink/pull/24526#issuecomment-2041370638 hi @MartijnVisser
[jira] [Commented] (FLINK-34712) Update reference data for Migration Tests
[ https://issues.apache.org/jira/browse/FLINK-34712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834634#comment-17834634 ]

lincoln lee commented on FLINK-34712:
-------------------------------------

fixed in 1.19: 4397c9f1d0f300bd8c0f9de4e59e1a5d7883ec2c

> Update reference data for Migration Tests
> -----------------------------------------
>
> Key: FLINK-34712
> URL: https://issues.apache.org/jira/browse/FLINK-34712
> Project: Flink
> Issue Type: Sub-task
> Reporter: Lincoln Lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
> Update migration tests in master to cover migration from new version. Since
> 1.18, this step could be done automatically with the following steps. For
> more information please refer to [this
> page.|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/README.md]
> # {*}On the published release tag (e.g., release-1.16.0){*}, run
> {{$ mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.16 -nsu -Dfast -DskipTests}}
> The version (1.16 in the command above) should be replaced with the target
> one.
> # Modify the content of the file
> [apache/flink:flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version]
> to the latest version (it would be "v1_16" if sticking to the example where
> 1.16.0 was released).
> # Commit the modification in step a and b with "{_}[release] Generate
> reference data for state migration tests based on release-1.xx.0{_}" to the
> corresponding release branch (e.g. {{release-1.16}} in our example), replace
> "xx" with the actual version (in this example "16"). You should use the Jira
> issue ID in case of [release] as the commit message's prefix if you have a
> dedicated Jira issue for this task.
> # Cherry-pick the commit to the master branch.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
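The second step above writes a name such as "v1_16" for release 1.16.0. As a small illustration of that naming pattern, the sketch below derives the name from a release version string. `MigrationVersionName` and `fromRelease` are hypothetical names invented for this example and are not part of Flink; the only grounded fact is the 1.16.0 to "v1_16" mapping quoted in the issue, and the general rule is an assumption extrapolated from it.

```java
/** Hypothetical helper, not part of Flink: derives a "v1_16"-style name from a release version. */
final class MigrationVersionName {

    private MigrationVersionName() {}

    /** Converts "1.16.0" or "1.16" into "v1_16"; the patch component is ignored. */
    static String fromRelease(String releaseVersion) {
        String[] parts = releaseVersion.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Expected <major>.<minor>[.<patch>], got: " + releaseVersion);
        }
        // Parsing rejects non-numeric components with a NumberFormatException.
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        return "v" + major + "_" + minor;
    }

    public static void main(String[] args) {
        System.out.println(fromRelease("1.16.0"));
        System.out.println(fromRelease("1.19.0"));
    }
}
```

Under the assumed rule, release 1.19.0 (the release this sub-task targets) would map to "v1_19"; the README linked in the issue remains the authoritative description of the file's content.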
Re: [PR] [FLINK-34712][release] Generate reference data for state migration tests based on release-1.19.0 [flink]
lincoln-lil merged PR #24517: URL: https://github.com/apache/flink/pull/24517 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions [flink]
flinkbot commented on PR #24629:
URL: https://github.com/apache/flink/pull/24629#issuecomment-2041363992

## CI report:

* 1eeedee02ccdb674b5b2598bec8531e8eac49bf9 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions [flink]
Zakelly opened a new pull request, #24629:
URL: https://github.com/apache/flink/pull/24629

## What is the purpose of the change

Currently, re-generating the docs on the master branch produces `rpc_configuration.html`. `AkkaOptions` was renamed to `RpcOptions` via FLINK-32684, so the docs should be re-generated and every include should point to the new file.

## Brief change log

- Re-generate the docs; `rpc_configuration.html` is generated.
- Replace all includes of `akka_configuration.html` with `rpc_configuration.html`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.
Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]
ruanhang1993 commented on code in PR #3134:
URL: https://github.com/apache/flink-cdc/pull/3134#discussion_r1554839652

##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java:
##
@@ -307,8 +307,17 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent
                             finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
         }
         final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-        if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {

Review Comment:
   Add logs.

##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java:
##
@@ -43,10 +43,14 @@ public class BinlogSplitMetaEvent implements SourceEvent {
      */
     private final List metaGroup;

-    public BinlogSplitMetaEvent(String splitId, int metaGroupId, List metaGroup) {
+    private final int totalFinishedSplitSize;
+
+    public BinlogSplitMetaEvent(
+            String splitId, int metaGroupId, List metaGroup, int totalFinishedSplitSize) {

Review Comment:
   Add `@Nullable`

##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
##
@@ -395,11 +396,17 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
         MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
         if (binlogSplit != null) {
             final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+            final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
             final int expectedMetaGroupId =
                     ChunkUtils.getNextMetaGroupId(
                             binlogSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
-            if (receivedMetaGroupId == expectedMetaGroupId) {
+            if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) {

Review Comment:
   Add some logs here.

##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java:
##
@@ -298,22 +298,35 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven
                             finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
         }
         final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-        if (binlogSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {

Review Comment:
   Add some logs here.

##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java:
##
@@ -43,10 +43,14 @@ public class StreamSplitMetaEvent implements SourceEvent {
      */
     private final List metaGroup;

-    public StreamSplitMetaEvent(String splitId, int metaGroupId, List metaGroup) {
+    private final int totalFinishedSplitSize;
+
+    public StreamSplitMetaEvent(
+            String splitId, int metaGroupId, List metaGroup, int totalFinishedSplitSize) {

Review Comment:
   Add `@Nullable`
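Several of the review comments above ask for logging at the new finished-split size checks. The following is a minimal sketch of what a logged guard of that shape could look like. It is not code from the PR: `FinishedSplitCountCheck` and `readerIsAheadOfEnumerator` are hypothetical names, the log wording is invented, and `java.util.logging` is used only to keep the example self-contained (an assumption; the project may use a different logging API).

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for the enumerator-side size check; not code from the PR. */
final class FinishedSplitCountCheck {

    private static final Logger LOG =
            Logger.getLogger(FinishedSplitCountCheck.class.getName());

    private FinishedSplitCountCheck() {}

    /**
     * Returns true when the reader reports more finished splits than the enumerator
     * currently tracks, and logs both counts so the mismatch can be diagnosed later.
     */
    static boolean readerIsAheadOfEnumerator(
            int subTask, int finishedSplitsOfReader, int finishedSplitsOfEnumerator) {
        if (finishedSplitsOfReader > finishedSplitsOfEnumerator) {
            LOG.warning(
                    String.format(
                            "Subtask %d reports %d finished splits, but the enumerator tracks only %d.",
                            subTask, finishedSplitsOfReader, finishedSplitsOfEnumerator));
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(readerIsAheadOfEnumerator(0, 5, 3));
        System.out.println(readerIsAheadOfEnumerator(0, 3, 3));
    }
}
```

Logging both counts together with the subtask index is a design choice for this sketch: it lets a reader of the logs see which side is out of date without attaching a debugger. What the real branch does after the comparison is not shown in the quoted hunks, so it is left out here.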
[jira] [Commented] (FLINK-34995) flink kafka connector source stuck when partition leader invalid
[ https://issues.apache.org/jira/browse/FLINK-34995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834629#comment-17834629 ]

yansuopeng commented on FLINK-34995:
------------------------------------

https://github.com/apache/flink-connector-kafka/pull/91

> flink kafka connector source stuck when partition leader invalid
> ----------------------------------------------------------------
>
> Key: FLINK-34995
> URL: https://issues.apache.org/jira/browse/FLINK-34995
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.17.0, 1.19.0, 1.18.1
> Reporter: yansuopeng
> Priority: Major
> Labels: pull-request-available
>
> When a partition's leader is invalid (leader=-1), a Flink streaming job using
> KafkaSource cannot restart, nor can a new instance start with a new group id.
> The job gets stuck and reports the following exception:
> "{*}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> expired before the position for partition aaa-1 could be determined{*}"
> When leader=-1, Kafka APIs such as KafkaConsumer.position() block until
> either the position can be determined or an unrecoverable error is
> encountered.
> In fact, leader=-1 is not easy to avoid: even with replica=3, three disks
> going offline together will trigger the problem, especially when the cluster
> is relatively large. Recovery relies on the Kafka administrator fixing it in
> time, which is risky during the Kafka cluster's peak period.
> I have solved this problem and want to create a PR.
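The failure mode described in the issue (a position lookup that cannot complete while the partition has no leader) can be illustrated without a Kafka cluster. In the sketch below a future that never completes stands in for the leaderless lookup, and the caller bounds the wait and reports "position unknown" instead of blocking. This only illustrates the symptom; it is not the change proposed in PR #91, whose contents are not shown in this thread. `BoundedPositionLookup` and `positionWithin` are hypothetical names, and no Kafka client classes are used.

```java
import java.time.Duration;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration of bounding a blocking lookup; it does not use the Kafka client. */
final class BoundedPositionLookup {

    private BoundedPositionLookup() {}

    /** Waits at most {@code timeout} for the lookup; an empty result means "position unknown". */
    static OptionalLong positionWithin(CompletableFuture<Long> lookup, Duration timeout) {
        try {
            return OptionalLong.of(lookup.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // The lookup did not finish in time, as with a partition that has no leader.
            return OptionalLong.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the position", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Position lookup failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A completed future models a healthy partition; 42 is an arbitrary offset.
        CompletableFuture<Long> healthy = CompletableFuture.completedFuture(42L);
        // A future that is never completed models the leaderless partition.
        CompletableFuture<Long> leaderless = new CompletableFuture<>();

        System.out.println(positionWithin(healthy, Duration.ofMillis(100)).isPresent());
        System.out.println(positionWithin(leaderless, Duration.ofMillis(100)).isPresent());
    }
}
```

Whether a source should skip, retry, or fail on an unknown position is a separate design question that this sketch does not answer.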
[jira] [Updated] (FLINK-34995) flink kafka connector source stuck when partition leader invalid
[ https://issues.apache.org/jira/browse/FLINK-34995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-34995:
-----------------------------------
    Labels: pull-request-available  (was: )

> flink kafka connector source stuck when partition leader invalid
> ----------------------------------------------------------------
>
> Key: FLINK-34995
> URL: https://issues.apache.org/jira/browse/FLINK-34995
Re: [PR] [FLINK-34995] flink kafka connector source stuck when partition leade… [flink-connector-kafka]
boring-cyborg[bot] commented on PR #91:
URL: https://github.com/apache/flink-connector-kafka/pull/91#issuecomment-2041355545

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)