[jira] [Assigned] (FLINK-33934) Flink SQL source using raw format may lead to data loss

2024-04-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-33934:


Assignee: Yuan Kui

> Flink SQL source using raw format may lead to data loss
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 
> 1.19.0
>Reporter: Cai Liuyang
>Assignee: Yuan Kui
>Priority: Major
>
> In our product we encountered a case that led to data loss. The job info: 
>    1. a Flink SQL job that reads data from a message queue (our internal MQ) and 
> writes to Hive (selecting only the value field, not the metadata field)
>    2. the format of the source table is the raw format
>  
> However, if we select the value field and the metadata field at the same time, the 
> data loss does not occur.
>  
> After reviewing the code, we found that the cause is the object reuse of the 
> raw format (see 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
>  Why object reuse leads to this problem is explained below (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the 
> SourceOperator. The fetcher thread reads and deserializes data from the Kafka 
> partition, then puts the data into the ElementQueue (see [SourceOperator 
> FetchTask 
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
>     2. The SourceOperator's main thread pulls data from the 
> ElementQueue (which is shared with the fetcher thread) and processes it (see 
> [SourceOperator main 
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
>     3. RawFormatDeserializationSchema's deserialize function always returns 
> the same object (see the [reused RowData 
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
>     4. So, if the ElementQueue holds elements that have not yet been consumed, the 
> fetcher thread can overwrite the fields of the reused RowData that 
> RawFormatDeserializationSchema::deserialize returned, which leads to data 
> loss.
>  
> The reason that selecting the value field and the metadata field at the same time 
> does not encounter data loss is:
>    if we select the metadata field, a new RowData object is returned (see 
> [DynamicKafkaDeserializationSchema deserialize with metadata field 
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]),
>  and if we only select the value field, the RowData object that the format's 
> DeserializationSchema returned is reused (see 
> [DynamicKafkaDeserializationSchema deserialize only with value 
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113])
>  
> To solve this problem, I think we should remove the object reuse in 
> RawFormatDeserializationSchema.
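
To make the race concrete, here is a minimal, self-contained sketch of the hazard (hypothetical stand-in types, not the actual Flink classes): a deserializer that reuses one mutable object lets the producing thread overwrite records that the consuming thread has not yet drained from the shared queue.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ObjectReuseHazardSketch {
    // Stand-in for the reused RowData instance inside the deserialization schema.
    static final class MutableRow {
        byte[] value;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<MutableRow> elementQueue = new ArrayBlockingQueue<>(16);
        MutableRow reused = new MutableRow(); // the single object "deserialize" always returns

        // "Fetcher thread": deserializes into the SAME object and enqueues it three times.
        Thread fetcher = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                reused.value = ("record-" + i).getBytes(); // overwrites data already enqueued
                elementQueue.offer(reused);
            }
        });
        fetcher.start();
        fetcher.join();

        // "Main thread": drains the queue afterwards and sees only the last record.
        MutableRow row;
        while ((row = elementQueue.poll()) != null) {
            System.out.println(new String(row.value)); // prints "record-2" three times
        }
        // The fix discussed in this issue: have deserialize() return a fresh object
        // (or a deep copy) per record instead of mutating one shared instance.
    }
}
{code}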



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555253454


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,
+        /** Get from one {@link State}. */
+        GET,
+        /** Put to one {@link State}. */
+        PUT,
+        /** Merge value to an existing key in {@link State}. Mainly used for listState. */
+        MERGE,
+        /** Delete from one {@link State}. */
+        DELETE
+    }
+
+    /** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */
+    @Nullable private final State state;
+
+    /** The type of this request. */
+    private final RequestType type;
+
+    /** The payload (input) of this request. */
+    @Nullable private final IN payload;
+
+    /** The future to collect the result of the request. */
+    private InternalStateFuture<OUT> stateFuture;
+
+    /** The record context of this request. */
+    private RecordContext<?, K> context;
+
+    StateRequest(@Nullable State state, RequestType type, @Nullable IN payload) {
+        this.state = state;
+        this.type = type;
+        this.payload = payload;
+    }
+
+    RequestType getRequestType() {
+        return type;
+    }
+
+    @Nullable
+    IN getPayload() {
+        return payload;
+    }
+
+    @Nullable
+    State getState() {
+        return state;
+    }
+
+    InternalStateFuture<OUT> getFuture() {
+        return stateFuture;
+    }
+
+    void setFuture(InternalStateFuture<OUT> future) {
+        stateFuture = future;
+    }
+
+    RecordContext<?, K> getRecordContext() {
+        return context;
+    }
+
+    void setRecordContext(RecordContext<?, K> context) {
+        this.context = context;
+    }

Review Comment:
   @masteryhx After trying, I think we are still not able to avoid lazily setting 
some contexts or creating some temporary instances. How about making 
`AEC#handleRequest` receive the information from outside and build the 
`StateRequest` internally?
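
A sketch of what that could look like (hypothetical shape only, reusing field names from the `AsyncExecutionController` diff elsewhere in this PR; `stateFutureFactory.build` is an assumed method name, not confirmed API):

```java
// Hypothetical: the AEC assembles the StateRequest itself, so callers never
// pre-build requests or lazily set contexts on them.
public <IN, OUT> InternalStateFuture<OUT> handleRequest(
        @Nullable State state, StateRequest.RequestType type, @Nullable IN payload) {
    StateRequest<K, IN, OUT> request = new StateRequest<>(state, type, payload);
    request.setRecordContext(currentContext); // context of the record being processed
    InternalStateFuture<OUT> future = stateFutureFactory.build(currentContext); // assumed factory call
    request.setFuture(future);
    // ...buffer the request / hand it to the StateExecutor (elided in this sketch)...
    return future;
}
```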



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35042) Streaming File Sink s3 end-to-end test failed as TM lost

2024-04-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35042:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=fb37c667-81b7-5c22-dd91-846535e99a97&t=011e961e-597c-5c96-04fe-7941c8b83f23&l=14344

FAIL 'Streaming File Sink s3 end-to-end test' failed after 15 minutes and 20 
seconds! Test exited with exit code 1

I have checked the JM log; it seems that a TaskManager is no longer reachable:

{code:java}
2024-04-08T01:12:04.3922210Z Apr 08 01:12:04 2024-04-08 00:58:15,517 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink: Unnamed 
(4/4) (14b44f534745ffb2f1ef03fca34f7f0d_0a448493b4782967b150582570326227_3_0) 
switched from RUNNING to FAILED on localhost:44987-47f5af @ localhost 
(dataPort=34489).
2024-04-08T01:12:04.3924522Z Apr 08 01:12:04 
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id 
localhost:44987-47f5af is no longer reachable.
2024-04-08T01:12:04.3925421Z Apr 08 01:12:04at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1511)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3926185Z Apr 08 01:12:04at 
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3926925Z Apr 08 01:12:04at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3929898Z Apr 08 01:12:04at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3930692Z Apr 08 01:12:04at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3931442Z Apr 08 01:12:04at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3931917Z Apr 08 01:12:04at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_402]
2024-04-08T01:12:04.3934759Z Apr 08 01:12:04at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_402]
2024-04-08T01:12:04.3935252Z Apr 08 01:12:04at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 ~[?:1.8.0_402]
2024-04-08T01:12:04.3935989Z Apr 08 01:12:04at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460)
 ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3936731Z Apr 08 01:12:04at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-dist-1.20-SNAPSHOT.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3938103Z Apr 08 01:12:04at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460)
 ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3942549Z Apr 08 01:12:04at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225)
 ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3945371Z Apr 08 01:12:04at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
 ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3946244Z Apr 08 01:12:04at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
 ~[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3946960Z Apr 08 01:12:04at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3947664Z Apr 08 01:12:04at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3950764Z Apr 08 01:12:04at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3952816Z Apr 08 01:12:04at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-rpc-akka9681a48a-ca1a-45b0-bb71-4bdb5d2aed93.jar:1.20-SNAPSHOT]
2024-04-08T01:12:04.3953526Z Apr 08 01:12:04at 

[jira] [Created] (FLINK-35042) Streaming File Sink s3 end-to-end test failed as TM lost

2024-04-07 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35042:
--

 Summary: Streaming File Sink s3 end-to-end test failed as TM lost
 Key: FLINK-35042
 URL: https://issues.apache.org/jira/browse/FLINK-35042
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555247481


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,
+        /** Get from one {@link State}. */
+        GET,
+        /** Put to one {@link State}. */
+        PUT,
+        /** Merge value to an existing key in {@link State}. Mainly used for listState. */
+        MERGE,
+        /** Delete from one {@link State}. */
+        DELETE
+    }
+
+    /** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */
+    @Nullable private final State state;
+
+    /** The type of this request. */
+    private final RequestType type;
+
+    /** The payload (input) of this request. */
+    @Nullable private final IN payload;
+
+    /** The future to collect the result of the request. */
+    private InternalStateFuture<OUT> stateFuture;
+
+    /** The record context of this request. */
+    private RecordContext<?, K> context;
+
+    StateRequest(@Nullable State state, RequestType type, @Nullable IN payload) {
+        this.state = state;
+        this.type = type;
+        this.payload = payload;
+    }
+
+    RequestType getRequestType() {
+        return type;
+    }
+
+    @Nullable
+    IN getPayload() {
+        return payload;
+    }
+
+    @Nullable
+    State getState() {
+        return state;
+    }
+
+    InternalStateFuture<OUT> getFuture() {
+        return stateFuture;
+    }
+
+    void setFuture(InternalStateFuture<OUT> future) {
+        stateFuture = future;
+    }
+
+    RecordContext<?, K> getRecordContext() {
+        return context;
+    }
+
+    void setRecordContext(RecordContext<?, K> context) {
+        this.context = context;
+    }

Review Comment:
   Well, this is a good suggestion. I'll try this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555245552


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Async Execution Controller (AEC) receives processing requests from operators, and puts them
+ * into execution according to some strategies.
+ *
+ * <p>It is responsible for:
+ * <ul>
+ *   <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests
+ *       until the processing of preceding ones is finalized.
+ *   <li>Tracking the in-flight data (records) and blocking the input if too much data is in
+ *       flight (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current
+ *       operations, allowing for the execution of callbacks (mails in the Mailbox).
+ * </ul>
+ *
+ * @param <R> the type of the record
+ * @param <K> the type of the key
+ */
+public class AsyncExecutionController<R, K> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
+
+    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+
+    /** The max allowed number of in-flight records. */
+    private final int maxInFlightRecordNum;
+
+    /** The key accounting unit which is used to detect key conflicts. */
+    final KeyAccountingUnit<R, K> keyAccountingUnit;
+
+    /**
+     * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}; this will auto
+     * wire the created future with the mailbox executor. It also conducts the context switch.
+     */
+    private final StateFutureFactory<R, K> stateFutureFactory;
+
+    /** The state executor where the {@link StateRequest} is actually executed. */
+    final StateExecutor stateExecutor;
+
+    /** The corresponding context that currently runs in the task thread. */
+    RecordContext<R, K> currentContext;
+
+    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
+        this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+    }
+
+    public AsyncExecutionController(
+            MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
+        this.keyAccountingUnit = new KeyAccountingUnit<>();
+        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+        this.stateExecutor = stateExecutor;
+        this.maxInFlightRecordNum = maxInFlightRecords;
+        LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords);
+    }
+
+    /**
+     * Build a new context based on record and key. It is also wired with the internal {@link
+     * KeyAccountingUnit}.
+     *
+     * @param record the given record.
+     * @param key the given key.
+     * @return the built record context.
+     */
+    public RecordContext<R, K> buildContext(R record, K key) {

Review Comment:
   Well, building/initializing is a special use case, distinct from simple setting. 
We could change the behavior of this method to something like `buildAndSet`. But 
I slightly prefer to keep interfaces implementing more specialized (single-purpose) 
functions rather than multiple ones.
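
As context for the back-pressure behavior described in the class javadoc above, here is a minimal sketch of the yielding loop it implies (assumed logic and field names such as `inFlightRecordNum`, not the PR implementation):

```java
// Hypothetical: called by the task thread before admitting a new record.
void seizeCapacity() throws InterruptedException {
    // Block the input by yielding to the mailbox; each yield may run one
    // finished state-future callback, which in turn frees in-flight capacity.
    while (inFlightRecordNum.get() >= maxInFlightRecordNum) {
        mailboxExecutor.yield();
    }
    inFlightRecordNum.incrementAndGet();
}
```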



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555244839


##
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.StateRequest.RequestType;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link AsyncExecutionController}. */
+class AsyncExecutionControllerTest {
+
+    // TODO: this test is not yet complete, because buffering in the AEC is not implemented.
+    // For now, it just illustrates the interaction between the AEC and the async state API.
+    @Test
+    void testBasicRun() {
+        TestAsyncExecutionController<String, String> aec =
+                new TestAsyncExecutionController<>(
+                        new SyncMailboxExecutor(), new TestStateExecutor());
+        TestUnderlyingState underlyingState = new TestUnderlyingState();
+        TestValueState valueState = new TestValueState(aec, underlyingState);
+        AtomicInteger output = new AtomicInteger();
+        Runnable userCode =
+                () -> {
+                    valueState
+                            .asyncValue()
+                            .thenCompose(
+                                    val -> {
+                                        int updated = (val == null ? 1 : (val + 1));
+                                        return valueState
+                                                .asyncUpdate(updated)
+                                                .thenCompose(
+                                                        o ->
+                                                                StateFutureUtils.completedFuture(
+                                                                        updated));
+                                    })
+                            .thenAccept(val -> output.set(val));
+                };
+
+        // ==================== element1 ====================
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the
+        // record and key with RecordContext.
+        RecordContext<String, String> recordContext1 = aec.buildContext(record1, key1);
+        aec.setCurrentContext(recordContext1);
+        // execute user code
+        userCode.run();
+
+        // Single-step run.
+        // First, the user code puts a value get into the active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update is in the active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // The value update finishes.
+        assertThat(aec.activeBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(1);
+        assertThat(recordContext1.getReferenceCount()).isEqualTo(0);
+
+        // ==================== element 2 & 3 ====================
+        String record2 = "key1-r2";
+        String key2 = "key1";
+        RecordContext<String, String> recordContext2 = aec.buildContext(record2, key2);
+        aec.setCurrentContext(recordContext2);
+        // execute user code
+

[jira] [Updated] (FLINK-34273) git fetch fails

2024-04-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-34273:
---
Affects Version/s: 1.20.0

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35023) YARNApplicationITCase failed on Azure

2024-04-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834783#comment-17834783
 ] 

Weijie Guo commented on FLINK-35023:


jdk17 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=28689


> YARNApplicationITCase failed on Azure
> -
>
> Key: FLINK-35023
> URL: https://issues.apache.org/jira/browse/FLINK-35023
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> 1. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
> {code:java}
> Apr 06 02:19:44 02:19:44.063 [ERROR] 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
>  -- Time elapsed: 9.727 s <<< FAILURE!
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70)
> Apr 06 02:19:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 2. YARNApplicationITCase.testApplicationClusterWithRemoteUserJar
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84)
> Apr 06 02:19:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 3. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion$0(YARNApplicationITCase.java:62)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   

[jira] [Commented] (FLINK-34273) git fetch fails

2024-04-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834784#comment-17834784
 ] 

Weijie Guo commented on FLINK-34273:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=59a2b95a-736b-5c46-b3e0-cee6e587fd86&t=c5b19363-f3ba-59e0-bfbf-73c4b9bde45c&l=264

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35041:
---
Description: 
{code:java}
Apr 08 03:22:45 03:22:45.450 [ERROR] 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
 -- Time elapsed: 0.034 s <<< FAILURE!
Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
Apr 08 03:22:45 
Apr 08 03:22:45 expected: false
Apr 08 03:22:45  but was: true
Apr 08 03:22:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Apr 08 03:22:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Apr 08 03:22:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Apr 08 03:22:45 at 
org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
Apr 08 03:22:45 at 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 08 03:22:45 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Apr 08 03:22:45 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

{code}


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-07 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35041:
--

 Summary: 
IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
 Key: FLINK-35041
 URL: https://issues.apache.org/jira/browse/FLINK-35041
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555241719


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Key accounting unit holds the current in-flight key and tracks the 
corresponding ongoing records,
+ * which is used to preserve the ordering of independent chained {@link
+ * org.apache.flink.api.common.state.v2.StateFuture}.
+ *
+ * @param <R> the type of record
+ * @param <K> the type of key
+ */
+public class KeyAccountingUnit<R, K> {

Review Comment:
   I'm not a big fan of interface abstraction. Two questions: Is there a need 
for two or more behaviors? And how could we/users choose between them? It is just 
a class for internal usage, so I'd keep it a normal class until we meet a 
concrete use case.
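
For readers following along, a minimal sketch of the semantics implied by the javadoc above (assumed behavior, not the PR implementation): a record may occupy its key only while no other record holds it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KeyAccountingSketch<R, K> {
    private final Map<K, R> inFlightRecordsByKey = new ConcurrentHashMap<>();

    /** Returns true if the record acquired the key; false if the key is already occupied. */
    public boolean occupy(R record, K key) {
        return inFlightRecordsByKey.putIfAbsent(key, record) == null;
    }

    /** Releases the key once processing of the occupying record has finished. */
    public void release(R record, K key) {
        inFlightRecordsByKey.remove(key, record);
    }
}
```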



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555239594


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Executor for executing batch {@link StateRequest}s. */
+@Internal
+public interface StateExecutor {
+/**
+ * Execute a batch of state requests.
+ *
+ * @param processingRequests the given batch of processing requests
+ * @return A future that can determine whether execution has completed.
+ */
+CompletableFuture<Boolean> executeBatchRequests(
+        Iterable<StateRequest<?, ?, ?>> processingRequests);

Review Comment:
   The sorting and batching into groups is done by the `AEC`; there are no 
mixed types of requests here.
   Here we just follow the description of the FLIPs; if we need to change the 
signature here, it would be fine to change it later.
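
A minimal sketch of that division of labor (assumed helper, not the PR code): the AEC groups buffered requests by type, so a `StateExecutor` implementation only ever sees homogeneous batches.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class RequestBatchingSketch {
    // Mirrors StateRequest.RequestType from the diff above.
    enum RequestType { SYNC, GET, PUT, MERGE, DELETE }

    /** Groups requests into per-type batches; each batch is then executed separately. */
    static <T> Map<RequestType, List<T>> groupByType(
            Iterable<T> requests, Function<T, RequestType> typeOf) {
        Map<RequestType, List<T>> batches = new EnumMap<>(RequestType.class);
        for (T request : requests) {
            batches.computeIfAbsent(typeOf.apply(request), t -> new ArrayList<>()).add(request);
        }
        return batches;
    }
}
```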



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555237399


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,

Review Comment:
   It actually means syncing with the framework to check whether another 
record with the same key is ongoing. It is something like `SYNC_POINT`. I'll change 
the doc here.
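
A sketch of the clarified semantics (my assumed reading of this discussion, not the PR code):

```java
// Hypothetical rename illustrating the intent: a SYNC request performs no
// state access; it only synchronizes with the framework, waiting until no
// other record with the same key is in flight (an ordering barrier).
public enum RequestTypeSketch {
    /** Ordering barrier: proceed only when the key is free. */
    SYNC_POINT,
    GET,
    PUT,
    MERGE,
    DELETE
}
```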



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-04-07 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834779#comment-17834779
 ] 

Jiabao Sun commented on FLINK-34955:


I have rechecked the dependency of `commons-codec` in `commons-compress` and it 
is no longer optional. Even if upgraded to 1.26.1, `commons-codec` will still 
be a transitive dependency. 
Sorry for the disturbance.
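For reference, running `mvn dependency:tree -Dincludes=commons-codec` in the affected module (standard Maven tooling, nothing specific to this issue) shows where `commons-codec` enters the tree as a transitive dependency of `commons-compress`.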

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Assignee: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> commons-compress 1.24.0 has CVE issues; try to upgrade to 1.26.0. We can 
> refer to the Maven link:
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35010) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector

2024-04-07 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834778#comment-17834778
 ] 

Jiabao Sun commented on FLINK-35010:


I have rechecked the dependency of `commons-codec` in `commons-compress` and it 
is no longer optional. 
Even if upgraded to 1.26.1, `commons-codec` will still be a transitive 
dependency. 
Please ignore the previous noise, sorry for the disturbance.

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink 
> Mongodb connector
> --
>
> Key: FLINK-35010
> URL: https://issues.apache.org/jira/browse/FLINK-35010
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / MongoDB
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink Kafka connector

2024-04-07 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834777#comment-17834777
 ] 

Jiabao Sun commented on FLINK-35008:


I have rechecked the dependency of `commons-codec` in `commons-compress` and it 
is no longer optional. 
Even if upgraded to 1.26.1, `commons-codec` will still be a transitive 
dependency. 
Please ignore the previous noise, sorry for the disturbance.

> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink 
> Kafka connector
> 
>
> Key: FLINK-35008
> URL: https://issues.apache.org/jira/browse/FLINK-35008
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]

2024-04-07 Thread via GitHub


Jiabao-Sun commented on code in PR #24580:
URL: https://github.com/apache/flink/pull/24580#discussion_r1555174874


##
flink-end-to-end-tests/flink-sql-client-test/pom.xml:
##
@@ -69,6 +69,13 @@ under the License.
kafka
test

+
+   <dependency>
+       <groupId>commons-codec</groupId>
+       <artifactId>commons-codec</artifactId>
+       <scope>test</scope>
+   </dependency>
+

Review Comment:
   Hi @mbalassi, @slfan1989,
   
   ~I think we don't need this dependency; the `commons-codec` dependency is 
because `commons-compress` incorrectly depended on `commons-codec`'s Charsets 
in version 1.26.0.~
   
   ~This issue has been fixed in version 1.26.1, so perhaps we should bump the 
`commons-compress` version to 1.26.1.~

   ~see: https://issues.apache.org/jira/browse/COMPRESS-659~



##
flink-end-to-end-tests/flink-sql-client-test/pom.xml:
##
@@ -69,6 +69,13 @@ under the License.
kafka
test

+
+   <dependency>
+       <groupId>commons-codec</groupId>
+       <artifactId>commons-codec</artifactId>
+       <scope>test</scope>
+   </dependency>
+

Review Comment:
   I have rechecked the dependency of `commons-codec` in `commons-compress` and 
it is no longer optional. Even if upgraded to 1.26.1, `commons-codec` will 
still be a transitive dependency. 
   Please ignore the previous noise, sorry for the disturbance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


fredia commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555208732


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {

Review Comment:
   I think this question is somewhat the same as 
https://github.com/apache/flink/pull/24614#discussion_r1554806613.
   In the future, an `internal` `key-value` layer will be introduced; we 
could make it an internal kv interface or a class parameter then.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


masteryhx commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555175989


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,

Review Comment:
   The naming is a bit confusing.
   I might have read it as accessing state synchronously if I had missed the above comment.
   How about `NOP` or another name that at least says `without state access`?
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Executor for executing batch {@link StateRequest}s. */
+@Internal
+public interface StateExecutor {
+/**
+ * Execute a batch of state requests.
+ *
+ * @param processingRequests the given batch of processing requests
+ * @return A future that can determine whether execution has completed.
+ */
+CompletableFuture<Boolean> executeBatchRequests(
+        Iterable<StateRequest<?, ?, ?>> processingRequests);

Review Comment:
   Different state types could be executed together, right?
   How could we get the specific state operations? The current `RequestType` seems not 
enough.



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Async Execution Controller (AEC) receives processing requests from operators, and puts them
+ * into execution according to some strategies.
+ *
+ * <p>It is responsible for:
+ * <ul>
+ *   <li>Preserving the sequence of elements bearing the 

[jira] [Updated] (FLINK-33934) Flink SQL source using raw format may lead to data loss

2024-04-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-33934:
-
Affects Version/s: 1.19.0
   1.18.0
   1.17.0
   1.16.0
   1.15.0
   1.14.0
   1.13.0
   1.12.0

> Flink SQL source using raw format may lead to data loss
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 
> 1.19.0
>Reporter: Cai Liuyang
>Priority: Major
>
> In our product we encountered a case that led to data loss. The job info: 
>    1. a Flink SQL job that reads data from a message queue (our internal MQ) and 
> writes to Hive (selecting only the value field, not the metadata field)
>    2. the format of the source table is the raw format
>  
> However, if we select the value field and the metadata field at the same time, the 
> data loss does not occur.
>  
> After reviewing the code, we found that the cause is the object reuse of the 
> raw format (see 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
>  Why object reuse leads to this problem is explained below (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the 
> SourceOperator. The fetcher thread reads and deserializes data from the Kafka 
> partition, then puts the data into the ElementQueue (see [SourceOperator 
> FetchTask 
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
>     2. The SourceOperator's main thread pulls data from the 
> ElementQueue (which is shared with the fetcher thread) and processes it (see 
> [SourceOperator main 
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
>     3. RawFormatDeserializationSchema's deserialize function always returns 
> the same object (see the [reused RowData 
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
>     4. So, if the ElementQueue holds elements that have not yet been consumed, the 
> fetcher thread can overwrite the fields of the reused RowData that 
> RawFormatDeserializationSchema::deserialize returned, which leads to data 
> loss.
>  
> The reason that selecting the value field and the metadata field at the same time 
> does not encounter data loss is:
>    if we select the metadata field, a new RowData object is returned (see 
> [DynamicKafkaDeserializationSchema deserialize with metadata field 
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]),
>  and if we only select the value field, the RowData object that the format's 
> DeserializationSchema returned is reused (see 
> [DynamicKafkaDeserializationSchema deserialize only with value 
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113])
>  
> To solve this problem, I think we should remove the object reuse in 
> RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2024-04-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834765#comment-17834765
 ] 

Yun Tang commented on FLINK-33934:
--

I think it's easy to lose data when using the {{raw}} format with another 
customized connector. Moreover, the object reuse semantics are hidden behind the 
{{raw}} format description. From my point of view, this is a potential bug, cc 
[~jark].
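
A minimal sketch of the proposed fix, assuming we simply drop the cached row in 
RawFormatDeserializationSchema (the converter field name is illustrative, not the 
actual one):

{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public RowData deserialize(byte[] message) {
    // Allocate a fresh row per record instead of mutating a shared instance, so
    // rows still buffered in the ElementQueue can no longer be overwritten by
    // the fetcher thread before the main thread consumes them.
    GenericRowData row = new GenericRowData(1);
    row.setField(0, converter.convert(message)); // "converter" is assumed here
    return row;
}
{code}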

> Flink SQL Source use raw format maybe lead to data lost
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Reporter: Cai Liuyang
>Priority: Major
>
> In our product we encountered a case that leads to data loss. The job info: 
>    1. a Flink SQL job that reads data from a message queue (our internal MQ) and 
> writes to Hive (only the value field is selected, without any metadata field)
>    2. the format of the source table is the raw format
>  
> But if we select the value field and a metadata field at the same time, the 
> data loss does not appear.
>  
> After reviewing the code, we found that the reason is the object reuse of the 
> raw format (see code 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]);
>  why object reuse leads to this problem is explained below (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the 
> SourceOperator; the fetcher thread reads and deserializes data from a Kafka 
> partition, then puts the data into the ElementQueue (see code [SourceOperator 
> FetcherTask 
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
>     2. The SourceOperator's main thread pulls data from the 
> ElementQueue (which is shared with the fetcher thread) and processes it (see code 
> [SourceOperator main 
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
>     3. RawFormatDeserializationSchema's deserialize function 
> returns the same object ([reused rowData 
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
>     4. So, if the ElementQueue holds elements that have not been consumed yet, the 
> fetcher thread can change the fields of the reused rowData that 
> RawFormatDeserializationSchema::deserialize returned, which leads to data 
> loss.
>  
> The reason that selecting the value field and a metadata field at the same time 
> does not cause data loss is:
>    if we select a metadata field, a new RowData object is returned, see 
> code: [DynamicKafkaDeserializationSchema deserialize with metadata field 
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249];
>  and if we only select the value field, it reuses the RowData object that the 
> formatDeserializationSchema returned, see code 
> [DynamicKafkaDeserializationSchema deserialize only with value 
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
>  
> To solve this problem, I think we should remove the object reuse in 
> RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-07 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Description: 
The performance of serializerHeavyString has regressed since April 3, and it had 
not yet recovered as of April 8.

It seems Java 11 regresses, while Java 8 and Java 17 are fine.

http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200


 !screenshot-1.png! 

  was:
The performance of serializerHeavyString has regressed since April 3, and it had 
not yet recovered as of April 8.

http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200


 !image-2024-04-08-10-51-07-403.png! 




> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Priority: Blocker
> Attachments: image-2024-04-08-10-51-07-403.png, screenshot-1.png
>
>
> The performance of serializerHeavyString has regressed since April 3, and it had 
> not yet recovered as of April 8.
> It seems Java 11 regresses, while Java 8 and Java 17 are fine.
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerHeavyString=on=on=off=3=200
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-07 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35040:

Attachment: screenshot-1.png

> The performance of serializerHeavyString regresses since April 3
> 
>
> Key: FLINK-35040
> URL: https://issues.apache.org/jira/browse/FLINK-35040
> Project: Flink
>  Issue Type: Bug
>  Components: Benchmarks
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Priority: Blocker
> Attachments: image-2024-04-08-10-51-07-403.png, screenshot-1.png
>
>
> The performance of serializerHeavyString has regressed since April 3, and it had 
> not yet recovered as of April 8.
> http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200
>  !image-2024-04-08-10-51-07-403.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-04-07 Thread via GitHub


morazow commented on PR #24426:
URL: https://github.com/apache/flink/pull/24426#issuecomment-2041795538

   Thanks @XComp, I am going to address your suggestions. Please have a look 
once you are back


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-04-07 Thread via GitHub


morazow commented on code in PR #24426:
URL: https://github.com/apache/flink/pull/24426#discussion_r1555171619


##
.github/workflows/template.python-wheels-ci.yml:
##
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Python Wheels CI
+
+on:
+  workflow_call:
+
+permissions: read-all
+
+jobs:
+  linux:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout the repository
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          persist-credentials: false
+      - name: Build wheels
+        run: |
+          cd flink-python
+          bash dev/build-wheels.sh
+      - name: Tar artifact
+        run: tar -cvf python-wheel.tar flink-python/dist

Review Comment:
   Yes, good idea!



##
.github/workflows/template.python-wheels-ci.yml:
##
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Python Wheels CI
+
+on:
+  workflow_call:
+
+permissions: read-all
+
+jobs:
+  linux:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout the repository
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          persist-credentials: false
+      - name: Build wheels
+        run: |
+          cd flink-python
+          bash dev/build-wheels.sh
+      - name: Tar artifact
+        run: tar -cvf python-wheel.tar flink-python/dist
+      - name: Upload artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: wheel_${{ runner.os }}_${{ github.sha }}_${{ github.run_number }}
+          path: python-wheel.tar
+  macos:
+    runs-on: macos-latest
+    steps:
+      - name: Checkout the repository
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          persist-credentials: false
+      - name: Install python

Review Comment:
   Yes, good idea. I took the same steps from the Azure pipelines, but I agree that 
having the same Python version on both the Linux and macOS jobs is a good idea. I 
will change the workflow.
   
   I didn't check which version is installed in the Linux VM.



##
.github/workflows/nightly.yml:
##
@@ -94,3 +94,6 @@ jobs:
   s3_bucket: ${{ secrets.IT_CASE_S3_BUCKET }}
   s3_access_key: ${{ secrets.IT_CASE_S3_ACCESS_KEY }}
   s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }}
+  python-wheels:

Review Comment:
   Yes, that should be fine; I would also prefer the inlined version.
   
   I wanted to keep the structure similar to the other jobs, but the python wheels 
template is not used more than once, so I will change it to the inlined version.
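
   A minimal sketch of what the inlined job in `nightly.yml` could look like (steps
   abridged from the template above; the job and step names are illustrative):

   ```yaml
   python-wheels:
     name: "Python Wheels"
     runs-on: ubuntu-latest
     steps:
       - name: Checkout the repository
         uses: actions/checkout@v4
         with:
           fetch-depth: 0
           persist-credentials: false
       - name: Build wheels
         run: |
           cd flink-python
           bash dev/build-wheels.sh
   ```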



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34955] Upgrade commons-compress to 1.26.0. [flink]

2024-04-07 Thread via GitHub


Jiabao-Sun commented on code in PR #24580:
URL: https://github.com/apache/flink/pull/24580#discussion_r1555174874


##
flink-end-to-end-tests/flink-sql-client-test/pom.xml:
##
@@ -69,6 +69,13 @@ under the License.
 			<artifactId>kafka</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>commons-codec</groupId>
+			<artifactId>commons-codec</artifactId>
+			<scope>test</scope>
+		</dependency>
+

Review Comment:
   Hi @mbalassi, @slfan1989,
   
   I don't think we need this dependency. The `commons-codec` dependency was only 
added because `commons-compress` 1.26.0 incorrectly depended on `commons-codec`'s 
Charsets.
   
   This issue has been fixed in version 1.26.1, so perhaps we should bump the 
`commons-compress` version to 1.26.1 instead.

   see: https://issues.apache.org/jira/browse/COMPRESS-659
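
   For illustration, the bump would then be a plain version pin with no extra codec
   dependency (a sketch; the exact location in the root pom may differ):

   ```xml
   <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <version>1.26.1</version>
   </dependency>
   ```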



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35040) The performance of serializerHeavyString regresses since April 3

2024-04-07 Thread Rui Fan (Jira)
Rui Fan created FLINK-35040:
---

 Summary: The performance of serializerHeavyString regresses since 
April 3
 Key: FLINK-35040
 URL: https://issues.apache.org/jira/browse/FLINK-35040
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.20.0
Reporter: Rui Fan
 Attachments: image-2024-04-08-10-51-07-403.png

The performance of serializerHeavyString has regressed since April 3, and it had 
not yet recovered as of April 8.

http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200


 !image-2024-04-08-10-51-07-403.png! 





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-04-07 Thread via GitHub


morazow commented on code in PR #24426:
URL: https://github.com/apache/flink/pull/24426#discussion_r1555166750


##
.github/workflows/template.python-wheels-ci.yml:
##
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Python Wheels CI
+
+on:
+  workflow_call:
+
+permissions: read-all
+
+jobs:
+  linux:
+runs-on: ubuntu-latest
+steps:
+  - name: Checkout the repository
+uses: actions/checkout@v4
+with:
+  fetch-depth: 0
+  persist-credentials: false
+  - name: Build wheels
+run: |
+  cd flink-python
+  bash dev/build-wheels.sh
+  - name: Tar artifact
+run: tar -cvf python-wheel.tar flink-python/dist
+  - name: Upload artifact
+uses: actions/upload-artifact@v4
+with:
+  name: wheel_${{ runner.os }}_${{ github.sha }}_${{ github.run_number 
}}

Review Comment:
   Hey @XComp,
   
   I meant keeping the python-wheels-ci GitHub workflow and the Azure pipelines 
aligned.
   
   The main concern here is to keep the Python wheels artifact names similar. 
Since the Azure pipelines used `Agent.JobName`, it would be fine to use the 
stringified workflow name, prefixed with `wheel`.
   
   Would that be a good option?
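
   For example, a sketch of that naming in the upload step (`github.workflow`
   resolves to the workflow's name string; the exact prefix is up for discussion):

   ```yaml
   - name: Upload artifact
     uses: actions/upload-artifact@v4
     with:
       name: wheel_${{ github.workflow }}_${{ runner.os }}
       path: python-wheel.tar
   ```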



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]

2024-04-07 Thread via GitHub


loserwang1024 commented on code in PR #3134:
URL: https://github.com/apache/flink-cdc/pull/3134#discussion_r1555165225


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java:
##
@@ -298,22 +298,35 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven
                             finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
         }
         final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-        if (binlogSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator =
+                splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]

2024-04-07 Thread via GitHub


loserwang1024 commented on code in PR #3134:
URL: https://github.com/apache/flink-cdc/pull/3134#discussion_r1555164983


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java:
##
@@ -43,10 +43,14 @@ public class BinlogSplitMetaEvent implements SourceEvent {
  */
     private final List<byte[]> metaGroup;
 
-    public BinlogSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
+    private final int totalFinishedSplitSize;
+
+    public BinlogSplitMetaEvent(
+            String splitId, int metaGroupId, List<byte[]> metaGroup, int totalFinishedSplitSize) {

Review Comment:
   Done.



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java:
##
@@ -43,10 +43,14 @@ public class StreamSplitMetaEvent implements SourceEvent {
  */
     private final List<byte[]> metaGroup;
 
-    public StreamSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
+    private final int totalFinishedSplitSize;
+
+    public StreamSplitMetaEvent(
+            String splitId, int metaGroupId, List<byte[]> metaGroup, int totalFinishedSplitSize) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35039) Create Profiling JobManager/TaskManager Instance failed

2024-04-07 Thread JJJJude (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834758#comment-17834758
 ] 

ude commented on FLINK-35039:
-

[~Yu Chen] Please have a look. If my modification plan is fine, please assign 
the ticket to me.

> Create Profiling JobManager/TaskManager Instance failed
> ---
>
> Key: FLINK-35039
> URL: https://issues.apache.org/jira/browse/FLINK-35039
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
> Environment: Hadoop 3.2.2
> Flink 1.19
>Reporter: ude
>Priority: Major
> Attachments: image-2024-04-08-10-21-31-066.png, 
> image-2024-04-08-10-21-48-417.png, image-2024-04-08-10-30-16-683.png
>
>
> I'm testing the "async-profiler" feature in version 1.19, but when I submit a 
> task in YARN per-job mode, I get an error when I click Create Profiling 
> Instance on the Flink Web UI page.
> !image-2024-04-08-10-21-31-066.png!
> !image-2024-04-08-10-21-48-417.png!
> The error message obviously means that the YARN proxy server does not support 
> *POST* calls. I checked the code of _*WebAppProxyServlet.java*_ and found 
> that the *POST* method is indeed not supported, so I changed it to the *PUT* 
> method and the call was successful.
> !image-2024-04-08-10-30-16-683.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35039) Create Profiling JobManager/TaskManager Instance failed

2024-04-07 Thread JJJJude (Jira)
ude created FLINK-35039:
---

 Summary: Create Profiling JobManager/TaskManager Instance failed
 Key: FLINK-35039
 URL: https://issues.apache.org/jira/browse/FLINK-35039
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0
 Environment: Hadoop 3.2.2
Flink 1.19
Reporter: ude
 Attachments: image-2024-04-08-10-21-31-066.png, 
image-2024-04-08-10-21-48-417.png, image-2024-04-08-10-30-16-683.png

I'm testing the "async-profiler" feature in version 1.19, but when I submit a task 
in YARN per-job mode, I get an error when I click Create Profiling Instance on 
the Flink Web UI page.
!image-2024-04-08-10-21-31-066.png!

!image-2024-04-08-10-21-48-417.png!

The error message obviously means that the YARN proxy server does not support 
*POST* calls. I checked the code of _*WebAppProxyServlet.java*_ and found that 
the *POST* method is indeed not supported, so I changed it to the *PUT* method and 
the call was successful.

!image-2024-04-08-10-30-16-683.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34966) Support to read snapshot by table partitions in MySQL CDC Source

2024-04-07 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834756#comment-17834756
 ] 

Qingsheng Ren commented on FLINK-34966:
---

[~wanghe] Could you provide some context in the description?

> Support to read snapshot by table partitions in MySQL CDC Source
> 
>
> Key: FLINK-34966
> URL: https://issues.apache.org/jira/browse/FLINK-34966
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: He Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34661) TaskExecutor supports retain partitions after JM crashed.

2024-04-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-34661:
---

Assignee: Junrui Li

> TaskExecutor supports retain partitions after JM crashed.
> -
>
> Key: FLINK-34661
> URL: https://issues.apache.org/jira/browse/FLINK-34661
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33983) Introduce JobEvent and JobEventStore for Batch Job Recovery

2024-04-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-33983:
---

Assignee: Junrui Li

> Introduce JobEvent and JobEventStore for Batch Job Recovery
> ---
>
> Key: FLINK-33983
> URL: https://issues.apache.org/jira/browse/FLINK-33983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33986) Extend shuffleMaster to support batch snapshot.

2024-04-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-33986:
---

Assignee: Junrui Li

> Extend shuffleMaster to support batch snapshot.
> ---
>
> Key: FLINK-33986
> URL: https://issues.apache.org/jira/browse/FLINK-33986
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>
> Extend ShuffleMaster to support batch snapshots as follows:
>  # Add a method supportsBatchSnapshot to identify whether the shuffle master 
> supports taking snapshots in batch scenarios
>  # Add methods snapshotState and restoreState to snapshot and restore the 
> shuffle master's state.
>  
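
A hypothetical shape of that extension (the method names come from this ticket; the 
signatures and the snapshot type are assumptions, not the actual design):

{code:java}
public interface ShuffleMasterSnapshotSupport {

    /** Whether this shuffle master supports taking snapshots in batch scenarios. */
    boolean supportsBatchSnapshot();

    /** Snapshots the shuffle master's state (ShuffleMasterSnapshot is hypothetical). */
    ShuffleMasterSnapshot snapshotState();

    /** Restores the shuffle master's state from a previously taken snapshot. */
    void restoreState(ShuffleMasterSnapshot snapshot);
}
{code}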



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33984) Introduce SupportsBatchSnapshot for operator coordinator

2024-04-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-33984.
---
Fix Version/s: 1.20.0
   Resolution: Done

master:
38255652406becbfbcb7cbec557aa5ba9a1ebbb3
558ca75da2fcec875d1e04a8d75a24fd0ad42ccc

> Introduce SupportsBatchSnapshot for operator coordinator
> 
>
> Key: FLINK-33984
> URL: https://issues.apache.org/jira/browse/FLINK-33984
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35036) Flink CDC Job cancel with savepoint failed

2024-04-07 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834755#comment-17834755
 ] 

Qingsheng Ren commented on FLINK-35036:
---

[~fly365] Thanks for reporting the issue! I agree with [~bgeng777]'s idea; maybe 
you can give it a try. 

Also, could you rewrite the description in English, considering we have 
contributors from around the world? Thanks.

> Flink CDC Job cancel with savepoint failed
> --
>
> Key: FLINK-35036
> URL: https://issues.apache.org/jira/browse/FLINK-35036
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: Flink 1.15.2
> Flink CDC 2.4.2
> Oracle 19C
> Doris 2.0.3
>Reporter: Fly365
>Priority: Major
> Attachments: image-2024-04-07-17-35-23-136.png
>
>
> With this Flink CDC job, I want to sync Oracle data to Doris. In the snapshot 
> phase, I cancelled the Flink CDC job with a savepoint, and the cancellation failed.
> Using Flink CDC to synchronize Oracle 19C tables into Doris: during the initial 
> snapshot phase, after part of the data had been synchronized but before reaching 
> the incremental phase, cancelling the CDC task while taking a Flink savepoint 
> fails; after the task enters the incremental phase, cancelling with a savepoint 
> works. Why does the savepoint fail during the full-data synchronization phase?
> !image-2024-04-07-17-35-23-136.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34945) Support recover shuffle descriptor and partition metrics from tiered storage

2024-04-07 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu reassigned FLINK-34945:
---

Assignee: Junrui Li

> Support recover shuffle descriptor and partition metrics from tiered storage
> 
>
> Key: FLINK-34945
> URL: https://issues.apache.org/jira/browse/FLINK-34945
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-07 Thread Biao Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834752#comment-17834752
 ] 

Biao Geng commented on FLINK-35035:
---

I am not very familiar with the adaptive scheduler; maybe others can share more 
insights. I just want to ask a question to make sure we are on the same page:
Do you mean that instead of triggering onNewResourcesAvailable repeatedly each 
time a new slot is found (in your example, 5 new slots, hence 5 reschedulings), 
you expect the JM to discover the 5 new slots together after a configurable 
period of time and trigger only one rescheduling?
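
For reference, a related stabilization window already exists for the resource wait; 
a configuration sketch follows (whether this option also debounces post-startup 
rescaling in 1.19 is an assumption worth verifying):

{code:yaml}
# flink-conf.yaml
jobmanager.scheduler: adaptive
# Wait until the set of available resources has been stable for this long
# before (re)deciding on the parallelism.
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 60s
{code}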

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have a job graph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5] -> [v2 p5].
> When I add slots, job graph changes are triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added are not discovered at the same time (for 
> convenience, assume each taskmanager has one slot): no matter what environment we 
> use, we cannot guarantee that the new slots are all added at once, so 
> onNewResourcesAvailable triggers repeatedly.
> If each new slot arrives at some interval, the job graph will keep changing 
> during this period. What I hope for is a configurable stabilization time: 
> trigger the job graph change only after the number of cluster slots has been 
> stable for a certain period of time, to avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33984][runtime] Support batch snapshot for OperatorCoordinator. [flink]

2024-04-07 Thread via GitHub


zhuzhurk closed pull request #24415: [FLINK-33984][runtime] Support batch 
snapshot for OperatorCoordinator.
URL: https://github.com/apache/flink/pull/24415


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35009) Change on getTransitivePredecessors breaks connectors

2024-04-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834751#comment-17834751
 ] 

Weijie Guo commented on FLINK-35009:


This change can be done in a compatible way. But one could argue that 
{{Transformation}} is not actually a public API (it is indeed marked 
{{@Internal}}), so connectors depending on it somehow shouldn't be guaranteed. 

BTW: The {{MockTransformation}} in the kafka-connector seems unused and can be 
removed safely.
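
A sketch of what a compatible variant could look like (the method names are taken 
from the compiler errors below; the default body is an assumption, not the actual 
Flink code):

{code:java}
import java.util.Collections;
import java.util.List;

public abstract class Transformation<T> {

    // Kept non-final so that existing subclasses overriding it still compile.
    public List<Transformation<?>> getTransitivePredecessors() {
        return getTransitivePredecessorsInternal();
    }

    // New hook with a default implementation instead of a new abstract method.
    protected List<Transformation<?>> getTransitivePredecessorsInternal() {
        // Default: a transformation without inputs only depends on itself.
        return Collections.singletonList(this);
    }
}
{code}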

> Change on getTransitivePredecessors breaks connectors
> -
>
> Key: FLINK-35009
> URL: https://issues.apache.org/jira/browse/FLINK-35009
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Connectors / Kafka
>Affects Versions: 1.18.2, 1.20.0, 1.19.1
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-kafka: Compilation failure: 
> Compilation failure: 
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[214,24]
>  
> org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation
>  is not abstract and does not override abstract method 
> getTransitivePredecessorsInternal() in org.apache.flink.api.dag.Transformation
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[220,44]
>  getTransitivePredecessors() in 
> org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation
>  cannot override getTransitivePredecessors() in 
> org.apache.flink.api.dag.Transformation
> Error:overridden method is final
> {code}
> Example: 
> https://github.com/apache/flink-connector-kafka/actions/runs/8494349338/job/23269406762#step:15:167



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35009) Change on getTransitivePredecessors breaks connectors

2024-04-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834751#comment-17834751
 ] 

Weijie Guo edited comment on FLINK-35009 at 4/8/24 2:13 AM:


This change can be done in a compatible way. But one could argue that 
{{Transformation}} is not actually a public API (it is indeed marked 
{{@Internal}}), so connectors depending on it somehow shouldn't be guaranteed. 

BTW: The {{MockTransformation}} in the kafka-connector seems unused and can be 
removed safely.


was (Author: weijie guo):
This change can be done in a compatible way. But one could argue that 
{{Transformation}} is not actually a public API(marked as {{@Internal}} 
indeed). The connector's depend on it somehow shouldn't be guaranteed. 

BTW: The {{MockTransformation}} in kafka-connector seems unused and can be 
removed safely.

> Change on getTransitivePredecessors breaks connectors
> -
>
> Key: FLINK-35009
> URL: https://issues.apache.org/jira/browse/FLINK-35009
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, Connectors / Kafka
>Affects Versions: 1.18.2, 1.20.0, 1.19.1
>Reporter: Martijn Visser
>Priority: Blocker
>
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-kafka: Compilation failure: 
> Compilation failure: 
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[214,24]
>  
> org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation
>  is not abstract and does not override abstract method 
> getTransitivePredecessorsInternal() in org.apache.flink.api.dag.Transformation
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java:[220,44]
>  getTransitivePredecessors() in 
> org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.InfiniteStringsGenerator.MockTransformation
>  cannot override getTransitivePredecessors() in 
> org.apache.flink.api.dag.Transformation
> Error:overridden method is final
> {code}
> Example: 
> https://github.com/apache/flink-connector-kafka/actions/runs/8494349338/job/23269406762#step:15:167



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-04-07 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834750#comment-17834750
 ] 

Zhongqiang Gong commented on FLINK-34955:
-

Hi [~slfan1989], I apologize for the ambiguity. "Remove the commons-codec 
dependence" means that we don't have to manually add a dependency on 
commons-codec.

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Assignee: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can 
> refer to the maven link
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35036) Flink CDC Job cancel with savepoint failed

2024-04-07 Thread Biao Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834749#comment-17834749
 ] 

Biao Geng commented on FLINK-35036:
---

Hi [~fly365], according to the attached screenshot, the failure is caused by a 
timeout on the Flink client side. 
IIUC, in the full-volume phase of a Flink CDC job, it needs to process lots of 
data, and typically, due to back pressure, the state may be much larger than in 
the incremental phase (you can check the state size in Flink's web UI).
As a result, it takes longer for Flink to complete the savepoint. The client's 
[default timeout is 
60s|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#client-timeout],
 so maybe you can increase the value to see if the savepoint can succeed.
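
For example (a sketch; the linked option is {{client.timeout}}, default 60 s):

{code:yaml}
# flink-conf.yaml on the machine running the Flink CLI
client.timeout: 600s
{code}

Then retry the stop-with-savepoint, e.g. {{flink stop --savepointPath <path> <jobId>}}.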


> Flink CDC Job cancel with savepoint failed
> --
>
> Key: FLINK-35036
> URL: https://issues.apache.org/jira/browse/FLINK-35036
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: Flink 1.15.2
> Flink CDC 2.4.2
> Oracle 19C
> Doris 2.0.3
>Reporter: Fly365
>Priority: Major
> Attachments: image-2024-04-07-17-35-23-136.png
>
>
> With this Flink CDC job, I want to sync Oracle data to Doris. In the snapshot 
> phase, I cancelled the Flink CDC job with a savepoint, and the cancellation failed.
> Using Flink CDC to synchronize Oracle 19C tables into Doris: during the initial 
> snapshot phase, after part of the data had been synchronized but before reaching 
> the incremental phase, cancelling the CDC task while taking a Flink savepoint 
> fails; after the task enters the incremental phase, cancelling with a savepoint 
> works. Why does the savepoint fail during the full-data synchronization phase?
> !image-2024-04-07-17-35-23-136.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34573) the task is stuck on the high presure

2024-04-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-34573.
--
Resolution: Won't Fix

> the task is stuck on the high presure
> -
>
> Key: FLINK-34573
> URL: https://issues.apache.org/jira/browse/FLINK-34573
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.3
>Reporter: LSZ
>Priority: Blocker
> Attachments: rate.PNG, stuck.PNG, tm-thread-dump-chk-0123[1].json, 
> tm-thread-dump-no-lock-0123[1].json
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> We have a Flink job with just one taskmanager.
> When using a high-pressure data source, it will get stuck. Sometimes it 
> runs for 1 day, sometimes it runs for 30 minutes.
> !stuck.PNG!
> Like this: (at 13:30 the taskmanager rebooted, then ran 30 minutes, and the 
> result is stuck)
> We tested 3 cases:
> 1: low pressure (1200 eps): it will run for 30 minutes or 1 day.
> 2: checkpointing disabled: it ran for 3 days under high pressure (1800 eps) and 
> did not get stuck.
> 3: doubling the original managed memory: it still gets stuck; the time until the 
> problem appears just changed from 30 minutes to 3 days.
> !rate.PNG!
>  
> The thread dump info under high pressure, CPU 90%~100%:
> [^tm-thread-dump-chk-0123[1].json]
> This is the normal info under low pressure:
> [^tm-thread-dump-no-lock-0123[1].json]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Correct the option key for sortPartition's java doc [flink]

2024-04-07 Thread via GitHub


reswqa merged PR #24627:
URL: https://github.com/apache/flink/pull/24627


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35006) Use try with-resource for StandaloneAutoscalerExecutor

2024-04-07 Thread Kirill Plugatarev (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirill Plugatarev closed FLINK-35006.
-
Resolution: Not A Problem

> Use try with-resource for StandaloneAutoscalerExecutor
> --
>
> Key: FLINK-35006
> URL: https://issues.apache.org/jira/browse/FLINK-35006
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Kirill Plugatarev
>Assignee: Kirill Plugatarev
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35006] Use try with-resource for StandaloneAutoscalerExecutor [flink-kubernetes-operator]

2024-04-07 Thread via GitHub


plugatarev closed pull request #811: [FLINK-35006] Use try with-resource for 
StandaloneAutoscalerExecutor
URL: https://github.com/apache/flink-kubernetes-operator/pull/811


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35006] Use try with-resource for StandaloneAutoscalerExecutor [flink-kubernetes-operator]

2024-04-07 Thread via GitHub


plugatarev commented on code in PR #811:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/811#discussion_r1555135563


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java:
##
@@ -60,9 +60,11 @@ public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[
 
         var autoScaler = createJobAutoscaler(eventHandler, stateStore);
 
-        var autoscalerExecutor =
-                new StandaloneAutoscalerExecutor<>(conf, jobListFetcher, eventHandler, autoScaler);
-        autoscalerExecutor.start();
+        try (var autoscalerExecutor =
+                new StandaloneAutoscalerExecutor<>(
+                        conf, jobListFetcher, eventHandler, autoScaler)) {
+            autoscalerExecutor.start();
+        }

Review Comment:
   Yes, you are right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35038) Bump test dependency org.yaml:snakeyaml to 2.2

2024-04-07 Thread Ufuk Celebi (Jira)
Ufuk Celebi created FLINK-35038:
---

 Summary: Bump test dependency org.yaml:snakeyaml to 2.2 
 Key: FLINK-35038
 URL: https://issues.apache.org/jira/browse/FLINK-35038
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Affects Versions: 3.1.0
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 3.1.0


Usage of SnakeYAML via {{flink-shaded}} was replaced by an explicit test scope 
dependency on {{org.yaml:snakeyaml:1.31}} with FLINK-34193.

This outdated version of SnakeYAML triggers security warnings. These should not 
be an actual issue given the test scope, but we should consider bumping the 
version for security hygiene purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-07 Thread via GitHub


liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2041527525

   @LadyForest Hi, CI has passed now. Looking forward to your review, thanks very 
much :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34898) Cannot create ARRAY of named STRUCTs

2024-04-07 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834685#comment-17834685
 ] 

Feng Jin commented on FLINK-34898:
--

[~chloehe] Thank you for the update, and sorry for the late reply. Based on my 
testing, this seems to be an issue with the CAST function implementation. The 
specific reason may be related to this Jira ticket: 
https://issues.apache.org/jira/browse/FLINK-18673 

 

FLINK-18673 added support for ROW() as a parameter for UDFs. During the 
validation of a SqlCall, validation of the operand nodes is skipped:

+org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator#validateColumnListParams+

{code:java}
@Override
public void validateColumnListParams(
        SqlFunction function, List<RelDataType> argTypes, List<SqlNode> operands) {
    // we don't support column lists and translate them into the unknown type in the type
    // factory,
    // this makes it possible to ignore them in the validator and fall back to regular row types
    // see also SqlFunction#deriveType
}
{code}
 

But SqlFunction::deriveType will indirectly call 
org.apache.calcite.sql.validate.SqlValidator#deriveType to infer the type of 
ROW().

!截屏2024-04-07 22.05.40.png!

In SqlCastFunction, checkOperandTypes directly obtains the validated node type:

 
{code:java}
@Override
public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
    final SqlNode left = callBinding.operand(0);
    final SqlNode right = callBinding.operand(1);
    if (SqlUtil.isNullLiteral(left, false) || left instanceof SqlDynamicParam) {
        return true;
    }
    RelDataType validatedNodeType =
            callBinding.getValidator().getValidatedNodeType(left);
    // RelDataType validatedNodeType = SqlTypeUtil.deriveType(callBinding, left);
    RelDataType returnType = SqlTypeUtil.deriveType(callBinding, right);
    if (!canCastFrom(returnType, validatedNodeType)) {
        if (throwOnFailure) {
            throw callBinding.newError(
                    RESOURCE.cannotCastValue(
                            validatedNodeType.toString(), returnType.toString()));
        }
        return false;
    } {code}
 

 

And the corresponding error message is as follows:

 
{code:java}
.SqlBasicCall: ROW(1, 2)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
at 
org.apache.flink.table.examples.java.basics.WordCountSQLExample.main(WordCountSQLExample.java:49)
Caused by: java.lang.UnsupportedOperationException: class 
org.apache.calcite.sql.SqlBasicCall: ROW(1, 2)
at org.apache.calcite.util.Util.needToImplement(Util.java:1101)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.getValidatedNodeType(SqlValidatorImpl.java:1777)
at 
org.apache.calcite.sql.fun.SqlCastFunction.checkOperandTypes(SqlCastFunction.java:138)
 {code}
 

 

If we use SqlTypeUtil.deriveType, we can get the correct type. Therefore, I 
think we can modify this section to fix the issue:

{code:java}
RelDataType validatedNodeType = SqlTypeUtil.deriveType(callBinding, left);
{code}
 

 

 

> Cannot create ARRAY of named STRUCTs
> 
>
> Key: FLINK-34898
> URL: https://issues.apache.org/jira/browse/FLINK-34898
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.19.0
>Reporter: Chloe He
>Priority: Major
> Attachments: image-2024-03-21-12-00-00-183.png, 截屏2024-04-07 
> 22.05.40.png
>
>
> I want to construct data that consists of arrays of named STRUCTs. For 
> example, one field may look like `[\{"a": 1}]`. I am able to construct this 
> named STRUCT as
> {code:java}
> SELECT CAST(ROW(1) as ROW<a INT>) AS row1;  {code}
> but when I try to wrap this in an ARRAY, it fails:
> {code:java}
> SELECT ARRAY[CAST(ROW(1) as ROW<a INT>)] AS row1;  
> // error
> Caused by: java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.SqlBasicCall: ROW(1)
> {code}
> These are the workarounds that I found:
> {code:java}
> SELECT ROW(ROW(CAST(ROW(1) as ROW<a INT>))) AS row1; 
> // or
> SELECT cast(ARRAY[ROW(1)] as ARRAY<ROW<a INT>>); {code}
> but I think this is a bug that we need to follow up on and fix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-04-07 Thread Shilun Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834682#comment-17834682
 ] 

Shilun Fan edited comment on FLINK-34955 at 4/7/24 2:17 PM:


[~gongzhongqiang] Of course, if upgrading is possible, it would be a positive 
step forward. I think we should give it a try. I see that you have created the 
relevant JIRA ticket, so you can go ahead and attempt it. Hopefully, it will be 
successful. (However, my preference would be to stick with version 1.26 for 
now, and consider upgrading to 1.26.1 in the future. If other components of 
Flink need upgrading, I think it would be best to upgrade them to version 1.26 
as well. Removing dependencies, in my opinion, is not a good option.)


was (Author: slfan1989):
[~gongzhongqiang] Of course, if upgrading is possible, it would be a positive 
step forward. I think we should give it a try. I see that you have created the 
relevant JIRA ticket, so you can go ahead and attempt it. Hopefully, it will be 
successful.

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Assignee: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can 
> refer to the maven link
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34898) Cannot create ARRAY of named STRUCTs

2024-04-07 Thread Feng Jin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-34898:
-
Attachment: 截屏2024-04-07 22.05.40.png

> Cannot create ARRAY of named STRUCTs
> 
>
> Key: FLINK-34898
> URL: https://issues.apache.org/jira/browse/FLINK-34898
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.19.0
>Reporter: Chloe He
>Priority: Major
> Attachments: image-2024-03-21-12-00-00-183.png, 截屏2024-04-07 
> 22.05.40.png
>
>
> I want to construct data that consists of arrays of named STRUCTs. For 
> example, one field may look like `[\{"a": 1}]`. I am able to construct this 
> named STRUCT as
> {code:java}
> SELECT CAST(ROW(1) as ROW<a INT>) AS row1;  {code}
> but when I try to wrap this in an ARRAY, it fails:
> {code:java}
> SELECT ARRAY[CAST(ROW(1) as ROW<a INT>)] AS row1;  
> // error
> Caused by: java.lang.UnsupportedOperationException: class 
> org.apache.calcite.sql.SqlBasicCall: ROW(1)
> {code}
> These are the workarounds that I found:
> {code:java}
> SELECT ROW(ROW(CAST(ROW(1) as ROW<a INT>))) AS row1; 
> // or
> SELECT cast(ARRAY[ROW(1)] as ARRAY<ROW<a INT>>); {code}
> but I think this is a bug that we need to follow up on and fix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-04-07 Thread Shilun Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834682#comment-17834682
 ] 

Shilun Fan commented on FLINK-34955:


[~gongzhongqiang] Of course, if upgrading is possible, it would be a positive 
step forward. I think we should give it a try. I see that you have created the 
relevant JIRA ticket, so you can go ahead and attempt it. Hopefully, it will be 
successful.

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Assignee: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can 
> refer to the maven link
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-04-07 Thread Shilun Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834681#comment-17834681
 ] 

Shilun Fan commented on FLINK-34955:


[~gongzhongqiang] From my personal perspective, I believe upgrading to version 
1.26.0 should be sufficient as this version has already fixed the CVE issue. As 
for upgrading to 1.26.1, I think we can consider it after some time. Removing 
commons-codec might prove to be challenging because Flink has dependencies on 
Hadoop and HBase (both of which directly depend on commons-codec). If we remove 
commons-codec, it may result in the Hadoop and HBase modules being unable to 
compile successfully.

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Assignee: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> commons-compress 1.24.0 has CVE issues, try to upgrade to 1.26.0, we can 
> refer to the maven link
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35010][connectors/mongodb] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector [flink-connector-mongodb]

2024-04-07 Thread via GitHub


GOODBOY008 commented on PR #32:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/32#issuecomment-2041478551

   @Jiabao-Sun PR updated, and the version bumped to `1.26.1`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34955) Upgrade commons-compress to 1.26.0

2024-04-07 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834678#comment-17834678
 ] 

Zhongqiang Gong commented on FLINK-34955:
-

[~slfan1989] [~mbalassi] According to 
https://issues.apache.org/jira/browse/COMPRESS-659 , [~jiabaosun] and I think 
it's better to bump the version to 1.26.1 and remove the `commons-codec` dependency.

> Upgrade commons-compress to 1.26.0
> --
>
> Key: FLINK-34955
> URL: https://issues.apache.org/jira/browse/FLINK-34955
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shilun Fan
>Assignee: Shilun Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> commons-compress 1.24.0 has CVE issues; try to upgrade to 1.26.0. We can 
> refer to the Maven link:
> https://mvnrepository.com/artifact/org.apache.commons/commons-compress



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554965456


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+/** The type of processing request. */
+public enum RequestType {
+/** Process one record without state access. */
+SYNC,
+/** Get from one {@link State}. */
+GET,
+/** Put to one {@link State}. */
+PUT,
+/** Merge value to an existing key in {@link State}. Mainly used for 
listState. */
+MERGE,
+/** Delete from one {@link State}. */
+DELETE
+}
+
+/** The underlying state to be accessed, can be empty for {@link 
RequestType#SYNC}. */
+@Nullable private final State state;
+
+/** The type of this request. */
+private final RequestType type;
+
+/** The payload(input) of this request. */
+@Nullable private final IN payload;
+
+/** The future to collect the result of the request. */
+private InternalStateFuture<OUT> stateFuture;
+
+/** The record context of this request. */
+private RecordContext<K> context;
+
+StateRequest(@Nullable State state, RequestType type, @Nullable IN 
payload) {
+this.state = state;
+this.type = type;
+this.payload = payload;
+}
+
+RequestType getRequestType() {
+return type;
+}
+
+@Nullable
+IN getPayload() {
+return payload;
+}
+
+@Nullable
+State getState() {
+return state;
+}
+
+InternalStateFuture<OUT> getFuture() {
+return stateFuture;
+}
+
+void setFuture(InternalStateFuture<OUT> future) {
+stateFuture = future;
+}
+
+RecordContext<K> getRecordContext() {
+return context;
+}
+
+void setRecordContext(RecordContext<K> context) {
+this.context = context;
+}

Review Comment:
   @ljz2051 The reason behind this is that the `RecordContext` is managed by 
the `AEC` and related components, while the `StateRequest` is created in the 
direct implementation of the State API (the `internal key-value layer`), which 
has no knowledge of the `RecordContext`. I tried to avoid introducing a 
`getRecordContext` in the `AEC` and calling it in every implementation of the 
state interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-07 Thread via GitHub


ljz2051 commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554960979


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+/** The type of processing request. */
+public enum RequestType {
+/** Process one record without state access. */
+SYNC,
+/** Get from one {@link State}. */
+GET,
+/** Put to one {@link State}. */
+PUT,
+/** Merge value to an existing key in {@link State}. Mainly used for 
listState. */
+MERGE,
+/** Delete from one {@link State}. */
+DELETE
+}
+
+/** The underlying state to be accessed, can be empty for {@link 
RequestType#SYNC}. */
+@Nullable private final State state;
+
+/** The type of this request. */
+private final RequestType type;
+
+/** The payload(input) of this request. */
+@Nullable private final IN payload;
+
+/** The future to collect the result of the request. */
+private InternalStateFuture<OUT> stateFuture;
+
+/** The record context of this request. */
+private RecordContext<K> context;
+
+StateRequest(@Nullable State state, RequestType type, @Nullable IN 
payload) {
+this.state = state;
+this.type = type;
+this.payload = payload;
+}
+
+RequestType getRequestType() {
+return type;
+}
+
+@Nullable
+IN getPayload() {
+return payload;
+}
+
+@Nullable
+State getState() {
+return state;
+}
+
+InternalStateFuture<OUT> getFuture() {
+return stateFuture;
+}
+
+void setFuture(InternalStateFuture<OUT> future) {
+stateFuture = future;
+}
+
+RecordContext<K> getRecordContext() {
+return context;
+}
+
+void setRecordContext(RecordContext<K> context) {
+this.context = context;
+}

Review Comment:
   The key in `RecordContext` is essential for a `StateRequest`. Without setting 
the `RecordContext`, the `StateRequest` is essentially "incomplete". Why don't we 
consider making `RecordContext` a final constructor parameter of `StateRequest`?
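
   For comparison, a minimal sketch of the constructor-injection variant 
proposed here (illustrative only, reusing the PR's `RecordContext` type; not 
the actual PR code):

   // Hypothetical: the context is required at creation time, so a request
   // can never exist in an "incomplete" state.
   final class StateRequestSketch<K, IN> {
       private final IN payload;
       private final RecordContext<K> context; // final, no setter needed

       StateRequestSketch(IN payload, RecordContext<K> context) {
           this.payload = payload;
           this.context = context;
       }

       RecordContext<K> getRecordContext() {
           return context;
       }
   }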



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER

2024-04-07 Thread yisha zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yisha zhou updated FLINK-35037:
---
Description: 
In the current implementation, RelNodes of Window type will only deliver the 
upsert/unique keys of their inputs.

However, windows with ROW_NUMBER can also produce upsert/unique keys.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
(class, rn) is a valid uniqueKeys/upsertKeys candidate. 

  was:
In the current implementation, RelNodes of Window type will only deliver the 
upsert/unique keys of their inputs if these keys contain the partition keys.

However, windows with ROW_NUMBER can also produce upsert/unique keys.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
(class, rn) is a valid uniqueKeys candidate. 


> Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
> ---
>
> Key: FLINK-35037
> URL: https://issues.apache.org/jira/browse/FLINK-35037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: yisha zhou
>Priority: Major
>
> In the current implementation, RelNodes of Window type will only deliver the 
> upsert/unique keys of their inputs.
> However, windows with ROW_NUMBER can also produce upsert/unique keys.
> For example:
> {code:java}
> select id, name, score, age, class,
>     row_number() over(partition by class order by name) as rn,
>     rank() over (partition by class order by score) as rk,
>     dense_rank() over (partition by class order by score) as drk,
>     avg(score) over (partition by class order by score) as avg_score,
>     max(score) over (partition by age) as max_score,
>     count(id) over (partition by age) as cnt
> from student {code}
> (class, rn) is a valid uniqueKeys/upsertKeys candidate. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER

2024-04-07 Thread yisha zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yisha zhou updated FLINK-35037:
---
Description: 
In the current implementation, RelNodes of Window type will only deliver the 
upsert/unique keys of their inputs.

However, windows with ROW_NUMBER can also produce upsert/unique keys.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
(class, rn) is a valid upsert/unique keys candidate. 

  was:
In the current implementation, RelNodes of Window type will only deliver the 
upsert/unique keys of their inputs.

However, windows with ROW_NUMBER can also produce upsert/unique keys.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
(class, rn) is a valid uniqueKeys/upsertKeys candidate. 


> Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER
> ---
>
> Key: FLINK-35037
> URL: https://issues.apache.org/jira/browse/FLINK-35037
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: yisha zhou
>Priority: Major
>
> In the current implementation, RelNodes of Window type will only deliver the 
> upsert/unique keys of their inputs.
> However, windows with ROW_NUMBER can also produce upsert/unique keys.
> For example:
> {code:java}
> select id, name, score, age, class,
>     row_number() over(partition by class order by name) as rn,
>     rank() over (partition by class order by score) as rk,
>     dense_rank() over (partition by class order by score) as drk,
>     avg(score) over (partition by class order by score) as avg_score,
>     max(score) over (partition by age) as max_score,
>     count(id) over (partition by age) as cnt
> from student {code}
> (class, rn) is a valid upsert/unique keys candidate. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35037) Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER

2024-04-07 Thread yisha zhou (Jira)
yisha zhou created FLINK-35037:
--

 Summary: Optimize uniqueKeys and upsertKeys inference of windows 
with ROW_NUMBER
 Key: FLINK-35037
 URL: https://issues.apache.org/jira/browse/FLINK-35037
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: yisha zhou


In the current implementation, RelNodes of Window type will only deliver the 
upsert/unique keys of their inputs if these keys contain the partition keys.

However, windows with ROW_NUMBER can also produce upsert/unique keys.

For example:
{code:java}
select id, name, score, age, class,
    row_number() over(partition by class order by name) as rn,
    rank() over (partition by class order by score) as rk,
    dense_rank() over (partition by class order by score) as drk,
    avg(score) over (partition by class order by score) as avg_score,
    max(score) over (partition by age) as max_score,
    count(id) over (partition by age) as cnt
from student {code}
(class, rn) is a valid uniqueKeys candidate. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32440][checkpoint] Introduce file merging configurations [flink]

2024-04-07 Thread via GitHub


fredia commented on PR #22973:
URL: https://github.com/apache/flink/pull/22973#issuecomment-2041450107

   @Zakelly I have rebased this PR, could you please take a look if you're 
free? thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-07 Thread via GitHub


PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1554892220


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##
@@ -20,18 +20,33 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
+private static final Logger LOG = 
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
 private static final String FLINK_CONF_DIR = "conf";
-private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+private static final String FLINK_CONF_FILENAME = "config.yaml";
 
 public static Configuration loadFlinkConfiguration(Path flinkHome) throws 
Exception {
 Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
-return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
+try {
+return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   What about renaming the method to `ConfigurationUtils#loadConfigFile`? This 
method no longer processes only the map-formatted config (`flink-conf.yaml`).
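
   For illustration, a minimal sketch of the old/new fallback this diff 
implements, reusing the constants from the change and assuming the helper 
throws `FileNotFoundException` for a missing file (as the new import in the 
diff suggests); not the exact PR code:

   public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
       Path confDir = flinkHome.resolve(FLINK_CONF_DIR);
       try {
           // Prefer the new-style config.yaml.
           return ConfigurationUtils.loadMapFormattedConfig(confDir.resolve(FLINK_CONF_FILENAME));
       } catch (FileNotFoundException e) {
           LOG.warn("config.yaml not found, falling back to flink-conf.yaml", e);
           // Fall back to the legacy flink-conf.yaml.
           return ConfigurationUtils.loadMapFormattedConfig(confDir.resolve(OLD_FLINK_CONF_FILENAME));
       }
   }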



##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path 
configPath) throws Excep
 }
 ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
 try {
-Map<String, String> configMap =
+Map<String, Object> configMap =
 mapper.readValue(
-configPath.toFile(), new TypeReference<Map<String, String>>() {});
-return Configuration.fromMap(configMap);
+configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+return Configuration.fromMap(flattenConfigMap(configMap));
 } catch (Exception e) {
 throw new IllegalStateException(
 String.format(
 "Failed to load config file \"%s\" to key-value 
pairs", configPath),
 e);
 }
 }
+
+private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+Map<String, String> result = new HashMap<>();
+flattenConfigMapHelper(configMap, "", result);
+return result;
+}
+
+private static void flattenConfigMapHelper(
+Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+String updatedPath =
+currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+if (entry.getValue() instanceof Map) {
+flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+} else {

Review Comment:
   There should be a case for handling `List` type, according to 
https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307
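
   A sketch of the helper with the suggested `List` branch (assuming 
`java.util` and `java.util.stream.Collectors` imports). The `;` join is only 
illustrative; `GlobalConfiguration` serializes the list back to a YAML string:

   private static void flattenConfigMapHelper(
           Map<String, Object> configMap, String currentPath, Map<String, String> result) {
       for (Map.Entry<String, Object> entry : configMap.entrySet()) {
           String updatedPath =
                   currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
           if (entry.getValue() instanceof Map) {
               // Nested mapping: recurse with the extended key path.
               flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
           } else if (entry.getValue() instanceof List) {
               // Suggested case: collapse list values into one flattened entry.
               String joined =
                       ((List<?>) entry.getValue())
                               .stream().map(String::valueOf).collect(Collectors.joining(";"));
               result.put(updatedPath, joined);
           } else {
               result.put(updatedPath, String.valueOf(entry.getValue()));
           }
       }
   }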



##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path 
configPath) throws Excep
 }
 ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
 try {
-Map<String, String> configMap =
+Map<String, Object> configMap =
 mapper.readValue(
-configPath.toFile(), new TypeReference<Map<String, String>>() {});
-return Configuration.fromMap(configMap);
+configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+return Configuration.fromMap(flattenConfigMap(configMap));
 } catch (Exception e) {
 throw new IllegalStateException(
 String.format(
 "Failed to load config file \"%s\" to key-value 
pairs", configPath),
 e);
 }
 }
+
+private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+Map<String, String> result = new HashMap<>();
+flattenConfigMapHelper(configMap, "", result);
+return result;
+}

Review Comment:
   This looks like just a very simple wrapper around `flattenConfigMapHelper`. 
We can just merge the logic into `flattenConfigMap` instead of having another 
helper. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:

Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-07 Thread via GitHub


liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2041411609

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions [flink]

2024-04-07 Thread via GitHub


Zakelly commented on PR #24629:
URL: https://github.com/apache/flink/pull/24629#issuecomment-2041409660

   @XComp Would you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-04-07 Thread via GitHub


gong commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2041406982

   > I wonder if changing `DorisEventSerializer.DATE_TIME_FORMATTER` to 
`yyyy-MM-dd HH:mm:ss.SSSSSS` fixes this problem, too?
   
   @yuxiqian I think that it can fix this problem, too. 
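
   For reference, a small self-contained demo of why the pattern width matters 
for the precision discussed here (the pattern is the suggested one; this is 
not the connector's actual code):

   import java.time.LocalDateTime;
   import java.time.format.DateTimeFormatter;

   public class FormatterPrecisionDemo {
       public static void main(String[] args) {
           LocalDateTime ts = LocalDateTime.of(2024, 4, 7, 12, 0, 0, 123_456_000);
           DateTimeFormatter seconds = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
           DateTimeFormatter micros = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
           System.out.println(seconds.format(ts)); // 2024-04-07 12:00:00 (fraction lost)
           System.out.println(micros.format(ts));  // 2024-04-07 12:00:00.123456 (fraction kept)
       }
   }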


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34915) Complete `DESCRIBE CATALOG` syntax

2024-04-07 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34915:
-
 Attachment: image-2024-04-07-17-54-51-203.png
Description: 
Describe the metadata of an existing catalog. The metadata information includes 
the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is 
specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
not actually usable yet. We can complete the syntax in this FLIP. 

!image-2024-04-07-17-54-51-203.png|width=545,height=332!

  was:
Describe the metadata of an existing catalog. The metadata information includes 
the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is 
specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
not actually usable yet. We can complete the syntax in this FLIP. 

!image-2024-03-22-18-29-00-454.png|width=561,height=374!


> Complete `DESCRIBE CATALOG` syntax
> --
>
> Key: FLINK-34915
> URL: https://issues.apache.org/jira/browse/FLINK-34915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-03-22-18-29-00-454.png, 
> image-2024-04-07-17-54-51-203.png
>
>
> Describe the metadata of an existing catalog. The metadata information 
> includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} 
> option is specified, catalog properties are also returned.
> NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
> not actually usable yet. We can complete the syntax in this FLIP. 
> !image-2024-04-07-17-54-51-203.png|width=545,height=332!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-07 Thread via GitHub


flinkbot commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2041394595

   
   ## CI report:
   
   * a1146244ce8a490c2ca1557f4c2410b59effb333 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34915) Complete `DESCRIBE CATALOG` syntax

2024-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34915:
---
Labels: pull-request-available  (was: )

> Complete `DESCRIBE CATALOG` syntax
> --
>
> Key: FLINK-34915
> URL: https://issues.apache.org/jira/browse/FLINK-34915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-03-22-18-29-00-454.png
>
>
> Describe the metadata of an existing catalog. The metadata information 
> includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} 
> option is specified, catalog properties are also returned.
> NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
> not actually usable yet. We can complete the syntax in this FLIP. 
> !image-2024-03-22-18-29-00-454.png|width=561,height=374!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-07 Thread via GitHub


liyubin117 opened a new pull request, #24630:
URL: https://github.com/apache/flink/pull/24630

   ## What is the purpose of the change
   
   Describe the metadata of an existing catalog. The metadata information 
includes the catalog’s name, type, and comment. If the optional EXTENDED option 
is specified, catalog properties are also returned.
   
   ## Brief change log
   
   * { DESCRIBE | DESC } CATALOG [EXTENDED] catalog_name
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
   flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34952][cdc-composer][sink] Flink CDC pipeline supports SinkFunction [flink-cdc]

2024-04-07 Thread via GitHub


loserwang1024 commented on code in PR #3204:
URL: https://github.com/apache/flink-cdc/pull/3204#discussion_r1554863395


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java:
##
@@ -35,4 +35,10 @@ public class ValuesDataSinkOptions {
 .booleanType()
 .defaultValue(true)
 .withDescription("True if the Event should be print to 
console.");
+
+public static final ConfigOption SINK_TYPE =
+ConfigOptions.key("sinkType")

Review Comment:
   It sounds better. Just do it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Fix YARN ContainerId.getId Deprecated Used. [flink]

2024-04-07 Thread via GitHub


slfan1989 commented on PR #24601:
URL: https://github.com/apache/flink/pull/24601#issuecomment-2041389025

   @1996fanrui Thank you very much for reviewing the code!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35036) Flink CDC Job cancel with savepoint failed

2024-04-07 Thread Fly365 (Jira)
Fly365 created FLINK-35036:
--

 Summary: Flink CDC Job cancel with savepoint failed
 Key: FLINK-35036
 URL: https://issues.apache.org/jira/browse/FLINK-35036
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
 Environment: Flink 1.15.2

Flink CDC 2.4.2

Oracle 19C

Doris 2.0.3
Reporter: Fly365
 Attachments: image-2024-04-07-17-35-23-136.png

With a Flink CDC job, I sync Oracle data to Doris; cancelling the job with a 
savepoint during the snapshot phase fails.

Using Flink CDC to synchronize Oracle 19C tables into Doris: during the initial 
snapshot phase, after part of the data had been synchronized but before the 
incremental phase started, cancelling the CDC job with a Flink savepoint failed. 
After the job enters the incremental phase, cancelling with a savepoint works. 
Why does the savepoint fail during the full (snapshot) data synchronization phase?

!image-2024-04-07-17-35-23-136.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [cdc-connector][cdc-base] Shade guava31 to avoid dependency conflict with flink below 1.18 [flink-cdc]

2024-04-07 Thread via GitHub


PatrickRen commented on code in PR #3083:
URL: https://github.com/apache/flink-cdc/pull/3083#discussion_r1554857536


##
pom.xml:
##
@@ -462,8 +462,15 @@ under the License.
 submodules, ${flink.version} will be 
resolved as the actual Flink version.
 -->
 
org.apache.flink:flink-shaded-force-shading
+
org.apache.flink:flink-shaded-guava
 
 
+
+
+flink.shaded.guava

Review Comment:
   Thanks for the update! You are correct. Depending on Guava directly is 
prohibited in checkstyle to avoid using non-relocated guava by mistake. 
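
   For context, a tiny sketch of what the checkstyle rule guards against; the 
relocated package name follows the usual flink-shaded-guava prefix and may 
differ by shaded Guava version:

   // Prohibited: direct, non-relocated Guava (checkstyle rejects this import).
   // import com.google.common.collect.ImmutableList;

   // Allowed: the relocated Guava bundled in flink-shaded-guava.
   import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;

   class RelocatedGuavaUsage {
       static final ImmutableList<String> NAMES = ImmutableList.of("a", "b");
   }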



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35023) YARNApplicationITCase failed on Azure

2024-04-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834639#comment-17834639
 ] 

Weijie Guo edited comment on FLINK-35023 at 4/7/24 9:24 AM:


jdk17:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=28322

jdk21:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759&view=logs&j=59a2b95a-736b-5c46-b3e0-cee6e587fd86&t=c301da75-e699-5c06-735f-778207c16f50&l=28612


was (Author: weijie guo):
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=28322

> YARNApplicationITCase failed on Azure
> -
>
> Key: FLINK-35023
> URL: https://issues.apache.org/jira/browse/FLINK-35023
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> 1. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
> {code:java}
> Apr 06 02:19:44 02:19:44.063 [ERROR] 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
>  -- Time elapsed: 9.727 s <<< FAILURE!
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70)
> Apr 06 02:19:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 2. YARNApplicationITCase.testApplicationClusterWithRemoteUserJar
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84)
> Apr 06 02:19:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 3. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 

[jira] [Commented] (FLINK-35023) YARNApplicationITCase failed on Azure

2024-04-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834639#comment-17834639
 ] 

Weijie Guo commented on FLINK-35023:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58759&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=28322

> YARNApplicationITCase failed on Azure
> -
>
> Key: FLINK-35023
> URL: https://issues.apache.org/jira/browse/FLINK-35023
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> 1. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
> {code:java}
> Apr 06 02:19:44 02:19:44.063 [ERROR] 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
>  -- Time elapsed: 9.727 s <<< FAILURE!
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70)
> Apr 06 02:19:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 2. YARNApplicationITCase.testApplicationClusterWithRemoteUserJar
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84)
> Apr 06 02:19:44   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:568)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> Apr 06 02:19:44   at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> {code}
> 3. 
> YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion
> {code:java}
> Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
> while expecting FINISHED
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion$0(YARNApplicationITCase.java:62)
> Apr 06 02:19:44   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Apr 06 02:19:44   at 
> 

Re: [PR] [FLINK-34952][cdc-composer][sink] Flink CDC pipeline supports SinkFunction [flink-cdc]

2024-04-07 Thread via GitHub


PatrickRen commented on code in PR #3204:
URL: https://github.com/apache/flink-cdc/pull/3204#discussion_r1554855153


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java:
##
@@ -35,4 +35,10 @@ public class ValuesDataSinkOptions {
 .booleanType()
 .defaultValue(true)
 .withDescription("True if the Event should be print to 
console.");
+
+public static final ConfigOption SINK_TYPE =
+ConfigOptions.key("sinkType")

Review Comment:
   What about `sink.api`? We use the word "type" in YAML for specifying the 
category of connector, such as "mysql", "doris", "values", so I'm concerned 
about the potential conflict.
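
   For illustration, a sketch of the option under the suggested key; the name 
`sink.api`, its type, and the values are only this review's proposal, not a 
merged API:

   // Hypothetical option following the reviewer's naming suggestion.
   public static final ConfigOption<String> SINK_API =
           ConfigOptions.key("sink.api")
                   .stringType()
                   .defaultValue("sinkv2")
                   .withDescription(
                           "Which sink API the values connector uses, "
                                   + "e.g. 'sinkfunction' or 'sinkv2'.");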



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34959) Update old flink-cdc-connectors artifactId

2024-04-07 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren resolved FLINK-34959.
---
Resolution: Fixed

> Update old flink-cdc-connectors artifactId
> --
>
> Key: FLINK-34959
> URL: https://issues.apache.org/jira/browse/FLINK-34959
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: xleoken
>Assignee: xleoken
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34959) Update old flink-cdc-connectors artifactId

2024-04-07 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834638#comment-17834638
 ] 

Qingsheng Ren commented on FLINK-34959:
---

cdc-master: 6510e670faa154d619937d97dc3f146eb162fac2

> Update old flink-cdc-connectors artifactId
> --
>
> Key: FLINK-34959
> URL: https://issues.apache.org/jira/browse/FLINK-34959
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: xleoken
>Assignee: xleoken
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34959] Update old flink-cdc-connectors artifactId [flink-cdc]

2024-04-07 Thread via GitHub


PatrickRen merged PR #3200:
URL: https://github.com/apache/flink-cdc/pull/3200


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-34959) Update old flink-cdc-connectors artifactId

2024-04-07 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-34959:
-

Assignee: xleoken

> Update old flink-cdc-connectors artifactId
> --
>
> Key: FLINK-34959
> URL: https://issues.apache.org/jira/browse/FLINK-34959
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: xleoken
>Assignee: xleoken
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-07 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834636#comment-17834636
 ] 

xiaogang zhou commented on FLINK-32070:
---

[~Zakelly] Yes, sounds good. Let me take a look.

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation in the file system's perspective of 
> view (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34712) Update reference data for Migration Tests

2024-04-07 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834634#comment-17834634
 ] 

lincoln lee edited comment on FLINK-34712 at 4/7/24 8:53 AM:
-

fixed in 1.19: 4397c9f1d0f300bd8c0f9de4e59e1a5d7883ec2c
fixed in master: 5bbcf8de79ce1979412879b919299ffa5a9b62fe


was (Author: lincoln.86xy):
fixed in 1.19: 4397c9f1d0f300bd8c0f9de4e59e1a5d7883ec2c

> Update reference data for Migration Tests
> -
>
> Key: FLINK-34712
> URL: https://issues.apache.org/jira/browse/FLINK-34712
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lincoln Lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> Update migration tests in master to cover migration from new version. Since 
> 1.18, this step could be done automatically with the following steps. For 
> more information please refer to [this 
> page.|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/README.md]
>  # {*}On the published release tag (e.g., release-1.16.0){*}, run 
> {{$ mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.16 -nsu -Dfast -DskipTests}}
> The version (1.16 in the command above) should be replaced with the target 
> one.
>  # Modify the content of the file 
> [apache/flink:flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version]
>  to the latest version (it would be "v1_16" if sticking to the example where 
> 1.16.0 was released). 
>  # Commit the modification in step a and b with "{_}[release] Generate 
> reference data for state migration tests based on release-1.xx.0{_}" to the 
> corresponding release branch (e.g. {{release-1.16}} in our example), replace 
> "xx" with the actual version (in this example "16"). You should use the Jira 
> issue ID in case of [release]  as the commit message's prefix if you have a 
> dedicated Jira issue for this task.
>  # Cherry-pick the commit to the master branch. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34712) Update reference data for Migration Tests

2024-04-07 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-34712.
---
Resolution: Fixed

> Update reference data for Migration Tests
> -
>
> Key: FLINK-34712
> URL: https://issues.apache.org/jira/browse/FLINK-34712
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lincoln Lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> Update migration tests in master to cover migration from new version. Since 
> 1.18, this step could be done automatically with the following steps. For 
> more information please refer to [this 
> page.|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/README.md]
>  # {*}On the published release tag (e.g., release-1.16.0){*}, run 
> {{$ mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.16 -nsu -Dfast -DskipTests}}
> The version (1.16 in the command above) should be replaced with the target 
> one.
>  # Modify the content of the file 
> [apache/flink:flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version]
>  to the latest version (it would be "v1_16" if sticking to the example where 
> 1.16.0 was released). 
>  # Commit the modification in step a and b with "{_}[release] Generate 
> reference data for state migration tests based on release-1.xx.0{_}" to the 
> corresponding release branch (e.g. {{release-1.16}} in our example), replace 
> "xx" with the actual version (in this example "16"). You should use the Jira 
> issue ID in case of [release]  as the commit message's prefix if you have a 
> dedicated Jira issue for this task.
>  # Cherry-pick the commit to the master branch. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34712][release] Generate reference data for state migration tests based on release-1.19.0 [flink]

2024-04-07 Thread via GitHub


lincoln-lil merged PR #24594:
URL: https://github.com/apache/flink/pull/24594


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-04-07 Thread via GitHub


yuxiqian commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2041372727

   I wonder if changing `DorisEventSerializer.DATE_TIME_FORMATTER` to 
`yyyy-MM-dd HH:mm:ss.SSSSSS` would fix this problem, too?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-04-07 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2041370638

   hi @MartijnVisser 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34712) Update reference data for Migration Tests

2024-04-07 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834634#comment-17834634
 ] 

lincoln lee commented on FLINK-34712:
-

fixed in 1.19: 4397c9f1d0f300bd8c0f9de4e59e1a5d7883ec2c

> Update reference data for Migration Tests
> -
>
> Key: FLINK-34712
> URL: https://issues.apache.org/jira/browse/FLINK-34712
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Lincoln Lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> Update migration tests in master to cover migration from new version. Since 
> 1.18, this step could be done automatically with the following steps. For 
> more information please refer to [this 
> page.|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/README.md]
>  # {*}On the published release tag (e.g., release-1.16.0){*}, run 
> {{$ mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.16 -nsu -Dfast -DskipTests}}
> The version (1.16 in the command above) should be replaced with the target 
> one.
>  # Modify the content of the file 
> [apache/flink:flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version]
>  to the latest version (it would be "v1_16" if sticking to the example where 
> 1.16.0 was released). 
>  # Commit the modification in step a and b with "{_}[release] Generate 
> reference data for state migration tests based on release-1.xx.0{_}" to the 
> corresponding release branch (e.g. {{release-1.16}} in our example), replace 
> "xx" with the actual version (in this example "16"). You should use the Jira 
> issue ID in case of [release]  as the commit message's prefix if you have a 
> dedicated Jira issue for this task.
>  # Cherry-pick the commit to the master branch. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34712][release] Generate reference data for state migration tests based on release-1.19.0 [flink]

2024-04-07 Thread via GitHub


lincoln-lil merged PR #24517:
URL: https://github.com/apache/flink/pull/24517


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions [flink]

2024-04-07 Thread via GitHub


flinkbot commented on PR #24629:
URL: https://github.com/apache/flink/pull/24629#issuecomment-2041363992

   
   ## CI report:
   
   * 1eeedee02ccdb674b5b2598bec8531e8eac49bf9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][doc] Generate and use docs of RpcOptions instead of AkkaOptions [flink]

2024-04-07 Thread via GitHub


Zakelly opened a new pull request, #24629:
URL: https://github.com/apache/flink/pull/24629

   ## What is the purpose of the change
   
   Currently, when re-generating the docs on the master branch, 
`rpc_configuration.html` is generated. `AkkaOptions` was renamed to 
`RpcOptions` in FLINK-32684, so the docs should be re-generated and every 
page that includes the old file should point to the new one.
   
   ## Brief change log
   
- Re-generate the docs; `rpc_configuration.html` is now produced.
- Replace every include of `akka_configuration.html` with 
`rpc_configuration.html` (see the note below).
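
   Note: the generated HTML tables are pulled into doc pages via include 
directives, so the change presumably rewrites references such as 
`{{< generated/akka_configuration >}}` to `{{< generated/rpc_configuration >}}`. 
The exact shortcode names here are an assumption based on the file names 
above, not taken from the PR itself.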
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34634]Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed [flink-cdc]

2024-04-07 Thread via GitHub


ruanhang1993 commented on code in PR #3134:
URL: https://github.com/apache/flink-cdc/pull/3134#discussion_r1554839652


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java:
##
@@ -307,8 +307,17 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent
 finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
 }
 final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
+final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
+if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {

Review Comment:
   Add logs.
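
   For illustration only, a minimal sketch of what the suggested logging could 
look like at this branch. The variable names mirror the diff above; the 
wrapper class, logger setup, and message wording are assumptions, not part of 
the PR:

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hedged sketch of the reviewer's "add logs" suggestion; not the PR's code.
   class SplitSizeMismatchLogging {
       private static final Logger LOG =
               LoggerFactory.getLogger(SplitSizeMismatchLogging.class);

       // Logs when a reader reports more finished splits than the enumerator
       // knows about, i.e. the inconsistent state the new branch above detects.
       static void logIfReaderAhead(
               int subTask,
               int totalFinishedSplitSizeOfReader,
               int totalFinishedSplitSizeOfEnumerator) {
           if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
               LOG.warn(
                       "Reader {} reports {} finished splits, but the enumerator only knows {}."
                               + " The reader's state is ahead of the enumerator's.",
                       subTask,
                       totalFinishedSplitSizeOfReader,
                       totalFinishedSplitSizeOfEnumerator);
           }
       }
   }
   ```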



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java:
##
@@ -43,10 +43,14 @@ public class BinlogSplitMetaEvent implements SourceEvent {
  */
 private final List<byte[]> metaGroup;
 
-public BinlogSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
+private final int totalFinishedSplitSize;
+
+public BinlogSplitMetaEvent(
+String splitId, int metaGroupId, List<byte[]> metaGroup, int totalFinishedSplitSize) {

Review Comment:
   Add `@Nullable`



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
##
@@ -395,11 +396,17 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
 MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
 if (binlogSplit != null) {
 final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
 final int expectedMetaGroupId =
 ChunkUtils.getNextMetaGroupId(
 binlogSplit.getFinishedSnapshotSplitInfos().size(),
 sourceConfig.getSplitMetaGroupSize());
-if (receivedMetaGroupId == expectedMetaGroupId) {
+if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) {

Review Comment:
   Add some logs here.



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java:
##
@@ -298,22 +298,35 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven
 finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
 }
 final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-if (binlogSplitMeta.size() > requestMetaGroupId) {
+final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
+if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {

Review Comment:
   Add some logs here.



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java:
##
@@ -43,10 +43,14 @@ public class StreamSplitMetaEvent implements SourceEvent {
  */
 private final List<byte[]> metaGroup;
 
-public StreamSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
+private final int totalFinishedSplitSize;
+
+public StreamSplitMetaEvent(
+String splitId, int metaGroupId, List<byte[]> metaGroup, int totalFinishedSplitSize) {

Review Comment:
   Add `@Nullable`
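
   To make the `@Nullable` suggestion concrete, a hedged sketch of how the 
annotated constructor could look. The `List<byte[]>` element type, the 
`javax.annotation.Nullable` import, and the assumption that `metaGroup` is the 
parameter to annotate are mine, not the PR author's:

   ```java
   import java.util.List;
   import javax.annotation.Nullable;

   // Sketch only: mirrors the shape of StreamSplitMetaEvent's new constructor.
   public class StreamSplitMetaEventSketch {
       private final String splitId;
       private final int metaGroupId;
       @Nullable private final List<byte[]> metaGroup;
       private final int totalFinishedSplitSize;

       public StreamSplitMetaEventSketch(
               String splitId,
               int metaGroupId,
               @Nullable List<byte[]> metaGroup,
               int totalFinishedSplitSize) {
           this.splitId = splitId;
           this.metaGroupId = metaGroupId;
           this.metaGroup = metaGroup;
           this.totalFinishedSplitSize = totalFinishedSplitSize;
       }
   }
   ```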



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34995) flink kafka connector source stuck when partition leader invalid

2024-04-07 Thread yansuopeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834629#comment-17834629
 ] 

yansuopeng commented on FLINK-34995:


https://github.com/apache/flink-connector-kafka/pull/91

> flink kafka connector source stuck when partition leader invalid
> 
>
> Key: FLINK-34995
> URL: https://issues.apache.org/jira/browse/FLINK-34995
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.19.0, 1.18.1
>Reporter: yansuopeng
>Priority: Major
>  Labels: pull-request-available
>
> When a partition leader is invalid (leader=-1), a Flink streaming job using 
> KafkaSource can neither restart nor start a new instance with a new group id; 
> it gets stuck with the following exception:
> "{*}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition aaa-1 could be determined{*}"
> When leader=-1, Kafka APIs such as KafkaConsumer.position() block until 
> either the position can be determined or an unrecoverable error is 
> encountered.
> In fact, leader=-1 is not easy to avoid: even with replica=3, three disks 
> going offline together will trigger the problem, especially when the cluster 
> is relatively large. Fixing it relies on the Kafka administrators reacting in 
> time, which is risky during the cluster's peak period.
> I have solved this problem and want to create a PR.
>  
>  
>  
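
For context, the parameterless KafkaConsumer.position(TopicPartition) blocks 
for up to default.api.timeout.ms before throwing the TimeoutException quoted 
above; the overload with an explicit timeout bounds the wait. A minimal sketch 
of the blocking call follows (bootstrap servers, group id, and the 5-second 
timeout are placeholders; the topic "aaa" and partition 1 come from the quoted 
error, not from the reporter's actual setup):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

// Minimal sketch of the blocking behavior described above; all connection
// settings are placeholders, not taken from the reported job.
public class PositionTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("aaa", 1);
            consumer.assign(Collections.singleton(tp));
            try {
                // Blocks until the position is known or the timeout expires;
                // with leader=-1 this is where the job gets stuck.
                long pos = consumer.position(tp, Duration.ofSeconds(5));
                System.out.println("position = " + pos);
            } catch (TimeoutException e) {
                System.err.println("Position could not be determined: " + e.getMessage());
            }
        }
    }
}
```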



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34995) flink kafka connector source stuck when partition leader invalid

2024-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34995:
---
Labels: pull-request-available  (was: )

> flink kafka connector source stuck when partition leader invalid
> 
>
> Key: FLINK-34995
> URL: https://issues.apache.org/jira/browse/FLINK-34995
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.19.0, 1.18.1
>Reporter: yansuopeng
>Priority: Major
>  Labels: pull-request-available
>
> When a partition leader is invalid (leader=-1), a Flink streaming job using 
> KafkaSource can neither restart nor start a new instance with a new group id; 
> it gets stuck with the following exception:
> "{*}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition aaa-1 could be determined{*}"
> When leader=-1, Kafka APIs such as KafkaConsumer.position() block until 
> either the position can be determined or an unrecoverable error is 
> encountered.
> In fact, leader=-1 is not easy to avoid: even with replica=3, three disks 
> going offline together will trigger the problem, especially when the cluster 
> is relatively large. Fixing it relies on the Kafka administrators reacting in 
> time, which is risky during the cluster's peak period.
> I have solved this problem and want to create a PR.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34995] flink kafka connector source stuck when partition leade… [flink-connector-kafka]

2024-04-07 Thread via GitHub


boring-cyborg[bot] commented on PR #91:
URL: 
https://github.com/apache/flink-connector-kafka/pull/91#issuecomment-2041355545

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


