Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


SamBarker commented on PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2138478974

   > It compiles & installs Flink before running the benchmarks via 
/mnt/jenkins/tools/hudson.tasks.Maven_MavenInstallation/M3/bin/mvn 
-Dflink.version=1.20-SNAPSHOT clean install exec:exec -Penable-async-profiler 
-DskipTests -Drat.skip=true '-Dbenchmarks=org.apache.flink.state.benchmark.*' 
'-DbenchmarkExcludes=org.apache.flink.benchmark.full.*' 
-DexecutableJava=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-4.0.2.al8.x86_64/bin/java
 -DasyncProfilerLib=/opt/async-profiler-3.0-linux-x64/lib/libasyncProfiler.so 
-DprofResultPath=./profile-result
   > It would be great if you could provide some options to speed this up.
   
   It looks to me like it was handling the excludes and includes correctly, 
which is what I was concerned about. I can't reproduce a slower runtime with 
the profiler on.
   With profiler:
   ```
   mvn -Dflink.version=1.20-SNAPSHOT -DexecutableJava=${JAVA_HOME}/bin/java 
-DasyncProfilerLib=/home/sam/src/async-profiler/async-profiler-3.0-linux-x64/lib/libasyncProfiler.so
 -Dbenchmarks="SerializationFrameworkMiniBenchmarks.serializerHeavyString" 
exec:exec --offline
   ... 
   snip 
   ...
   
   # JMH version: 1.37
   # VM version: JDK 11.0.23, OpenJDK 64-Bit Server VM, 11.0.23+9
   # VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
   # VM options: -Djava.rmi.server.hostname=127.0.0.1 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
   # Blackhole mode: full + dont-inline hint (auto-detected, use 
-Djmh.blackhole.autoDetect=false to disable)
   # Warmup: 10 iterations, 10 s each
   # Measurement: 10 iterations, 10 s each
   # Timeout: 10 min per iteration
   # Threads: 1 thread, will synchronize iterations
   # Benchmark mode: Throughput, ops/time
   # Benchmark: 
org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString
   
   # Run progress: 0.00% complete, ETA 00:10:00
   ... 
   snip 
   ...
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  11:17 min
   [INFO] Finished at: 2024-05-30T12:10:42+12:00
   [INFO] ------------------------------------------------------------------------
   
   ```
   
   Without profiler:
   ```
   mvn -Dflink.version=1.20-SNAPSHOT -DexecutableJava=${JAVA_HOME}/bin/java 
-Dbenchmarks="SerializationFrameworkMiniBenchmarks.serializerHeavyString" 
exec:exec --offline
   ... 
   snip 
   ...
   # JMH version: 1.37
   # VM version: JDK 11.0.23, OpenJDK 64-Bit Server VM, 11.0.23+9
   # VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
   # VM options: -Djava.rmi.server.hostname=127.0.0.1 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
   # Blackhole mode: full + dont-inline hint (auto-detected, use 
-Djmh.blackhole.autoDetect=false to disable)
   # Warmup: 10 iterations, 10 s each
   # Measurement: 10 iterations, 10 s each
   # Timeout: 10 min per iteration
   # Threads: 1 thread, will synchronize iterations
   # Benchmark mode: Throughput, ops/time
   # Benchmark: 
org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString
   
   # Run progress: 0.00% complete, ETA 00:10:00
   ... 
   snip
   ...
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  11:18 min
   [INFO] Finished at: 2024-05-30T12:27:06+12:00
   [INFO] ------------------------------------------------------------------------
   ```
   
   ~So I'm not really sure what to suggest.~
   
   Wait a minute. This PR changes the version of JMH. It's not the profiler 
being on or off that affects the runtime; it's the JMH version.
   
   Going back to main I get:
   
   ```
   # JMH version: 1.19
   # VM version: JDK 11.0.23, VM 11.0.23+9
   # VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
   # VM options: -Djava.rmi.server.hostname=127.0.0.1 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
   # Warmup: 10 iterations, 1 s each
   # Measurement: 10 iterations, 1 s each
   # Timeout: 10 min per iteration
   # Threads: 1 thread, will synchronize iterations
   # Benchmark mode: Throughput, ops/time
   # Benchmark: 
org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString
   
   # Run progress: 0.00% complete, ETA 00:01:00
   ```
   
   This may be related to the changes to warmup defaults in [JMH 
1.21](https://mail.openjdk.org/pipermail/jmh-dev/2018-May/002753.html):
   > *) Defaults for warmup/measurement/forks were reconsidered 
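   For comparison, a minimal sketch (the benchmark class and body below are made 
up, not taken from flink-benchmarks) of how explicit JMH annotations pin warmup 
and measurement times, so a run takes roughly the same wall-clock time no matter 
which JMH version's defaults apply:
   ```
   import java.util.concurrent.TimeUnit;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.Warmup;
   
   public class WarmupPinnedBenchmark {
   
       // The logs above show 10 x 1 s iterations under JMH 1.19 and 10 x 10 s
       // under JMH 1.37; explicit annotations override whichever default applies.
       @Benchmark
       @BenchmarkMode(Mode.Throughput)
       @Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
       @Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
       public long sum() {
           long acc = 0;
           for (int i = 0; i < 1_000; i++) {
               acc += i;
           }
           return acc;
       }
   }
   ```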

Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-05-29 Thread via GitHub


jeyhunkarimov commented on code in PR #24845:
URL: https://github.com/apache/flink/pull/24845#discussion_r161942


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link java.util.Set}. The serializer relies on an element 
serializer for the
+ * serialization of the set's elements.
+ *
+ * The serialization format for the set is as follows: four bytes for the 
length of the set,
+ * followed by the serialized representation of each element.
+ *
+ * @param <T> The type of element in the set.
+ */
+@Internal
+public final class SetSerializer<T> extends TypeSerializer<Set<T>> {
+
+private static final long serialVersionUID = 1L;
+
+/** The serializer for the elements of the set. */
+private final TypeSerializer<T> elementSerializer;
+
+/**
+ * Creates a set serializer that uses the given serializer to serialize 
the set's elements.
+ *
+ * @param elementSerializer The serializer for the elements of the set
+ */
+public SetSerializer(TypeSerializer<T> elementSerializer) {
+this.elementSerializer = checkNotNull(elementSerializer);
+}
+
+// ------------------------------------------------------------------------
+//  SetSerializer specific properties
+// ------------------------------------------------------------------------
+
+/**
+ * Gets the serializer for the elements of the set.
+ *
+ * @return The serializer for the elements of the set
+ */
+public TypeSerializer<T> getElementSerializer() {
+return elementSerializer;
+}
+
+// ------------------------------------------------------------------------
+//  Type Serializer implementation
+// ------------------------------------------------------------------------
+
+@Override
+public boolean isImmutableType() {
+return false;
+}
+
+@Override
+public TypeSerializer<Set<T>> duplicate() {
+TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement);
+}
+
+@Override
+public Set<T> createInstance() {
+return new HashSet<>(0);
+}
+
+@Override
+public Set<T> copy(Set<T> from) {
+Set<T> newSet = new HashSet<>(from.size());
+for (T element : from) {
+newSet.add(elementSerializer.copy(element));
+}
+return newSet;
+}
+
+@Override
+public Set<T> copy(Set<T> from, Set<T> reuse) {
+return copy(from);
+}
+
+@Override
+public int getLength() {
+return -1; // var length
+}
+
+@Override
+public void serialize(Set<T> set, DataOutputView target) throws IOException {
+final int size = set.size();
+target.writeInt(size);
+for (T element : set) {
+elementSerializer.serialize(element, target);
+}
+}
+
+@Override
+public Set<T> deserialize(DataInputView source) throws IOException {
+final int size = source.readInt();
+final Set<T> set = new HashSet<>(size);

Review Comment:
   In order to ensure the ordering guarantee, do you think using 
`LinkedHashSet` might be a good idea?
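   For illustration, a minimal standalone sketch of that suggestion (the helper 
class is hypothetical; it only mirrors the size-plus-elements format from the 
diff above):
   ```
   import java.io.IOException;
   import java.util.LinkedHashSet;
   import java.util.Set;
   
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.core.memory.DataInputView;
   
   final class OrderedSetDeserialization {
   
       // Reads "int size + elements", like the SetSerializer above, but collects
       // into a LinkedHashSet so iteration order matches serialization order.
       static <T> Set<T> deserialize(TypeSerializer<T> elementSerializer, DataInputView source)
               throws IOException {
           final int size = source.readInt();
           final Set<T> set = new LinkedHashSet<>(size);
           for (int i = 0; i < size; i++) {
               set.add(elementSerializer.deserialize(source));
           }
           return set;
       }
   
       private OrderedSetDeserialization() {}
   }
   ```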



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34559] Limit Global & Local Aggregation buffers [flink]

2024-05-29 Thread via GitHub


rkhachatryan commented on code in PR #24869:
URL: https://github.com/apache/flink/pull/24869#discussion_r1619465130


##
flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java:
##
@@ -60,4 +61,56 @@ public class AlgorithmOptions {
 "Whether to use the LargeRecordHandler when 
spilling. If a record will not fit into the sorting"
 + " buffer. The record will be spilled on 
disk and the sorting will continue with only the key."
 + " The record itself will be read 
afterwards when merging.");
+
+public static final ConfigOption GLOBAL_AGG_BUFFER_SIZE =

Review Comment:
   @twalthr I remember we had some discussions around the right place for these 
options.
   Do you have any alternatives in mind?
   How about org.apache.flink.configuration.ExecutionOptions?
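   For context, a minimal sketch of how such an option is typically declared 
with Flink's `ConfigOptions` builder if it were to live in `ExecutionOptions`; 
the key name, value type, default, and description below are assumptions for 
illustration, not taken from the PR:
   ```
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   import org.apache.flink.configuration.MemorySize;
   
   public class ExecutionOptionsSketch {
   
       // Hypothetical key, type and default; the actual option in the PR may differ.
       public static final ConfigOption<MemorySize> GLOBAL_AGG_BUFFER_SIZE =
               ConfigOptions.key("execution.global-aggregation.buffer-size")
                       .memoryType()
                       .defaultValue(MemorySize.ofMebiBytes(32))
                       .withDescription("Upper bound for the global aggregation buffer.");
   }
   ```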
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34559] Limit Global & Local Aggregation buffers [flink]

2024-05-29 Thread via GitHub


flinkbot commented on PR #24869:
URL: https://github.com/apache/flink/pull/24869#issuecomment-2138245149

   
   ## CI report:
   
   * 3b92e02418068b4ed7def5609dfc8b1fdf5d63e4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-34559] Limit Global & Local Aggregation buffers [flink]

2024-05-29 Thread via GitHub


rkhachatryan opened a new pull request, #24869:
URL: https://github.com/apache/flink/pull/24869

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-29 Thread via GitHub


sap1ens commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138173997

   > Would you recommend keeping it as a plain String?
   
   Yes! And if someone needs base64 encoding they can encode it in the 
`SerializationSchema`.
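   For illustration, a minimal sketch (the class name is made up) of how a user 
who does want base64 could apply it in their own `SerializationSchema` instead 
of the connector applying it unconditionally:
   ```
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;
   
   import org.apache.flink.api.common.serialization.SerializationSchema;
   
   public class Base64StringSerializationSchema implements SerializationSchema<String> {
   
       private static final long serialVersionUID = 1L;
   
       @Override
       public byte[] serialize(String element) {
           // Encode the UTF-8 payload as base64 before it reaches the sink.
           return Base64.getEncoder().encode(element.getBytes(StandardCharsets.UTF_8));
       }
   }
   ```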
   
   > I have a follow-up query on the spotless violations: how did you run that? 
Whenever I try, the build succeeds with no errors and I can see "spotless 
skipped" in the build logs [attached screenshot]. Is there some setting in the 
code that I need to change to get spotless working for me?
   
   Hmm, I ran `mvn clean package -DskipTests` in the root of the project and 
ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` 
changed a lot of files in this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-29 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138092946

   > @19priyadhingra  I've had the opportunity to build and try this 
connector. Want to share some feedback:
   > 
   > * Currently, it looks like all messages are encoded with base64. It'd be 
great to be able to not use it, e.g. by setting a custom `ElementConverter`.
   > * There are a lot of `spotless` violations, you probably want to run `mvn 
spotless:apply`.
   
   @sap1ens, thanks a lot for the feedback. 
   1) Base64 encoding was what we were using in our production environment, 
and the intention was that base64 encoding guarantees that no invalid 
characters are present in the message, but now I understand that others might 
not need it. Would you recommend keeping it as a plain String?
   
   2) I have a follow-up query on the spotless violations: how did you run that? 
Whenever I try, the build succeeds with no errors and I can see "spotless 
skipped" in the build logs [attached screenshot]. Is there some setting in the 
code that I need to change to get spotless working for me? 
   
![image](https://github.com/apache/flink-connector-aws/assets/169495197/20262c6a-9492-476c-a5b2-ce7809664e8a)
   Sorry for the basic question, I am working on this package for the first 
time :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-29 Thread via GitHub


sap1ens commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2137994741

   @19priyadhingra  I've had the opportunity to build and try this connector. 
Want to share some feedback:
   
   - Currently, it looks like all messages are encoded with base64. It'd be 
great to be able to not use it, e.g. by setting a custom `ElementConverter`. 
   - There are a lot of `spotless` violations, you probably want to run `mvn 
spotless:apply`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35472] Improve tests for Elasticsearch 8 connector [flink-connector-elasticsearch]

2024-05-29 Thread via GitHub


liuml07 commented on code in PR #105:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/105#discussion_r1619214815


##
flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java:
##
@@ -21,86 +21,144 @@
 
 package org.apache.flink.connector.elasticsearch.sink;
 
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
 import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Base Integration tests class. */
-@Testcontainers
-public class ElasticsearchSinkBaseITCase {
+/**
+ * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests.
+ *
+ * It is extended with the {@link ParameterizedTestExtension} for 
parameterized testing against
+ * secure and non-secure Elasticsearch clusters. Tests must be annotated by 
{@link TestTemplate} in
+ * order to be parameterized.
+ *
+ * The cluster is running via test containers. In order to reuse the 
singleton containers by all
+ * inheriting test classes, we manage their lifecycle. The two containers are 
started only once when
+ * this class is loaded. At the end of the test suite the Ryuk container that 
is started by
+ * Testcontainers core will take care of stopping the singleton container.
+ */
+@ExtendWith(ParameterizedTestExtension.class)

Review Comment:
Let me attach a screenshot from running it locally. Both containers are used 
by each test case, and the two containers are shared by all test cases of 
all test classes.
   
   https://github.com/apache/flink-connector-elasticsearch/assets/159186/b9886aa9-67b6-43ee-9b8b-941209383268
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35472] Improve tests for Elasticsearch 8 connector [flink-connector-elasticsearch]

2024-05-29 Thread via GitHub


liuml07 commented on code in PR #105:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/105#discussion_r1619214117


##
flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java:
##
@@ -21,86 +21,144 @@
 
 package org.apache.flink.connector.elasticsearch.sink;
 
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
 import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Base Integration tests class. */
-@Testcontainers
-public class ElasticsearchSinkBaseITCase {
+/**
+ * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests.
+ *
+ * It is extended with the {@link ParameterizedTestExtension} for 
parameterized testing against
+ * secure and non-secure Elasticsearch clusters. Tests must be annotated by 
{@link TestTemplate} in
+ * order to be parameterized.
+ *
+ * The cluster is running via test containers. In order to reuse the 
singleton containers by all
+ * inheriting test classes, we manage their lifecycle. The two containers are 
started only once when
+ * this class is loaded. At the end of the test suite the Ryuk container that 
is started by
+ * Testcontainers core will take care of stopping the singleton container.
+ */
+@ExtendWith(ParameterizedTestExtension.class)

Review Comment:
   Thanks for taking a look, @reta!
   
   The idea of using ParameterizedTestExtension is to make all subclasses run 
with both secure and non-secure tests by default. So tests do not actually 
depend on one static container, but instead use both of them. That way, we can 
gain higher confidence when making specific changes to the secure or non-secure 
case, for example when adding a new NetworkConfig option, be it SSL-related or 
an authentication provider.
   
   Those secure and non-secure containers are set up automatically and are 
singletons within the same JVM, reused by all test classes. Using 
`@Container` would make each test class create its own containers; as we 
introduce more test classes, this seems sub-optimal.
   
   Secondly, having two `@Container` fields in the base class means both 
containers will be set up (for each class). If a subclass only chooses one of 
them via `enableSecurity()`, the other one just slows down the tests without 
providing any value.
   
   I have been reading the Testcontainers docs and didn't find a solution 
that works well for base classes via the `@Container`-managed lifecycle while 
still being efficient. The suggestion is to manage the lifecycle manually, 
which is simple enough: start the containers statically, and they will get 
closed automatically. [1]
   
   [1] 
https://java.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers
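   For reference, a minimal sketch of the singleton-container pattern from [1]; 
the class name and image tag are illustrative, not taken from the PR:
   ```
   import org.testcontainers.elasticsearch.ElasticsearchContainer;
   import org.testcontainers.utility.DockerImageName;
   
   abstract class SingletonContainersBase {
   
       static final ElasticsearchContainer ES_CONTAINER =
               new ElasticsearchContainer(
                       DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:8.13.4"));
   
       static {
           // Started once when this base class is loaded and reused by every
           // inheriting test class; the Ryuk container stops it at JVM exit.
           ES_CONTAINER.start();
       }
   }
   ```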



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35448] Translate pod templates documentation into Chinese [flink-kubernetes-operator]

2024-05-29 Thread via GitHub


caicancai commented on code in PR #830:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/830#discussion_r1619132831


##
docs/content/docs/custom-resource/pod-template.md:
##
@@ -93,16 +90,18 @@ spec:
 ```
 
 {{< hint info >}}
-When using the operator with Flink native Kubernetes integration, please refer 
to [pod template field precedence](
-https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink).
+当使用具有 Flink 原生 Kubernetes 集成的 operator 时,请参考 [pod template 字段优先级](
+https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink)。
 {{< /hint >}}
 
+
 ## Array Merging Behaviour
 
-When layering pod templates (defining both a top level and jobmanager specific 
podtemplate for example) the corresponding yamls are merged together.
+
+
+当分层 pod templates(例如同时定义顶级和任务管理器特定的 pod 模板)时,相应的 yaml 会合并在一起。
 
-The default behaviour of the pod template mechanism is to merge array arrays 
by merging the objects in the respective array positions.
-This requires that containers in the podTemplates are defined in the same 
order otherwise the results may be undefined.
+默认的 pod 模板机制行为是通过合并相应数组位置的对象来合并数组数组。

Review Comment:
   array arrays?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] A couple of hotfixes [flink]

2024-05-29 Thread via GitHub


pnowojski commented on PR #24868:
URL: https://github.com/apache/flink/pull/24868#issuecomment-2137725610

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32088][checkpoint] Space control in file merging [flink]

2024-05-29 Thread via GitHub


Zakelly commented on PR #24807:
URL: https://github.com/apache/flink/pull/24807#issuecomment-2137702975

   Rebased to resolve conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]

2024-05-29 Thread via GitHub


Zakelly merged PR #24374:
URL: https://github.com/apache/flink/pull/24374


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-35311 Draft pr for initial review of apicurio [flink]

2024-05-29 Thread via GitHub


nictownsend commented on code in PR #24715:
URL: https://github.com/apache/flink/pull/24715#discussion_r1618968305


##
flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioRegistryAvroFormatFactory.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.apicurio;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.BASIC_AUTH_CREDENTIALS_PASSWORD;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.BASIC_AUTH_CREDENTIALS_USERID;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_CLIENT_ID;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_CLIENT_SECRET;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_SCOPE;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_URL;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.PROPERTIES;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_DESCRIPTION;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_ID;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_NAME;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_VERSION;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SCHEMA;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_LOCATION;
+import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_PASSWORD;
+import static 

Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2137605027

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2137601891

   CI test failure is unrelated: FLINK-34513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35472] Improve tests for Elasticsearch 8 connector [flink-connector-elasticsearch]

2024-05-29 Thread via GitHub


reta commented on code in PR #105:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/105#discussion_r1618900785


##
flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java:
##
@@ -21,86 +21,144 @@
 
 package org.apache.flink.connector.elasticsearch.sink;
 
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
 import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Base Integration tests class. */
-@Testcontainers
-public class ElasticsearchSinkBaseITCase {
+/**
+ * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests.
+ *
+ * It is extended with the {@link ParameterizedTestExtension} for 
parameterized testing against
+ * secure and non-secure Elasticsearch clusters. Tests must be annotated by 
{@link TestTemplate} in
+ * order to be parameterized.
+ *
+ * The cluster is running via test containers. In order to reuse the 
singleton containers by all
+ * inheriting test classes, we manage their lifecycle. The two containers are 
started only once when
+ * this class is loaded. At the end of the test suite the Ryuk container that 
is started by
+ * Testcontainers core will take care of stopping the singleton container.
+ */
+@ExtendWith(ParameterizedTestExtension.class)

Review Comment:
   Thanks @liuml07 , I think the usage of 
   
   ```suggestion
   @ExtendWith(ParameterizedTestExtension.class)
   ```
   
   is not really bringing value here since the test cases depend on static 
containers (and parameterized tests do not support that). However, I think you 
have quite an interesting idea to explore; I would suggest trying this:
- create 2 containers (as you do now, but using the `@Testcontainers` 
extension):
   ```
   @Container
   public static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer();

   @Container
   public static final ElasticsearchContainer ES_SECURE_CONTAINER = createElasticsearchSecureContainer();
   ```
- consequently, use 2 `RestClient` clients, secure and non-secure
- introduce an abstract method, e.g. `abstract boolean enableSecurity()`, that 
each test class would have to override and that the base test class could use 
to decide which client + customization to use (a possible shape is sketched 
below)
   
   WDYT? Thank you.
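   For reference, a rough sketch of that shape; the class name, image tag, and 
security settings are illustrative assumptions, not taken from the PR:
   ```
   import org.testcontainers.elasticsearch.ElasticsearchContainer;
   import org.testcontainers.junit.jupiter.Container;
   import org.testcontainers.junit.jupiter.Testcontainers;
   import org.testcontainers.utility.DockerImageName;
   
   @Testcontainers
   abstract class ElasticsearchSinkTestBase {
   
       private static final DockerImageName IMAGE =
               DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:8.13.4");
   
       @Container
       static final ElasticsearchContainer ES_CONTAINER = new ElasticsearchContainer(IMAGE);
   
       @Container
       static final ElasticsearchContainer ES_SECURE_CONTAINER =
               new ElasticsearchContainer(IMAGE)
                       .withEnv("xpack.security.enabled", "true")
                       .withEnv("ELASTIC_PASSWORD", "changeme");
   
       /** Each concrete test class overrides this to pick the secure or non-secure cluster. */
       abstract boolean enableSecurity();
   
       /** The container (and hence the client) that test code should target. */
       ElasticsearchContainer container() {
           return enableSecurity() ? ES_SECURE_CONTAINER : ES_CONTAINER;
       }
   }
   ```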
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] A couple of hotfixes [flink]

2024-05-29 Thread via GitHub


pnowojski commented on code in PR #24868:
URL: https://github.com/apache/flink/pull/24868#discussion_r1618893857


##
flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java:
##
@@ -638,29 +637,15 @@ public boolean equals(Object o) {
 }
 
 Transformation<?> that = (Transformation<?>) o;
-
-if (bufferTimeout != that.bufferTimeout) {
-return false;
-}
-if (id != that.id) {
-return false;
-}
-if (parallelism != that.parallelism) {
-return false;
-}
-if (!name.equals(that.name)) {
-return false;
-}
-return outputType != null ? outputType.equals(that.outputType) : 
that.outputType == null;
+return Objects.equals(bufferTimeout, that.bufferTimeout)
+&& Objects.equals(id, that.id)
+&& Objects.equals(parallelism, that.parallelism)
+&& Objects.equals(name, that.name)
+&& Objects.equals(outputType, that.outputType);
 }
 
 @Override
 public int hashCode() {
-int result = id;
-result = 31 * result + name.hashCode();
-result = 31 * result + (outputType != null ? outputType.hashCode() : 
0);
-result = 31 * result + parallelism;
-result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
-return result;
+return Objects.hash(id, name, outputType, parallelism, bufferTimeout);

Review Comment:
   Thanks for pointing this out, but I don't see how this should matter in our 
case. I have never seen any `Objects.hash` call being an issue anywhere in Flink. 
Also, `Transformation` is not used on any critical path, only during job 
submission, where there are tons of much heavier operations happening.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] A couple of hotfixes [flink]

2024-05-29 Thread via GitHub


snuyanzin commented on code in PR #24868:
URL: https://github.com/apache/flink/pull/24868#discussion_r1618885971


##
flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java:
##
@@ -638,29 +637,15 @@ public boolean equals(Object o) {
 }
 
 Transformation<?> that = (Transformation<?>) o;
-
-if (bufferTimeout != that.bufferTimeout) {
-return false;
-}
-if (id != that.id) {
-return false;
-}
-if (parallelism != that.parallelism) {
-return false;
-}
-if (!name.equals(that.name)) {
-return false;
-}
-return outputType != null ? outputType.equals(that.outputType) : 
that.outputType == null;
+return Objects.equals(bufferTimeout, that.bufferTimeout)
+&& Objects.equals(id, that.id)
+&& Objects.equals(parallelism, that.parallelism)
+&& Objects.equals(name, that.name)
+&& Objects.equals(outputType, that.outputType);
 }
 
 @Override
 public int hashCode() {
-int result = id;
-result = 31 * result + name.hashCode();
-result = 31 * result + (outputType != null ? outputType.hashCode() : 
0);
-result = 31 * result + parallelism;
-result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
-return result;
+return Objects.hash(id, name, outputType, parallelism, bufferTimeout);

Review Comment:
   Not sure how critical it is here; 
   however, it seems `Objects#hash` with more than one arg could cost more. 
   More details at https://bugs.openjdk.org/browse/JDK-8332840
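   For illustration, a small self-contained sketch of the trade-off (field names 
are made up): `Objects.hash` allocates a varargs `Object[]` and boxes primitives 
on every call, while the hand-rolled variant avoids both:
   ```
   import java.util.Objects;
   
   class HashExample {
       private final int id;
       private final String name;
       private final long bufferTimeout;
   
       HashExample(int id, String name, long bufferTimeout) {
           this.id = id;
           this.name = name;
           this.bufferTimeout = bufferTimeout;
       }
   
       // Concise, but allocates an Object[] and boxes id/bufferTimeout each call.
       int conciseHash() {
           return Objects.hash(id, name, bufferTimeout);
       }
   
       // Allocation-free alternative with the same 31-based mixing.
       int allocationFreeHash() {
           int result = id;
           result = 31 * result + (name != null ? name.hashCode() : 0);
           result = 31 * result + Long.hashCode(bufferTimeout);
           return result;
       }
   }
   ```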



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] A couple of hotfixes [flink]

2024-05-29 Thread via GitHub


flinkbot commented on PR #24868:
URL: https://github.com/apache/flink/pull/24868#issuecomment-2137398488

   
   ## CI report:
   
   * dd51527c7f8d44e49f39f8dd27c6fac2f34d65da UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] A couple of hotfixes [flink]

2024-05-29 Thread via GitHub


pnowojski opened a new pull request, #24868:
URL: https://github.com/apache/flink/pull/24868

   ## What is the purpose of the change
   
   A couple of hotfixes
   
   ## Brief change log
   
   Please check individual commit messages
   
   ## Verifying this change
   
   Covered by existing tests and no functional changes.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? **(not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]

2024-05-29 Thread via GitHub


czy006 commented on PR #3332:
URL: https://github.com/apache/flink-cdc/pull/3332#issuecomment-2137368517

   > Fix [FLINK-35173](https://issues.apache.org/jira/browse/FLINK-35173) 
MysqlDebeziumTimeConverter miss timezone convert to timestamp.
   
   [hotfix-bp-3.1](https://github.com/apache/flink-cdc/pull/3380)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]

2024-05-29 Thread via GitHub


czy006 opened a new pull request, #3380:
URL: https://github.com/apache/flink-cdc/pull/3380

   hotfix Fix [FLINK-35173](https://issues.apache.org/jira/browse/FLINK-35173) 
MysqlDebeziumTimeConverter miss timezone convert to timestamp.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]

2024-05-29 Thread via GitHub


czy006 closed pull request #3379: [BP-3.1][hotfix] Fix 
MysqlDebeziumTimeConverter miss timezone convert to timestamp
URL: https://github.com/apache/flink-cdc/pull/3379


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]

2024-05-29 Thread via GitHub


czy006 opened a new pull request, #3379:
URL: https://github.com/apache/flink-cdc/pull/3379

   BP3.1 to Fix 
[FLINK-35173](https://issues.apache.org/jira/browse/FLINK-35173) 
MysqlDebeziumTimeConverter miss timezone convert to timestamp.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][statebackend/forst] Normalize ForSt option names and working dir [flink]

2024-05-29 Thread via GitHub


masteryhx closed pull request #24854: [hotfix][statebackend/forst] Normalize 
ForSt option names and working dir
URL: https://github.com/apache/flink/pull/24854


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


Zakelly commented on PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2137299531

   Here's the full log before the pipeline expired. 
[jenkins.tar.gz](https://github.com/apache/flink-benchmarks/files/15484257/jenkins.tar.gz)
   
   It compiles & installs Flink before running the benchmarks via 
`/mnt/jenkins/tools/hudson.tasks.Maven_MavenInstallation/M3/bin/mvn 
-Dflink.version=1.20-SNAPSHOT clean install exec:exec -Penable-async-profiler 
-DskipTests -Drat.skip=true '-Dbenchmarks=org.apache.flink.state.benchmark.*' 
'-DbenchmarkExcludes=org.apache.flink.benchmark.full.*' 
-DexecutableJava=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-4.0.2.al8.x86_64/bin/java
 -DasyncProfilerLib=/opt/async-profiler-3.0-linux-x64/lib/libasyncProfiler.so 
-DprofResultPath=./profile-result`
   
   
   It would be great if you could provide some options that could help speed 
this up.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-35311 Kafka part required for Flink Apicurio Avro support. Prototype for review [flink-connector-kafka]

2024-05-29 Thread via GitHub


nictownsend commented on code in PR #99:
URL: 
https://github.com/apache/flink-connector-kafka/pull/99#discussion_r1618477190


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##
@@ -55,6 +61,34 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema producedTypeInfo;
 
 private final boolean upsertMode;
+private static Method deserializeWithAdditionalPropertiesMethod = null;
+protected static final String IS_KEY = "IS_KEY";
+
+protected static final String HEADERS = "HEADERS";
+
+static {
+initializeMethod();
+}
+
+protected static void initializeMethod() {

Review Comment:
   Probably worth some javadoc here to explain that this is for backwards 
compatibility (versions of flink that don't have the method)



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##
@@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord 
record, Collector record,
+Map additionalParameters,
+Collector collector,
+boolean isKey)
+throws IOException {
+if (deserializeWithAdditionalPropertiesMethod == null) {
+if (isKey) {
+keyDeserialization.deserialize(record.key(), collector);
+} else {
+valueDeserialization.deserialize(record.value(), collector);
+}
+
+} else {
+additionalParameters.put(IS_KEY, isKey);
+try {
+if (isKey) {
+deserializeWithAdditionalPropertiesMethod.invoke(
+keyDeserialization, record.key(), 
additionalParameters, collector);
+} else {
+deserializeWithAdditionalPropertiesMethod.invoke(
+valueDeserialization, record.value(), 
additionalParameters, collector);
+}
+} catch (IllegalAccessException e) {

Review Comment:
   Syntax may be cleaner with `|` as you're handling both exceptions in the 
same way
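   For illustration, a minimal sketch of the multi-catch form (the helper and 
its error handling are made up, not the connector's actual code):
   ```
   import java.lang.reflect.InvocationTargetException;
   import java.lang.reflect.Method;
   
   class ReflectiveCall {
       // Both reflection failures are handled the same way, so a single
       // multi-catch clause keeps the call site compact.
       static Object invokeQuietly(Method method, Object target, Object... args) {
           try {
               return method.invoke(target, args);
           } catch (IllegalAccessException | InvocationTargetException e) {
               throw new IllegalStateException("Reflective call failed", e);
           }
       }
   }
   ```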



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java:
##
@@ -36,10 +42,36 @@ public class KafkaDeserializationSchemaWrapper 
implements KafkaDeserializatio
 
 private static final long serialVersionUID = 2651665280744549932L;
 
+private static Method deserializeWithAdditionalPropertiesMethod = null;
+
+protected static final String IS_KEY = "IS_KEY";
+
+protected static final String HEADERS = "HEADERS";
+
+static {

Review Comment:
   Why is 
https://github.com/apache/flink-connector-kafka/pull/99/files?file-filters%5B%5D=.java=true#diff-0a1bb10356e97bd2de4e3e8a257cf8c94290dd0aadb133d9288b6d1bc60b5994R73
 different in terms of the `static` block?
   
   You use a static block here, but a static method in 
DynamicKafkaDeserializationSchema.
   
   However, I like the idea of an interface method `initializeMethod` that 
returns `Method` rather than `void` - the typing gives 
safety that `initializeMethod` returns the Method from a class structure that 
has Deserialization as the parent.
   
   I do appreciate that the typing doesn't guarantee the returned method has the 
correct parameters - but I think catching the runtime exception is as good as 
trying to validate the method before using it: 
https://www.tutorialspoint.com/javareflect/javareflect_method_getparametertypes.htm
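   
   If one did want to validate the looked-up method up front, a tiny check based on `getParameterTypes` could look like this (purely a sketch; as noted, relying on the runtime exception is just as reasonable):
   
   ```
   import java.lang.reflect.Method;
   import java.util.Arrays;
   
   final class MethodShapeCheck {
       // Returns true when the reflectively obtained method has exactly the expected
       // parameter types, letting callers reject a mismatched Method before caching it.
       static boolean hasSignature(Method method, Class<?>... expectedParams) {
           return Arrays.equals(method.getParameterTypes(), expectedParams);
       }
   }
   ```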



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##
@@ -74,25 +113,63 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
 @Override
 public ProducerRecord serialize(
 RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+
+// keeping the metadataHeaders maps for keys and values separate so 
these maps
+// are output only for the methods; avoiding changing the inputs.
+// define the input map
+Map inputMap = new HashMap<>();
+
+// define the output maps, it is 2 levels deep in case we need to add 
new information in
+// the future
+Map outputKeyMap = new HashMap<>();
+Map outputValueMap = new HashMap<>();
+
+inputMap.put(TOPIC_NAME, topic);
+
 // shortcut in case no input projection is required
 if (keySerialization == null && !hasMetadata) {
-final byte[] valueSerialized = 
valueSerialization.serialize(consumedRow);
-return new ProducerRecord<>(
-topic,
-extractPartition(
-consumedRow,
-null,
-valueSerialized,
-

Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]

2024-05-29 Thread via GitHub


Zakelly commented on PR #24374:
URL: https://github.com/apache/flink/pull/24374#issuecomment-2137277271

   I'll merge this once the CI green.





Re: [PR] [FLINK-15753][web] Displaying non-numeric metrics in Web UI dashboard [flink]

2024-05-29 Thread via GitHub


flinkbot commented on PR #24867:
URL: https://github.com/apache/flink/pull/24867#issuecomment-2137215585

   
   ## CI report:
   
   * a91eedf8891c31ea59064821ff124ca6c473da6d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-15753][web] Displaying non-numeric metrics in Web UI dashboard [flink]

2024-05-29 Thread via GitHub


yuxiqian opened a new pull request, #24867:
URL: https://github.com/apache/flink/pull/24867

   ## What is the purpose of the change
   
   Currently, Flink allows creating gauge metrics with various types 
[[1]](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#gauge).
 However, the Web UI doesn't support showing any type other than numerics and 
always displays `NaN`, which isn't very helpful. This change lets the dashboard 
display the raw metric value when conversion to a number isn't applicable.
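   
   As an illustration (class and metric names are made up for this sketch), a gauge can legally report a string value, which the dashboard previously rendered as `NaN`:
   
   ```
   import org.apache.flink.api.common.functions.RichMapFunction;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.metrics.Gauge;
   
   public class LastElementReportingMapper extends RichMapFunction<String, String> {
       private volatile String lastSeen = "n/a";
   
       @Override
       public void open(Configuration parameters) {
           // A non-numeric gauge: perfectly valid, but previously charted as NaN in the Web UI.
           getRuntimeContext()
                   .getMetricGroup()
                   .gauge("lastSeenElement", (Gauge<String>) () -> lastSeen);
       }
   
       @Override
       public String map(String value) {
           lastSeen = value;
           return value;
       }
   }
   ```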
   
   ## Brief change log
   
 - Adds raw value display as fallback when receiving non-numeric metric 
values
 - Changes "Chart / Numeric" label to "Chart / Literal" because not all 
metrics are of number type
   
   ## Verifying this change
   
   This change is trivial work without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   





Re: [PR] Draft: [FLINK-35199][table] Support the execution of create materialized table in full refresh mode [flink]

2024-05-29 Thread via GitHub


flinkbot commented on PR #24866:
URL: https://github.com/apache/flink/pull/24866#issuecomment-2137173809

   
   ## CI report:
   
   * b0eb4ec03134eea9c5c2574f3cfd9e0e44a496f3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] Draft: [FLINK-35199][table] Support the execution of create materialized table in full refresh mode [flink]

2024-05-29 Thread via GitHub


hackergin opened a new pull request, #24866:
URL: https://github.com/apache/flink/pull/24866

   …le in full refresh mode
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [FLINK-35399][doc] Add documents for batch job recovery from jobMaster failover. [flink]

2024-05-29 Thread via GitHub


zhuzhurk commented on code in PR #24855:
URL: https://github.com/apache/flink/pull/24855#discussion_r1618689544


##
docs/content.zh/docs/ops/batch/recovery_from_job_master_failure.md:
##
@@ -77,13 +77,17 @@ job master failover 后能够尽可能的恢复出错前的进度,避免重新
 
 ### 让 Source 支持 job master failover 后进度恢复
 
-目前,仅 new source (FLIP-27) 支持批处理作业的进度恢复。为了实现这一功能,new source 的 SplitEnumerator 
需要实现 SupportsBatchSnapshot 接口:
+目前,仅 new source (FLIP-27) 支持批处理作业的进度恢复。为了实现这一功能,new source 的
+{{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java"
 name="SplitEnumerator" >}}
+需要能够支持在批处理场景下做状态快照,并且实现

Review Comment:
   做状态快照 -> 做状态快照(此时 checkpointId = -1)






Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]

2024-05-29 Thread via GitHub


spoon-lz commented on PR #24374:
URL: https://github.com/apache/flink/pull/24374#issuecomment-2137116772

   @Zakelly @masteryhx Before this PR could be merged, I found a new conflict with 
`MaterializedTableManager.java` on the master branch. I have resolved the 
conflict, so it can be merged now.





Re: [PR] [1.18][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]

2024-05-29 Thread via GitHub


pnowojski commented on PR #24865:
URL: https://github.com/apache/flink/pull/24865#issuecomment-2137108109

   @flinkbot run azure 





Re: [PR] [1.19][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]

2024-05-29 Thread via GitHub


pnowojski commented on PR #24864:
URL: https://github.com/apache/flink/pull/24864#issuecomment-2137107967

   @flinkbot run azure





Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


SamBarker commented on PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2137002719

   > And I find the benchmark runs extremely slow with the profiler on. The ETA 
is up to 8 days for the full set of our daily runs. So I guess this is best only 
used for single tests that are triggered manually, right?
   
   Yikes, that's a bit sad. Running a single test 
(`SerializationFrameworkMiniBenchmarks.serializerHeavyString`) locally I get a 10 
minute estimate with or without the profiler, and a runtime that seems comparable. 
If I widen the set of benchmarks I get the same ETA with or without the 
profiler. 
   
   So I'm slightly suspicious that the excludes or default includes behaviour 
is being changed, but without knowing what command line you're seeing that from 
it's hard to know. 
   





Re: [PR] [minor][cdc] Suppress false alarm in flink config loader [flink-cdc]

2024-05-29 Thread via GitHub


leonardBang merged PR #3292:
URL: https://github.com/apache/flink-cdc/pull/3292





Re: [PR] [FLINK-26808][1.19] Limit FileUploadHandler to multipart routes [flink]

2024-05-29 Thread via GitHub


uce commented on PR #24859:
URL: https://github.com/apache/flink/pull/24859#issuecomment-2136967772

   I fixed the checkstyle error -- we seem to have changed our checkstyle 
configuration between 1.19 and 1.20.





Re: [PR] [FLINK-35348][table] Introduce refresh materialized table rest api [flink]

2024-05-29 Thread via GitHub


lsyldliu merged PR #24844:
URL: https://github.com/apache/flink/pull/24844





Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


SamBarker commented on code in PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618517159


##
benchmark.sh:
##
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash

Review Comment:
   resolved with 
https://github.com/apache/flink-benchmarks/pull/90/commits/74e4f4f56dab07c70e03690279f96a8f168830f4






Re: [PR] [FLINK-35461][config] Improve Runtime Configuration for Flink 2.0 [flink]

2024-05-29 Thread via GitHub


TanYuxin-tyx commented on code in PR #24853:
URL: https://github.com/apache/flink/pull/24853#discussion_r1618468471


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java:
##
@@ -373,11 +373,18 @@ public static NettyShuffleEnvironmentConfiguration 
fromConfiguration(
 BoundedBlockingSubpartitionType blockingSubpartitionType =
 getBlockingSubpartitionType(configuration);
 
-boolean batchShuffleCompressionEnabled =
-
configuration.get(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED);
 CompressionCodec compressionCodec =
 
configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
 
+boolean batchShuffleCompressionEnabled;
+if (compressionCodec == CompressionCodec.NONE) {

Review Comment:
   If the codec is NONE while the old config 
`BATCH_SHUFFLE_COMPRESSION_ENABLED` is true, do we need to print a warning log 
to deal with this case?
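   
   Something along these lines, perhaps (a sketch only; the option names follow the diff above, while the logger wiring and message text are assumptions):
   
   ```
   import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
   import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
   
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   class CompressionConfigCheck {
       private static final Logger LOG = LoggerFactory.getLogger(CompressionConfigCheck.class);
   
       // Decides the effective flag and warns when the deprecated boolean contradicts a codec of NONE.
       static boolean resolveCompressionEnabled(boolean legacyFlagEnabled, CompressionCodec codec) {
           if (codec == CompressionCodec.NONE) {
               if (legacyFlagEnabled) {
                   LOG.warn(
                           "'{}' is set to true, but the compression codec is NONE; "
                                   + "compression stays disabled.",
                           NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED.key());
               }
               return false;
           }
           return true;
       }
   }
   ```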



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java:
##
@@ -31,6 +31,7 @@
  * The {@link ConfigOption configuration options} for job execution. Those are 
stream specific
  * options. See also {@link org.apache.flink.configuration.PipelineOptions}.

Review Comment:
   I noticed that other deprecated options carry the comment `This option is 
deprecated in 1.20 and will be removed in 2.0`; do we need to add that 
comment here as well? 



##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -425,6 +446,21 @@ public enum CompressionCodec {
 + "tasks have occupied all the buffers and 
the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
 + "the tie by failing the request of 
exclusive buffers and ask users to increase the number of total buffers.");
 
+/** The timeout for requesting buffers for each channel. */
+@Documentation.ExcludeFromDocumentation(
+"This option is purely implementation related, and may be removed 
as the implementation changes.")
+public static final ConfigOption NETWORK_BUFFERS_REQUEST_TIMEOUT 
=
+key("taskmanager.network.memory.exclusive-buffers-request-timeout")

Review Comment:
   IIRC, this config key change is meant to remove the concept of `exclusive`, yet 
the key still contains `exclusive` after this change.






Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-29 Thread via GitHub


GOODBOY008 commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1618505598


##
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+@TempDir File outDir;
+
+@Test
+public void testNoDataIsWrittenBeforeFlush() throws Exception {
+
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+
+// Workaround serialization limitations
+File outDirRef = new File(outDir.getAbsolutePath());
+
+FileSink sink =
+FileSink.forBulkFormat(
+new 
org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+out -> {
+FSDataOutputStreamWrapper 
outputStreamWrapper =
+new FSDataOutputStreamWrapper(out);
+return new CsvBulkWriterWrapper<>(
+CsvBulkWriter.forPojo(Pojo.class, 
outputStreamWrapper),
+outputStreamWrapper,
+outDirRef);
+})
+.build();
+
+List integers = Arrays.asList(new Pojo(1), new Pojo(2));
+DataGeneratorSource generatorSource =
+TestDataGenerators.fromDataWithSnapshotsLatch(
+integers, TypeInformation.of(Pojo.class));
+env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), 
"").sinkTo(sink);
+env.execute();
+assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2", 
"1", "2");
+}
+
+private static class CsvBulkWriterWrapper implements BulkWriter {
+
+private static int callCounter = 0;
+
+private static int unFlushedCounter = 0;

Review Comment:
   Your suggests looks more clear and easy understand. 






Re: [PR] [FLINK-35399][doc] Add documents for batch job recovery from jobMaster failover. [flink]

2024-05-29 Thread via GitHub


zhuzhurk commented on code in PR #24855:
URL: https://github.com/apache/flink/pull/24855#discussion_r1618464994


##
docs/content.zh/docs/ops/batch/recovery_from_job_master_failure.md:
##
@@ -75,6 +75,19 @@ JobMaster failover 后能够尽可能的恢复出错前的进度,避免重新
 - [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" 
>}}#job-event-store-write-buffer-size):
   JobEventStore 写出缓冲区的大小。一旦缓冲区满了,内容将被刷新到外部文件系统。
 
+### 让 Source 支持 job master failover 后进度恢复
+
+目前,仅 new source (FLIP-27) 支持批处理作业的进度恢复。为了实现这一功能,new source 的 SplitEnumerator 
需要实现 SupportsBatchSnapshot 接口:

Review Comment:
   为了实现这一功能,new source 的 SplitEnumerator 需要实现 SupportsBatchSnapshot 接口:
   
   -> New source 的 SplitEnumerator 需要能够支持批处理场景下的 
snapshotState(checkpointId=-1),并且实现 [SupportsBatchSnapshot]({{< ref "xxx" 
>}}#yyy) 接口,从而能够在重启后恢复到出错前的状态。






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1618480332


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   You're right - I got a bit carried away. We shouldn't optimize it any 
further. But let's keep the changes as they are now since you applied it.  






Re: [PR] [FLINK-35399][doc] Add documents for batch job recovery from jobMaster failover. [flink]

2024-05-29 Thread via GitHub


zhuzhurk commented on code in PR #24855:
URL: https://github.com/apache/flink/pull/24855#discussion_r1618437083


##
docs/content/docs/ops/batch/recovery_from_job_master_failure.md:
##
@@ -0,0 +1,110 @@
+---
+title: "Recovery job progress from job master failures"
+weight: 4
+type: docs
+aliases:
+
+- /docs/ops/batch/recovery_from_job_master_failure.html
+- /docs/ops/batch/recovery_from_job_master_failure
+
+---
+
+
+# Batch jobs progress recovery from job master failures
+
+## Background
+
+Previously, if the JobMaster fails and is terminated, one of the following two 
situations will occur:
+
+- If high availability (HA) is disabled, the job will fail.
+- If HA is enabled, a JobMaster failover will happen and the job will be 
restarted. Streaming jobs can resume from the 
+  latest successful checkpoints. Batch jobs, however, do not have checkpoints 
and have to start over from the beginning, 
+  losing all previously made progress. This represents a significant 
regression for long-running batch jobs.
+
+To address this issue, a batch job recovery mechanism is introduced to enable 
batch jobs to recover as much progress as 
+possible after a JobMaster failover, avoiding the need to rerun tasks that 
have already been finished.
+
+To implement this feature, a JobEventStore component is introduced to record 
state change events of the JobMaster 
+(such as ExecutionGraph, OperatorCoordinator, etc.) to an external filesystem. 
During the crash and subsequent restart 
+of the JobMaster, TaskManagers will retain the intermediate result data 
produced by the job and attempt to reconnect 
+continuously. Once the JobMaster restarts, it will re-establish connections 
with TaskManagers and recover the job state 
+based on the retained intermediate results and the events previously recorded 
in the JobEventStore, thereby resuming 
+the job's execution progress.
+
+## Usage
+
+This section explains how to enable recovery of batch jobs from JobMaster 
failures, how to tune it, and how to develop 
+sources to work with batch jobs progress recovery.
+
+### How to enable batch jobs progress recovery from job master failures
+
+- Enable cluster high availability:
+
+  To enable the recovery of batch jobs from JobMaster failures, it is 
essential to first ensure that cluster
+  high availability (HA) is enabled. Flink supports HA services backed by 
ZooKeeper or Kubernetes.
+  More details of the configuration can be found in the [High 
Availability]({{< ref "docs/deployment/ha/overview#high-availability" >}}) page.
+- Configure [execution.batch.job-recovery.enabled]({{< ref 
"docs/deployment/config" >}}#execution-batch-job-recovery-enabled): true
+
+Note that currently only [Adaptive Batch Scheduler]({{< ref 
"docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) 
+supports this feature. And Flink batch jobs will use this scheduler by default 
unless another scheduler is explicitly configured.
+
+### Optimization
+
+To enable batch jobs to recover as much progress as possible after a JobMaster 
failover, and avoid rerunning tasks 
+that have already been finished, you can configure the following options for 
optimization:
+
+- [execution.batch.job-recovery.snapshot.min-pause]({{< ref 
"docs/deployment/config" >}}#execution-batch-job-recovery-snapshot-min-pause):
+  This setting determines the minimum pause time allowed between snapshots for 
the OperatorCoordinator and ShuffleMaster.
+  This parameter could be adjusted based on the expected I/O load of your 
cluster and the tolerable amount of state regression. 
+  Reduce this interval if smaller state regressions are preferred and a higher 
I/O load is acceptable.
+- [execution.batch.job-recovery.previous-worker.recovery.timeout]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-job-recovery-previous-worker-recovery-timeout):
+  This setting determines the timeout duration allowed for Shuffle workers to 
reconnect. During the recovery process, Flink 
+  requests the retained intermediate result data information from the Shuffle 
Master. If the timeout is reached, 
+  Flink will use all the acquired intermediate result data to recover the 
state.
+- [job-event.store.write-buffer.flush-interval]({{< ref 
"docs/deployment/config" >}}#job-event-store-write-buffer-flush-interval):
+  This setting determines the flush interval for the JobEventStore's write 
buffers.
+- [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" 
>}}#job-event-store-write-buffer-size): This 
+  setting determines the write buffer size in the JobEventStore. When the 
buffer is full, its contents are flushed to the external
+  filesystem.
+
+### Enable batch jobs progress recovery for sources
+
+Currently, only the new source (FLIP-27) supports progress recovery for batch 
jobs. To enable this feature, the SplitEnumerator of a new source (FLIP-27) 
needs to implement the SupportsBatchSnapshot interface:
+
+
+public interface SupportsBatchSnapshot {}

Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


Zakelly commented on PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2136859786

   And I find the benchmark runs extremely slow with the profiler on. The ETA 
is up to 8 days for the full set of our daily runs. So I guess this is best only 
used for single tests that are triggered manually, right?





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1618451567


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   I applied your suggestion as I like it  






Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


Zakelly commented on code in PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618445050


##
benchmark.sh:
##
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash

Review Comment:
   `chmod +x` to make this executable?



##
pom.xml:
##
@@ -67,6 +67,7 @@ under the License.

org.apache.flink.benchmark.full.*,org.apache.flink.state.benchmark.*,org.apache.flink.scheduler.benchmark.*
.*
java
+

Review Comment:
   How about adding a property for the profiling result path? And I'd suggest a 
relative path under the current directory as the default value.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1618443253


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   I didn't want to edit the original logic of the generator itself too much  
   
   But it makes sense  






Re: [PR] [1.19][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]

2024-05-29 Thread via GitHub


flinkbot commented on PR #24864:
URL: https://github.com/apache/flink/pull/24864#issuecomment-2136816444

   
   ## CI report:
   
   * ab8e3b5d59abf674a8cb72ea87df211cb7f0787a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [1.18][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]

2024-05-29 Thread via GitHub


flinkbot commented on PR #24865:
URL: https://github.com/apache/flink/pull/24865#issuecomment-2136816894

   
   ## CI report:
   
   * cb9b44b4f7fa394884d2b4821ca5ed3cd93f0b70 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]

2024-05-29 Thread via GitHub


pnowojski merged PR #24857:
URL: https://github.com/apache/flink/pull/24857





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-29 Thread via GitHub


pnowojski closed pull request #24784: [FLINK-35351][checkpoint] Fix fail during 
restore from unaligned chec…
URL: https://github.com/apache/flink/pull/24784





Re: [PR] [FLINK-35461][config] Improve Runtime Configuration for Flink 2.0 [flink]

2024-05-29 Thread via GitHub


Sxnan commented on code in PR #24853:
URL: https://github.com/apache/flink/pull/24853#discussion_r1618380520


##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -428,6 +455,21 @@ public enum CompressionCodec {
 + "tasks have occupied all the buffers and 
the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
 + "the tie by failing the request of 
exclusive buffers and ask users to increase the number of total buffers.");
 
+/** The timeout for requesting buffers for each channel. */
+@Documentation.ExcludeFromDocumentation(
+"This option is purely implementation related, and may be removed 
as the implementation changes.")
+public static final ConfigOption NETWORK_BUFFERS_REQUEST_TIMEOUT 
=

Review Comment:
   Removing the old `ConfigOption` right away will cause the code that directly 
uses the `ConfigOption` to fail. It is better to mark it deprecated in 1.20 and 
remove it in 2.0.






Re: [PR] [FLINK-35430][cdc-connector][kafka] Passed the zoneId to JsonSerializationSchema. [flink-cdc]

2024-05-29 Thread via GitHub


joyCurry30 commented on PR #3359:
URL: https://github.com/apache/flink-cdc/pull/3359#issuecomment-2136672889

   > Hi @joyCurry30, could you please create another PR to backport this to 
`release-3.1`? We're preparing for the next patch release, and it would be nice 
to include this bugfix.
   
   https://github.com/apache/flink-cdc/pull/3377
   Hi, cc plz. @yuxiqian 





[PR] [BP-3.1][FLINK-35430][cdc-connector][kafka] Passed the zoneId to JsonSerializationSchema. [flink-cdc]

2024-05-29 Thread via GitHub


joyCurry30 opened a new pull request, #3377:
URL: https://github.com/apache/flink-cdc/pull/3377

   (no comment)





Re: [PR] [FLINK-35325][cdc-connector][paimon]Support for specifying column order. [flink-cdc]

2024-05-29 Thread via GitHub


joyCurry30 commented on PR #3323:
URL: https://github.com/apache/flink-cdc/pull/3323#issuecomment-2136669682

   > Hi @joyCurry30, could you please backport it to `release-3.1` branch too?
   
   https://github.com/apache/flink-cdc/pull/3376
   cc plz. @yuxiqian 





[PR] [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. [flink-cdc]

2024-05-29 Thread via GitHub


joyCurry30 opened a new pull request, #3376:
URL: https://github.com/apache/flink-cdc/pull/3376

   (no comment)





[PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian opened a new pull request, #3375:
URL: https://github.com/apache/flink-cdc/pull/3375

   Cherry-picked from #3348.





Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-29 Thread via GitHub


showuon commented on code in PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618289766


##
README.md:
##
@@ -64,6 +64,20 @@ java -jar target/benchmarks.jar "" -lp
 java -jar target/benchmarks.jar "org.apache.flink.state.benchmark.*" -p 
"backendType=ROCKSDB" 
 ```
 
+## Generating Flame graphs
+
+JMH has support for enabling profilers when running benchmarks, in particular 
[async profiler](https://github.com/async-profiler/async-profiler), which can 
generate flame graphs of the call stack. However, async profiler requires 
linking with native code that needs to be downloaded manually, which means its 
location is user and architecture specific. To enable async profiler support, 
run the benchmarks as follows:
+
+with maven
+```
+ java -jar target/benchmarks.jar -rf csv "" 
-DasyncProfilerLib=

Review Comment:
   nit: additional space at the beginning.






Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2136650327

   Hi @loserwang1024, could you please backport this patch to `release-3.1` 
branch so that it could be released with CDC 3.1.1?





Re: [PR] [FLINK-35325][cdc-connector][paimon]Support for specifying column order. [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian commented on PR #3323:
URL: https://github.com/apache/flink-cdc/pull/3323#issuecomment-2136645319

   Hi @joyCurry30, could you please backport it to `release-3.1` branch too?





Re: [PR] [FLINK-35430][cdc-connector][kafka] Passed the zoneId to JsonSerializationSchema. [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian commented on PR #3359:
URL: https://github.com/apache/flink-cdc/pull/3359#issuecomment-2136644955

   Hi @joyCurry30, could you please create another PR to backport this to 
`release-3.1`? We're preparing for the next patch release, and it would be nice 
to include this bugfix.





Re: [PR] [hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian commented on PR #3332:
URL: https://github.com/apache/flink-cdc/pull/3332#issuecomment-2136642062

   Hi @czy006, could you please open another PR to backport this patch to 
`release-3.1` branch?





[PR] [BP-3.1][FLINK-35295][mysql] Improve jdbc connection pool initialization failure message [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian opened a new pull request, #3374:
URL: https://github.com/apache/flink-cdc/pull/3374

   Cherry-picked from #3293.





[PR] [BP-3.1][FLINK-35294][mysql] Use source config to check if the filter should be applied in timestamp starting mode [flink-cdc]

2024-05-29 Thread via GitHub


yuxiqian opened a new pull request, #3373:
URL: https://github.com/apache/flink-cdc/pull/3373

   Cherry-picked from #3291.





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]

2024-05-28 Thread via GitHub


ldadima commented on PR #24857:
URL: https://github.com/apache/flink/pull/24857#issuecomment-2136570818

   Thanks for changes. LGTM





[PR] [BP-3.1][FLINK-35464] Fixes operator state backwards compatibility from CDC 3.0.x [flink-cdc]

2024-05-28 Thread via GitHub


yuxiqian opened a new pull request, #3370:
URL: https://github.com/apache/flink-cdc/pull/3370

   This closes [FLINK-35441](https://issues.apache.org/jira/browse/FLINK-35441) 
and [FLINK-35464](https://issues.apache.org/jira/browse/FLINK-35464).
   
   See #3369 for more details.





[PR] [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source. [flink-connector-jdbc]

2024-05-28 Thread via GitHub


RocMarshal opened a new pull request, #122:
URL: https://github.com/apache/flink-connector-jdbc/pull/122

   [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc 
source.
   
   - Only for English edition pages.
   - [ ] The translation is planned to be done in a new JIRA.





Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-28 Thread via GitHub


ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2136510387

   > Thanks for the update @ldadima . I have fixed up a couple of things in 
#24857 so let me close this PR in favour of that one. Could you take a look at 
my version?
   
   I checked the MR. I agree with the changes to the ITCase. In my case I forgot 
about the uid, which is why the map vertex didn't work for me in the rescale 
case. Thanks for the changes





Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-28 Thread via GitHub


SamBarker commented on PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2136485697

   @Zakelly any news on the Jenkins env? I'm largely hoping it's a no-op there 
right now.
   
   @pnowojski README and executable bit set. Is there anything else you're 
looking for?





Re: [PR] Enable async profiler [flink-benchmarks]

2024-05-28 Thread via GitHub


SamBarker commented on code in PR #90:
URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618157436


##
benchmark.sh:
##
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+JAVA_ARGS=()
+JMH_ARGS=()
+BINARY="java"
+BENCHMARK_PATTERN=
+
+while getopts ":j:c:b:e:p:a:m:h" opt; do
+  case $opt in
+j) JAVA_ARGS+=("${OPTARG}")
+;;
+c) CLASSPATH_ARG="${OPTARG}"
+;;
+b) BINARY="${OPTARG}"
+;;
+p) PROFILER_ARG="${OPTARG:+-prof ${OPTARG}}"
+# conditional prefixing inspired by 
https://stackoverflow.com/a/40771884/1389220
+;;
+a) JMH_ARGS+=("${OPTARG}")
+;;
+e) BENCHMARK_EXCLUDES="${OPTARG:+-e ${OPTARG}}"
+;;
+m) BENCHMARK_PATTERN="${OPTARG}"
+  echo "parsing -m"
+;;
+h)
+  1>&2 cat << EOF
+usage: TODO
+EOF
+  exit 1
+;;
+\?) echo "Invalid option -$opt ${OPTARG}" >&2
+exit 1
+;;
+  esac
+done
+shift "$(($OPTIND -1))"

Review Comment:
   I had a go at doing it in Python 
(https://github.com/SamBarker/flink-benchmarks/blob/generate-flamegraphs-py/run-benchmarks.py), 
but life got quite funky trying to pass `--add-opens` to something that actually 
understood long arguments. 
   
   I can merge that into this PR if preferred, but I'm not sure it actually 
improves things here. 
   
   The other option might be to look at 
[ap-loader](https://github.com/jvm-profiling-tools/ap-loader) which would allow 
us to ensure that the native libraries are available.






Re: [PR] [FLINK-35461][config] Improve Runtime Configuration for Flink 2.0 [flink]

2024-05-28 Thread via GitHub


xintongsong commented on code in PR #24853:
URL: https://github.com/apache/flink/pull/24853#discussion_r1618145159


##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -316,7 +316,11 @@ public enum CompressionCodec {
 /**
  * Parallelism threshold to switch between sort-based blocking shuffle and 
hash-based blocking
  * shuffle.
+ *
+ * @deprecated The hash-based blocking shuffle is deprecated in 1.20 and 
will be totally removed
+ * in 2.0.
  */
+@Deprecated
 @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)

Review Comment:
   This can also be removed, because deprecated config options won't appear in 
docs.
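
   For illustration, a minimal sketch of how such an option could end up 
looking once the documentation annotation is dropped (the field name, key and 
default below are placeholders, not the actual NettyShuffleEnvironmentOptions 
entries):

   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   public class DeprecatedOptionSketch {

       /**
        * Parallelism threshold to switch between sort-based and hash-based
        * blocking shuffle.
        *
        * @deprecated The hash-based blocking shuffle is deprecated in 1.20 and
        *     will be removed in 2.0.
        */
       @Deprecated
       // No @Documentation.Section annotation needed here: deprecated options
       // are excluded from the generated documentation anyway.
       public static final ConfigOption<Integer> SORT_SHUFFLE_MIN_PARALLELISM =
               ConfigOptions.key("placeholder.sort-shuffle.min-parallelism")
                       .intType()
                       .defaultValue(1)
                       .withDescription(
                               "Threshold to switch between blocking shuffle implementations.");
   }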



##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -428,6 +455,21 @@ public enum CompressionCodec {
 + "tasks have occupied all the buffers and 
the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
 + "the tie by failing the request of 
exclusive buffers and ask users to increase the number of total buffers.");
 
+/** The timeout for requesting buffers for each channel. */
+@Documentation.ExcludeFromDocumentation(
+"This option is purely implementation related, and may be removed 
as the implementation changes.")
+public static final ConfigOption NETWORK_BUFFERS_REQUEST_TIMEOUT 
=

Review Comment:
   The old `ConfigOption` can be removed as it is already covered by 
`withDeprecatedKeys` of the new option.
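
   A rough sketch of what that could look like (the key strings and the 
default value here are placeholders; only the `withDeprecatedKeys` call 
mirrors the actual suggestion):

   import java.time.Duration;

   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   public class BufferRequestTimeoutSketch {

       // The new option keeps reading the old key via withDeprecatedKeys, so
       // the old ConfigOption constant itself no longer needs to exist.
       public static final ConfigOption<Duration> NETWORK_BUFFERS_REQUEST_TIMEOUT =
               ConfigOptions.key("taskmanager.network.memory.buffers-request-timeout")
                       .durationType()
                       .defaultValue(Duration.ofSeconds(30))
                       .withDeprecatedKeys(
                               "taskmanager.network.memory.exclusive-buffers-request-timeout-ms")
                       .withDescription("The timeout for requesting buffers for each channel.");
   }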



##
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##
@@ -316,7 +316,11 @@ public enum CompressionCodec {
 /**
  * Parallelism threshold to switch between sort-based blocking shuffle and 
hash-based blocking
  * shuffle.
+ *
+ * @deprecated The hash-based blocking shuffle is deprecated in 1.20 and 
will be totally removed
+ * in 2.0.
  */
+@Deprecated
 @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)

Review Comment:
   Same for other deprecated options with documentation-related annotations.






Re: [PR] [hotfix][docs-zh] maven should require v3.8.6, not higher [flink]

2024-05-28 Thread via GitHub


flinkbot commented on PR #24863:
URL: https://github.com/apache/flink/pull/24863#issuecomment-2136449270

   
   ## CI report:
   
   * 723de4273c06f3974e3a21808e6c5cf8ed6f7b84 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [hotfix][docs-zh] maven should require v3.8.6 [flink]

2024-05-28 Thread via GitHub


showuon opened a new pull request, #24863:
URL: https://github.com/apache/flink/pull/24863

   
   
   ## What is the purpose of the change
   
   We require Maven v3.8.6 (not higher) to build Flink; otherwise, the build 
fails with this error:
   `Detected Maven Version: 3.9.6 is not in the allowed range [3.8.6,3.8.6].`
   
   ## Brief change log
   
   Update the incorrect requirement in the Chinese version of maven.md.
   
   
   ## Verifying this change
   
   Only doc is changed.
   
   ## Does this pull request potentially affect one of the following parts:
   
   Only doc is changed.
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented? no
   





Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-28 Thread via GitHub


robobario commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1618053167


##
flink-formats/flink-csv/src/test/resources/log4j2-test.properties:
##
@@ -0,0 +1,28 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO

Review Comment:
   Looking at other modules and the comment above, I think this should be 
checked in with the level set to `OFF`.



##
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+@TempDir File outDir;
+
+@Test
+public void testNoDataIsWrittenBeforeFlush() throws Exception {
+
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+env.enableCheckpointing(100);
+env.setRestartStrategy(RestartStrategies.noRestart());
+
+// Workaround serialization limitations
+File outDirRef = new File(outDir.getAbsolutePath());
+
+FileSink sink =
+FileSink.forBulkFormat(
+new 
org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+out -> {
+FSDataOutputStreamWrapper 
outputStreamWrapper =
+new FSDataOutputStreamWrapper(out);
+return new CsvBulkWriterWrapper<>(
+CsvBulkWriter.forPojo(Pojo.class, 
outputStreamWrapper),
+outputStreamWrapper,
+outDirRef);
+})
+.build();
+
+List integers = Arrays.asList(new Pojo(1), new Pojo(2));
+DataGeneratorSource generatorSource =
+TestDataGenerators.fromDataWithSnapshotsLatch(
+integers, TypeInformation.of(Pojo.class));
+env.fromSource(generatorSource, 

Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]

2024-05-28 Thread via GitHub


lvyanquan commented on code in PR #3348:
URL: https://github.com/apache/flink-cdc/pull/3348#discussion_r1618087973


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java:
##
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import 
org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksMetadataApplier}. */
+public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
+private static final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment();
+
+@BeforeClass
+public static void before() {
+env.setParallelism(DEFAULT_PARALLELISM);
+env.enableCheckpointing(3000);
+env.setRestartStrategy(RestartStrategies.noRestart());
+}
+
+@Before
+public void initializeDatabase() {
+executeSql(
+String.format(
+"CREATE DATABASE IF NOT EXISTS `%s`;",
+StarRocksContainer.STARROCKS_DATABASE_NAME));
+LOG.info("Database {} created.", 
StarRocksContainer.STARROCKS_DATABASE_NAME);
+}
+
+@After
+public void destroyDatabase() {
+executeSql(String.format("DROP DATABASE %s;", 
StarRocksContainer.STARROCKS_DATABASE_NAME));
+LOG.info("Database {} destroyed.", 
StarRocksContainer.STARROCKS_DATABASE_NAME);
+}
+
+private List generateAddColumnEvents(TableId tableId) {
+Schema schema =
+Schema.newBuilder()
+.column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
+.column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
+.column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+  

Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


HuangZhenQiu commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1618086605


##
flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+public class JobStatusChangedListenerITCase {
+private static List statusChangedEvents = new 
ArrayList<>();
+
+@Test
+void testJobStatusChanged() throws Exception {
+Configuration configuration = new Configuration();
+configuration.set(
+JOB_STATUS_CHANGED_LISTENERS,
+
Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
+try (StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+List sourceValues = Arrays.asList("a", "b", "c");
+List resultValues = new ArrayList<>();
+try (CloseableIterator iterator =
+env.fromCollection(sourceValues).executeAndCollect()) {
+while (iterator.hasNext()) {
+resultValues.add(iterator.next());
+}
+}
+
assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new 
String[0]));
+}
+assertThat(statusChangedEvents.size()).isEqualTo(3);
+assertThat(statusChangedEvents.get(0).jobId())
+.isEqualTo(statusChangedEvents.get(1).jobId());
+assertThat(statusChangedEvents.get(0).jobName())
+.isEqualTo(statusChangedEvents.get(1).jobName());
+
+assertThat(statusChangedEvents.get(1).jobId())
+.isEqualTo(statusChangedEvents.get(2).jobId());
+assertThat(statusChangedEvents.get(1).jobName())
+.isEqualTo(statusChangedEvents.get(2).jobName());
+
+statusChangedEvents.forEach(
+event -> {
+if (event instanceof DefaultJobExecutionStatusEvent) {
+JobExecutionStatusEvent status = 
(JobExecutionStatusEvent) event;
+assertThat(
+(status.oldStatus() == 
JobStatus.CREATED
+&& status.newStatus() 
== JobStatus.RUNNING)

Review Comment:
   Good suggestion. Added test cases accordingly. It actually helped me find 
out that the job status changed listener should be added for the 
MiniClusterExecutor.






Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]

2024-05-28 Thread via GitHub


lvyanquan commented on code in PR #3348:
URL: https://github.com/apache/flink-cdc/pull/3348#discussion_r1618083749


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
+
+/** IT tests for {@link DorisMetadataApplier}. */
+@RunWith(Parameterized.class)
+public class DorisMetadataApplierITCase extends DorisSinkTestBase {
+private static final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment();
+
+private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+private final boolean batchMode;
+
+public DorisMetadataApplierITCase(boolean batchMode) {
+this.batchMode = batchMode;
+}
+
+@Parameters(name = "batchMode: {0}")
+public static Iterable data() {
+return Arrays.asList(true, false);
+}
+
+@BeforeClass
+public static void before() {
+env.setParallelism(DEFAULT_PARALLELISM);
+env.enableCheckpointing(3000);
+env.setRestartStrategy(RestartStrategies.noRestart());
+}
+
+@Before
+public void initializeDatabase() {
+createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+// waiting for table to be created
+DORIS_CONTAINER.waitForLog(
+String.format(".*createDb dbName = %s,.*\\s", 
DorisContainer.DORIS_DATABASE_NAME),
+1,
+DATABASE_OPERATION_TIMEOUT_SECONDS);
+

Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]

2024-05-28 Thread via GitHub


lvyanquan commented on code in PR #3348:
URL: https://github.com/apache/flink-cdc/pull/3348#discussion_r1618083749


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
+
+/** IT tests for {@link DorisMetadataApplier}. */
+@RunWith(Parameterized.class)
+public class DorisMetadataApplierITCase extends DorisSinkTestBase {
+private static final StreamExecutionEnvironment env =
+StreamExecutionEnvironment.getExecutionEnvironment();
+
+private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+private final boolean batchMode;
+
+public DorisMetadataApplierITCase(boolean batchMode) {
+this.batchMode = batchMode;
+}
+
+@Parameters(name = "batchMode: {0}")
+public static Iterable data() {
+return Arrays.asList(true, false);
+}
+
+@BeforeClass
+public static void before() {
+env.setParallelism(DEFAULT_PARALLELISM);
+env.enableCheckpointing(3000);
+env.setRestartStrategy(RestartStrategies.noRestart());
+}
+
+@Before
+public void initializeDatabase() {
+createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+// waiting for table to be created
+DORIS_CONTAINER.waitForLog(
+String.format(".*createDb dbName = %s,.*\\s", 
DorisContainer.DORIS_DATABASE_NAME),
+1,
+DATABASE_OPERATION_TIMEOUT_SECONDS);
+

Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-05-28 Thread via GitHub


X-czh commented on PR #24845:
URL: https://github.com/apache/flink/pull/24845#issuecomment-2136325346

   Hi @reswqa, could you help review this when you have time? Thanks.





Re: [PR] [FLINK-35454] Allow connector classes to depend on internal Flink util classes [flink]

2024-05-28 Thread via GitHub


JingGe merged PR #24843:
URL: https://github.com/apache/flink/pull/24843





Re: [PR] [BP-1.17][FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-28 Thread via GitHub


flinkbot commented on PR #24862:
URL: https://github.com/apache/flink/pull/24862#issuecomment-2136180667

   
   ## CI report:
   
   * 3e9ada3635c286697f25410f3e7a4ceb81f34220 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [BP-1.18][FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-28 Thread via GitHub


flinkbot commented on PR #24861:
URL: https://github.com/apache/flink/pull/24861#issuecomment-2136175334

   
   ## CI report:
   
   * 6dae529f1f7578c3666315aa1b0ea04e3b7b97ff UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [BP-1.17][FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-28 Thread via GitHub


jeyhunkarimov opened a new pull request, #24862:
URL: https://github.com/apache/flink/pull/24862

   This is the backport-to-1.17 PR for 
https://github.com/apache/flink/commit/87b7193846090897b2feabf716ee5378bcd7585b





[PR] [BP-1.18][FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-28 Thread via GitHub


jeyhunkarimov opened a new pull request, #24861:
URL: https://github.com/apache/flink/pull/24861

   This is the backport-to-1.18 PR for 
https://github.com/apache/flink/commit/87b7193846090897b2feabf716ee5378bcd7585b





Re: [PR] [FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-28 Thread via GitHub


flinkbot commented on PR #24860:
URL: https://github.com/apache/flink/pull/24860#issuecomment-2136170701

   
   ## CI report:
   
   * b8993db5e7956bd8e0c9466df3b378a18a126148 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-34379][table] Fix adding catalogtable logic [flink]

2024-05-28 Thread via GitHub


jeyhunkarimov opened a new pull request, #24860:
URL: https://github.com/apache/flink/pull/24860

   This is the backport-to-1.19 PR for commit 87b7193846090897b2feabf716ee5378bcd7585b.





Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1617923919


##
flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+public class JobStatusChangedListenerITCase {
+private static List statusChangedEvents = new 
ArrayList<>();
+
+@Test
+void testJobStatusChanged() throws Exception {
+Configuration configuration = new Configuration();
+configuration.set(
+JOB_STATUS_CHANGED_LISTENERS,
+
Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
+try (StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+List sourceValues = Arrays.asList("a", "b", "c");
+List resultValues = new ArrayList<>();
+try (CloseableIterator iterator =
+env.fromCollection(sourceValues).executeAndCollect()) {
+while (iterator.hasNext()) {
+resultValues.add(iterator.next());
+}
+}
+
assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new 
String[0]));
+}
+assertThat(statusChangedEvents.size()).isEqualTo(3);
+assertThat(statusChangedEvents.get(0).jobId())
+.isEqualTo(statusChangedEvents.get(1).jobId());
+assertThat(statusChangedEvents.get(0).jobName())
+.isEqualTo(statusChangedEvents.get(1).jobName());
+
+assertThat(statusChangedEvents.get(1).jobId())
+.isEqualTo(statusChangedEvents.get(2).jobId());
+assertThat(statusChangedEvents.get(1).jobName())
+.isEqualTo(statusChangedEvents.get(2).jobName());
+
+statusChangedEvents.forEach(
+event -> {
+if (event instanceof DefaultJobExecutionStatusEvent) {
+JobExecutionStatusEvent status = 
(JobExecutionStatusEvent) event;
+assertThat(
+(status.oldStatus() == 
JobStatus.CREATED
+&& status.newStatus() 
== JobStatus.RUNNING)

Review Comment:
   Can we add a test that checks for `FAILING` / `FAILED` and for 
`CANCELLING` / `CANCELED`?
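
   For example, a rough sketch of the extra assertions (reusing the event 
types from this test; the code that drives the job into failure is omitted 
and the class name is just a placeholder):

   import static org.assertj.core.api.Assertions.assertThat;

   import java.util.List;

   import org.apache.flink.api.common.JobStatus;
   import org.apache.flink.core.execution.JobExecutionStatusEvent;
   import org.apache.flink.core.execution.JobStatusChangedEvent;

   class FailureTransitionAssertionsSketch {

       // Verifies that a deliberately failing job reports only the expected
       // CREATED -> RUNNING -> FAILING -> FAILED transitions to the listener.
       static void assertFailureTransitions(List<JobStatusChangedEvent> events) {
           for (JobStatusChangedEvent event : events) {
               if (event instanceof JobExecutionStatusEvent) {
                   JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
                   boolean expectedTransition =
                           (status.oldStatus() == JobStatus.CREATED
                                           && status.newStatus() == JobStatus.RUNNING)
                                   || (status.oldStatus() == JobStatus.RUNNING
                                           && status.newStatus() == JobStatus.FAILING)
                                   || (status.oldStatus() == JobStatus.FAILING
                                           && status.newStatus() == JobStatus.FAILED);
                   assertThat(expectedTransition).isTrue();
               }
           }
       }
   }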






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1617917890


##
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##
@@ -153,7 +173,18 @@ private CompletableFuture 
submitAndGetJobClientFuture(
 return jobId;
 }))
 .thenApplyAsync(
-jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader));
+jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader))
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {
+PipelineExecutorUtils.notifyJobStatusListeners(
+pipeline, jobGraph, 
jobStatusChangedListeners);
+} else {
+LOG.error(
+"Fail to submit job graph to 
application cluster",

Review Comment:
   nit: Fail => Failed






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


HuangZhenQiu commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1606207458


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
 ClusterID, ClientFactory extends 
ClusterClientFactory>
 implements CacheSupportedPipelineExecutor {
+private final ExecutorService executorService =
+Executors.newFixedThreadPool(
+4, new 
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
 
 private final ClientFactory clusterClientFactory;
+private final Configuration configuration;
+private final List jobStatusChangedListeners;
 
-public AbstractSessionClusterExecutor(@Nonnull final ClientFactory 
clusterClientFactory) {
+public AbstractSessionClusterExecutor(
+@Nonnull final ClientFactory clusterClientFactory, Configuration 
configuration) {
 this.clusterClientFactory = checkNotNull(clusterClientFactory);
+this.configuration = configuration;
+this.jobStatusChangedListeners =
+JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+this.getClass().getClassLoader(), configuration, 
executorService);

Review Comment:
   We basically need to load the job status changed listeners from Flink libs 
or plugins here. Yes, the thread context class loader makes more sense. 
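
   A minimal sketch of that change (assuming the `JobStatusChangedListenerUtils` 
factory shown in the diff above; imports and the surrounding executor class 
are abbreviated here):

   // Sketch: resolve the listeners through the thread context class loader so
   // that implementations shipped as user or plugin jars can be found, falling
   // back to the defining class loader when no context loader is set.
   private static List<JobStatusChangedListener> loadListeners(
           Configuration configuration, ExecutorService executorService) {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
       if (classLoader == null) {
           classLoader = AbstractSessionClusterExecutor.class.getClassLoader();
       }
       return JobStatusChangedListenerUtils.createJobStatusChangedListeners(
               classLoader, configuration, executorService);
   }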



##
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##
@@ -153,7 +173,14 @@ private CompletableFuture 
submitAndGetJobClientFuture(
 return jobId;
 }))
 .thenApplyAsync(
-jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader));
+jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader))
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {

Review Comment:
   Discussed with @davidradl offline. The throwable cannot be rethrown from 
within whenCompleteAsync, so we log the exception rather than swallowing it.
   
https://stackoverflow.com/questions/71668871/completablefuture-whencompleteasync-does-not-let-me-re-throw-an-exception
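
   A small self-contained sketch of that behaviour (plain CompletableFuture, 
no Flink types involved):

   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;

   public class WhenCompleteSketch {
       public static void main(String[] args) {
           CompletableFuture<String> submission = new CompletableFuture<>();
           submission.completeExceptionally(new IllegalStateException("submit failed"));

           CompletableFuture<String> observed =
                   submission.whenCompleteAsync(
                           (jobClient, throwable) -> {
                               // The callback can only observe the outcome; throwing here
                               // does not replace the original exception on the chain, so
                               // the failure is logged instead of being "re-thrown".
                               if (throwable != null) {
                                   System.err.println("Failed to submit job: " + throwable);
                               }
                           });

           try {
               observed.join();
           } catch (CompletionException e) {
               // Downstream consumers still see the original exception, so
               // nothing is swallowed by logging it in the callback.
               System.err.println("Propagated cause: " + e.getCause());
           }
       }
   }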



##
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##
@@ -355,7 +368,7 @@ public CompletableFuture 
requestJobResult(@Nonnull JobID jobId) {
 }
 
 @Override
-public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) {

Review Comment:
   With Gyula's suggestion, we don't need to change the API now.



##
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##
@@ -454,6 +467,24 @@ public CompletableFuture submitJob(@Nonnull 
JobGraph jobGraph) {
 receiver,
 error);
 } else {
+RuntimeExecutionMode executionMode 
=
+
jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+if 
(jobStatusChangedListeners.size() > 0) {
+
jobStatusChangedListeners.forEach(
+listener ->
+
listener.onEvent(
+new 
DefaultJobCreatedEvent(
+   
 jobGraph.getJobID(),
+   
 jobGraph.getName(),
+   
 pipeline == null
+   
 ? null
+   
 : ((StreamGraph)
+   
 pipeline)
+   
 .getLineageGraph(),
+   
 executionMode)));
+}
+
 LOG.info(
 "Successfully submitted 
job '{}' ({}) to '{}'.",

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-28 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2136055216

   Thanks for the feedback @vahmed-hamdy. I replied to all the feedback except 
the documentation item, which is still in progress. Is there a package where 
I'm supposed to add the documentation?





Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-05-28 Thread via GitHub


morazow commented on code in PR #24426:
URL: https://github.com/apache/flink/pull/24426#discussion_r1617850492


##
.github/workflows/nightly.yml:
##
@@ -94,3 +94,46 @@ jobs:
   s3_bucket: ${{ secrets.IT_CASE_S3_BUCKET }}
   s3_access_key: ${{ secrets.IT_CASE_S3_ACCESS_KEY }}
   s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }}
+
+  build_python_wheels:
+name: "Build Python Wheels on ${{ matrix.os_name }}"
+runs-on: ${{ matrix.os }}
+strategy:
+  fail-fast: false
+  matrix:
+include:
+  - os: ubuntu-latest
+os_name: linux
+  - os: macos-latest
+os_name: macos
+steps:
+  - name: "Checkout the repository"
+uses: actions/checkout@v4
+with:
+  fetch-depth: 0
+  persist-credentials: false
+  - name: "Stringify workflow name"
+uses: "./.github/actions/stringify"
+id: stringify_workflow
+with:
+  value: ${{ github.workflow }}
+  - name: "Build python wheels for ${{ matrix.os_name }}"
+uses: pypa/cibuildwheel@v2.16.5

Review Comment:
   Great, updated ✔️ 






Re: [PR] [FLINK-26808][1.19] Limit FileUploadHandler to multipart routes [flink]

2024-05-28 Thread via GitHub


flinkbot commented on PR #24859:
URL: https://github.com/apache/flink/pull/24859#issuecomment-2136017635

   
   ## CI report:
   
   * da45a7843f59b1fb2070b2de55cbd4214e05ae0f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-26808][1.19] Limit FileUploadHandler to multipart routes [flink]

2024-05-28 Thread via GitHub


uce opened a new pull request, #24859:
URL: https://github.com/apache/flink/pull/24859

   See https://github.com/apache/flink/pull/24856 for the PR description.
   
   This is a backport for 1.19. The commits applied cleanly.





Re: [PR] [FLINK-26808] Limit FileUploadHandler to multipart routes [flink]

2024-05-28 Thread via GitHub


uce commented on PR #24856:
URL: https://github.com/apache/flink/pull/24856#issuecomment-2136005466

   I was too trigger-happy and forgot to fix up the `review` commit before 
merging.  




