Re: [PR] Enable async profiler [flink-benchmarks]
SamBarker commented on PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2138478974

> It compiles & installs Flink before running the benchmarks via /mnt/jenkins/tools/hudson.tasks.Maven_MavenInstallation/M3/bin/mvn -Dflink.version=1.20-SNAPSHOT clean install exec:exec -Penable-async-profiler -DskipTests -Drat.skip=true '-Dbenchmarks=org.apache.flink.state.benchmark.*' '-DbenchmarkExcludes=org.apache.flink.benchmark.full.*' -DexecutableJava=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-4.0.2.al8.x86_64/bin/java -DasyncProfilerLib=/opt/async-profiler-3.0-linux-x64/lib/libasyncProfiler.so -DprofResultPath=./profile-result
>
> It would be great if you could provide some options to speed this up.

It looks to me like it was handling the excludes and includes correctly, which is what I was concerned about. I can't reproduce a slower runtime with the profiler on.

With profiler:

```
mvn -Dflink.version=1.20-SNAPSHOT -DexecutableJava=${JAVA_HOME}/bin/java -DasyncProfilerLib=/home/sam/src/async-profiler/async-profiler-3.0-linux-x64/lib/libasyncProfiler.so -Dbenchmarks="SerializationFrameworkMiniBenchmarks.serializerHeavyString" exec:exec --offline
... snip ...
# JMH version: 1.37
# VM version: JDK 11.0.23, OpenJDK 64-Bit Server VM, 11.0.23+9
# VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
# VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
# Blackhole mode: full + dont-inline hint (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 10 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString
# Run progress: 0.00% complete, ETA 00:10:00
... snip ...
[INFO]
[INFO] BUILD SUCCESS
[INFO]
[INFO] Total time: 11:17 min
[INFO] Finished at: 2024-05-30T12:10:42+12:00
[INFO]
```

Without profiler:

```
mvn -Dflink.version=1.20-SNAPSHOT -DexecutableJava=${JAVA_HOME}/bin/java -Dbenchmarks="SerializationFrameworkMiniBenchmarks.serializerHeavyString" exec:exec --offline
... snip ...
# JMH version: 1.37
# VM version: JDK 11.0.23, OpenJDK 64-Bit Server VM, 11.0.23+9
# VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
# VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
# Blackhole mode: full + dont-inline hint (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 10 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString
# Run progress: 0.00% complete, ETA 00:10:00
... snip ...
[INFO]
[INFO] BUILD SUCCESS
[INFO]
[INFO] Total time: 11:18 min
[INFO] Finished at: 2024-05-30T12:27:06+12:00
[INFO]
```

~So I'm not really sure what to suggest.~ Wait a minute. This PR changes the version of JMH. It's not the profiler being ON or OFF that affects the runtime, it's the JMH version.
Going back to main I get:

```
# JMH version: 1.19
# VM version: JDK 11.0.23, VM 11.0.23+9
# VM invoker: /home/sam/.sdkman/candidates/java/11.0.23-tem/bin/java
# VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl
# Warmup: 10 iterations, 1 s each
# Measurement: 10 iterations, 1 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks.serializerHeavyString
# Run progress: 0.00% complete, ETA 00:01:00
```

This may be related to the changes to warmup defaults in [JMH 1.21](https://mail.openjdk.org/pipermail/jmh-dev/2018-May/002753.html):

> *) Defaults for warmup/measurement/forks were reconsidered
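Since the runtime difference comes entirely from JMH's newer iteration defaults, one way to speed up local investigation runs is to pin the old 1 s iteration times explicitly through the JMH Runner API. A minimal, hypothetical sketch (class name and included benchmark pattern are illustrative, not part of this PR):

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

// Override the JMH defaults, which changed in 1.21 (10 s iterations) and
// dominate the wall-clock time here, with pre-1.21-style 1 s iterations.
public class QuickBenchmarkRun {
    public static void main(String[] args) throws Exception {
        Options opts = new OptionsBuilder()
                .include("SerializationFrameworkMiniBenchmarks.serializerHeavyString")
                .warmupIterations(10)
                .warmupTime(TimeValue.seconds(1))
                .measurementIterations(10)
                .measurementTime(TimeValue.seconds(1))
                .forks(1)
                .build();
        new Runner(opts).run();
    }
}
```

The same knobs are also exposed as `-w`, `-r`, `-wi`, `-i`, and `-f` on the JMH command line, so either route avoids touching the committed benchmark annotations.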
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]
jeyhunkarimov commented on code in PR #24845: URL: https://github.com/apache/flink/pull/24845#discussion_r161942

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java:

@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the
+ * serialization of the set's elements.
+ *
+ * <p>The serialization format for the set is as follows: four bytes for the length of the set,
+ * followed by the serialized representation of each element.
+ *
+ * @param <T> The type of element in the set.
+ */
+@Internal
+public final class SetSerializer<T> extends TypeSerializer<Set<T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The serializer for the elements of the set. */
+    private final TypeSerializer<T> elementSerializer;
+
+    /**
+     * Creates a set serializer that uses the given serializer to serialize the set's elements.
+     *
+     * @param elementSerializer The serializer for the elements of the set
+     */
+    public SetSerializer(TypeSerializer<T> elementSerializer) {
+        this.elementSerializer = checkNotNull(elementSerializer);
+    }
+
+    //
+    // SetSerializer specific properties
+    //
+
+    /**
+     * Gets the serializer for the elements of the set.
+     *
+     * @return The serializer for the elements of the set
+     */
+    public TypeSerializer<T> getElementSerializer() {
+        return elementSerializer;
+    }
+
+    //
+    // Type Serializer implementation
+    //
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<Set<T>> duplicate() {
+        TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+        return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement);
+    }
+
+    @Override
+    public Set<T> createInstance() {
+        return new HashSet<>(0);
+    }
+
+    @Override
+    public Set<T> copy(Set<T> from) {
+        Set<T> newSet = new HashSet<>(from.size());
+        for (T element : from) {
+            newSet.add(elementSerializer.copy(element));
+        }
+        return newSet;
+    }
+
+    @Override
+    public Set<T> copy(Set<T> from, Set<T> reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return -1; // var length
+    }
+
+    @Override
+    public void serialize(Set<T> set, DataOutputView target) throws IOException {
+        final int size = set.size();
+        target.writeInt(size);
+        for (T element : set) {
+            elementSerializer.serialize(element, target);
+        }
+    }
+
+    @Override
+    public Set<T> deserialize(DataInputView source) throws IOException {
+        final int size = source.readInt();
+        final Set<T> set = new HashSet<>(size);

Review Comment: In order to ensure the ordering guarantee, do you think using `LinkedHashSet` might be a good idea?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
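As a side note on the review question above, a minimal standalone sketch of what the `LinkedHashSet` idea would look like; this is the reviewer's suggestion, not the code under review, and the helper name is made up:

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

// LinkedHashSet preserves insertion order, so elements come back in exactly
// the order serialize() wrote them, unlike HashSet whose iteration order is
// undefined and may change across JVMs.
final class OrderedSetDeserialization {
    static <T> Set<T> deserialize(TypeSerializer<T> elementSerializer, DataInputView source)
            throws IOException {
        final int size = source.readInt();
        final Set<T> set = new LinkedHashSet<>(size);
        for (int i = 0; i < size; i++) {
            set.add(elementSerializer.deserialize(source));
        }
        return set;
    }
}
```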
Re: [PR] [FLINK-34559] Limit Global & Local Aggregation buffers [flink]
rkhachatryan commented on code in PR #24869: URL: https://github.com/apache/flink/pull/24869#discussion_r1619465130 ## flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java: ## @@ -60,4 +61,56 @@ public class AlgorithmOptions { "Whether to use the LargeRecordHandler when spilling. If a record will not fit into the sorting" + " buffer. The record will be spilled on disk and the sorting will continue with only the key." + " The record itself will be read afterwards when merging."); + +public static final ConfigOption GLOBAL_AGG_BUFFER_SIZE = Review Comment: @twalthr I remember we had some discussions around the right place for these options. Do you have any alternatives in mind? How about org.apache.flink.configuration.ExecutionOptions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
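For readers outside the codebase, this is roughly the shape such an option would take wherever it ends up living; the key, type, and default below are pure assumptions for illustration, not the PR's actual definition:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

// Hypothetical sketch only: option key, type, and default are invented here
// to show the ConfigOption definition pattern, not taken from the PR.
public class AggregationOptionsSketch {
    public static final ConfigOption<MemorySize> GLOBAL_AGG_BUFFER_SIZE =
            ConfigOptions.key("execution.global-aggregation.buffer-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("64mb"))
                    .withDescription(
                            "Maximum amount of memory used to buffer input records"
                                    + " for global aggregation.");
}
```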
Re: [PR] [FLINK-34559] Limit Global & Local Aggregation buffers [flink]
flinkbot commented on PR #24869: URL: https://github.com/apache/flink/pull/24869#issuecomment-2138245149

## CI report:

* 3b92e02418068b4ed7def5609dfc8b1fdf5d63e4 UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-34559] Limit Global & Local Aggregation buffers [flink]
rkhachatryan opened a new pull request, #24869: URL: https://github.com/apache/flink/pull/24869 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
sap1ens commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138173997

> Would you recommend keeping it as a plain String?

Yes! And if someone needs base64 encoding they can encode it in the `SerializationSchema`.

> I have a follow-up query on the spotless violations. How did you run that? Whenever I try to do it, the build succeeds with no error and I can see "spotless skipped" in the build logs [attached screenshot]. Is there some setting in the code I need to change to get spotless working for me?

Hmm, I ran `mvn clean package -DskipTests` in the root of the project and ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` changed a lot of files in this PR.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
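To make that suggestion concrete, a hedged sketch of what such a user-side schema could look like (the class name is invented; the point is that the encoding choice lives in user code, not in the connector):

```java
import org.apache.flink.api.common.serialization.SerializationSchema;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Users who really want base64-encoded SQS message bodies can opt in with
// their own SerializationSchema instead of the connector forcing it on everyone.
public class Base64StringSerializationSchema implements SerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(String element) {
        byte[] utf8Bytes = element.getBytes(StandardCharsets.UTF_8);
        return Base64.getEncoder().encode(utf8Bytes);
    }
}
```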
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138092946

> @19priyadhingra I've had the opportunity to build and try this connector. Want to share some feedback:
>
> * Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom `ElementConverter`.
> * There are a lot of `spotless` violations; you probably want to run `mvn spotless:apply`.

@sap1ens, thanks a lot for the feedback.

1) Base64 encoding was what we were using in our production environment, and the intention behind it was that base64 encoding guarantees no invalid characters are present in the message, but now I understand that others might not need it. Would you recommend keeping it as a plain String?

2) I have a follow-up query on the spotless violations. How did you run that? Whenever I try to do it, the build succeeds with no error and I can see "spotless skipped" in the build logs [attached screenshot]. Is there some setting in the code I need to change to get spotless working for me?

![image](https://github.com/apache/flink-connector-aws/assets/169495197/20262c6a-9492-476c-a5b2-ce7809664e8a)

Sorry for the basic question, I am working on this package for the first time :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
sap1ens commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2137994741

@19priyadhingra I've had the opportunity to build and try this connector. Want to share some feedback:

- Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom `ElementConverter`.
- There are a lot of `spotless` violations; you probably want to run `mvn spotless:apply`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35472] Improve tests for Elasticsearch 8 connector [flink-connector-elasticsearch]
liuml07 commented on code in PR #105: URL: https://github.com/apache/flink-connector-elasticsearch/pull/105#discussion_r1619214815 ## flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java: ## @@ -21,86 +21,144 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -/** Base Integration tests class. */ -@Testcontainers -public class ElasticsearchSinkBaseITCase { +/** + * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests. + * + * It is extended with the {@link ParameterizedTestExtension} for parameterized testing against + * secure and non-secure Elasticsearch clusters. Tests must be annotated by {@link TestTemplate} in + * order to be parameterized. + * + * The cluster is running via test containers. In order to reuse the singleton containers by all + * inheriting test classes, we manage their lifecycle. The two containers are started only once when + * this class is loaded. At the end of the test suite the Ryuk container that is started by + * Testcontainers core will take care of stopping the singleton container. + */ +@ExtendWith(ParameterizedTestExtension.class) Review Comment: Let me attach a screenshot from running it locally. Both containers are used by each test case, and the two containers are used by all test cases of all test classes. https://github.com/apache/flink-connector-elasticsearch/assets/159186/b9886aa9-67b6-43ee-9b8b-941209383268 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
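For context, a bare-bones sketch of the `ParameterizedTestExtension` pattern described in that Javadoc; names are illustrative, and the single-value parameter form is an assumption about the extension's API:

```java
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.List;

// Every @TestTemplate method runs once per parameter value, i.e. once against
// the non-secure cluster and once against the secure one.
@ExtendWith(ParameterizedTestExtension.class)
class SecureAndNonSecureTestSketch {

    @Parameter public boolean secure;

    @Parameters(name = "secure = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    @TestTemplate
    void runsAgainstBothClusters() {
        // choose the matching container, RestClient, and credentials via 'secure'
    }
}
```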
Re: [PR] [FLINK-35472] Improve tests for Elasticsearch 8 connector [flink-connector-elasticsearch]
liuml07 commented on code in PR #105: URL: https://github.com/apache/flink-connector-elasticsearch/pull/105#discussion_r1619214117 ## flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java: ## @@ -21,86 +21,144 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -/** Base Integration tests class. */ -@Testcontainers -public class ElasticsearchSinkBaseITCase { +/** + * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests. + * + * It is extended with the {@link ParameterizedTestExtension} for parameterized testing against + * secure and non-secure Elasticsearch clusters. Tests must be annotated by {@link TestTemplate} in + * order to be parameterized. + * + * The cluster is running via test containers. In order to reuse the singleton containers by all + * inheriting test classes, we manage their lifecycle. The two containers are started only once when + * this class is loaded. At the end of the test suite the Ryuk container that is started by + * Testcontainers core will take care of stopping the singleton container. + */ +@ExtendWith(ParameterizedTestExtension.class) Review Comment: Thanks for taking a look, @reta! The idea of using ParameterizedTestExtension is to make all subclasses run with both secure and non-secure tests by default. So tests do not actually depend on one static container, but instead use both of them. That way, we can gain higher confidence when making specific changes to secure or non-secure case. For example, adding a new NetworkConfig option, be it SSL related or authentication provider. Those secure and non-secure containers are set up automatically and are singletons of the same JVM, which are reused by all test classes. 
Using `@Container` would make each test class create its own containers; as we introduce more test classes, this seems sub-optimal. Secondly, having two `@Container` fields in the base class means both containers will be set up (for each class). If the subclass only chooses one to use via `enableSecurity()`, the other one is just slowing down the tests without providing any value. I have been reading the Testcontainers docs and didn't find a solution that works well for base classes with a `@Container`-managed lifecycle while staying efficient. The suggestion there was to manage the lifecycle manually, which is simple enough: start the container statically, and it will get closed automatically. [1]

[1] https://java.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
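The singleton-container pattern from [1] boils down to a few lines; a sketch under the assumption of one shared Elasticsearch container (image tag invented for illustration):

```java
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

// Started exactly once when the base class loads, shared by every subclass.
// There is deliberately no stop() call: Testcontainers' Ryuk sidecar reaps
// the container after the JVM running the test suite exits.
abstract class SingletonContainerBase {

    static final ElasticsearchContainer ES_CONTAINER =
            new ElasticsearchContainer(
                    DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:8.12.1"));

    static {
        ES_CONTAINER.start();
    }
}
```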
Re: [PR] [FLINK-35448] Translate pod templates documentation into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #830: URL: https://github.com/apache/flink-kubernetes-operator/pull/830#discussion_r1619132831 ## docs/content/docs/custom-resource/pod-template.md: ## @@ -93,16 +90,18 @@ spec: ``` {{< hint info >}} -When using the operator with Flink native Kubernetes integration, please refer to [pod template field precedence]( -https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink). +当使用具有 Flink 原生 Kubernetes 集成的 operator 时,请参考 [pod template 字段优先级]( +https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink)。 {{< /hint >}} + ## Array Merging Behaviour -When layering pod templates (defining both a top level and jobmanager specific podtemplate for example) the corresponding yamls are merged together. + + +当分层 pod templates(例如同时定义顶级和任务管理器特定的 pod 模板)时,相应的 yaml 会合并在一起。 -The default behaviour of the pod template mechanism is to merge array arrays by merging the objects in the respective array positions. -This requires that containers in the podTemplates are defined in the same order otherwise the results may be undefined. +默认的 pod 模板机制行为是通过合并相应数组位置的对象来合并数组数组。 Review Comment: array arrays? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] A couple of hotfixes [flink]
pnowojski commented on PR #24868: URL: https://github.com/apache/flink/pull/24868#issuecomment-2137725610 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32088][checkpoint] Space control in file merging [flink]
Zakelly commented on PR #24807: URL: https://github.com/apache/flink/pull/24807#issuecomment-2137702975 Rebased to resolve conflicts -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]
Zakelly merged PR #24374: URL: https://github.com/apache/flink/pull/24374 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-35311 Draft pr for initial review of apicurio [flink]
nictownsend commented on code in PR #24715: URL: https://github.com/apache/flink/pull/24715#discussion_r1618968305 ## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioRegistryAvroFormatFactory.java: ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.apicurio; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Parser; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.BASIC_AUTH_CREDENTIALS_PASSWORD; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.BASIC_AUTH_CREDENTIALS_USERID; +import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_CLIENT_ID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_CLIENT_SECRET; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_SCOPE; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_URL; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.PROPERTIES; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_DESCRIPTION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_ID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_NAME; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_VERSION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SCHEMA; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_PASSWORD; +import static
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2137605027 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2137601891 CI test failure is unrelated: FLINK-34513 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35472] Improve tests for Elasticsearch 8 connector [flink-connector-elasticsearch]
reta commented on code in PR #105: URL: https://github.com/apache/flink-connector-elasticsearch/pull/105#discussion_r1618900785 ## flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBaseITCase.java: ## @@ -21,86 +21,144 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -/** Base Integration tests class. */ -@Testcontainers -public class ElasticsearchSinkBaseITCase { +/** + * {@link ElasticsearchSinkBaseITCase} is the base class for integration tests. + * + * It is extended with the {@link ParameterizedTestExtension} for parameterized testing against + * secure and non-secure Elasticsearch clusters. Tests must be annotated by {@link TestTemplate} in + * order to be parameterized. + * + * The cluster is running via test containers. In order to reuse the singleton containers by all + * inheriting test classes, we manage their lifecycle. The two containers are started only once when + * this class is loaded. At the end of the test suite the Ryuk container that is started by + * Testcontainers core will take care of stopping the singleton container. + */ +@ExtendWith(ParameterizedTestExtension.class) Review Comment: Thanks @liuml07 , I think the usage of ```suggestion @ExtendWith(ParameterizedTestExtension.class) ``` is not really bringing value here since the test cases depend on static containers (and parameterized tests do not support that). 
However, I think you have quite an interesting idea to explore; I would suggest trying this (see the sketch below):

- create 2 containers (as you do now, but using the `@Testcontainers` extension):

```
@Container
public static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer();

@Container
public static final ElasticsearchContainer ES_SECURE_CONTAINER = createElasticsearchSecureContainer();
```

- consequently, use 2 `RestClient` clients, secure and non-secure
- introduce an abstract method, e.g. `abstract boolean enableSecurity()`, that each test class would have to override and that the base class could use to decide which client + customization to use

WDYT? Thank you.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
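Filling in that outline, a rough sketch of how the base class could look; the factory bodies, image tag, and `withPassword` security setup are assumptions, not tested code:

```java
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
abstract class ElasticsearchSinkBaseITCaseSketch {

    private static final DockerImageName IMAGE =
            DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:8.12.1");

    @Container
    static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer();

    @Container
    static final ElasticsearchContainer ES_SECURE_CONTAINER = createElasticsearchSecureContainer();

    /** Each concrete test class decides whether it runs against the secure cluster. */
    abstract boolean enableSecurity();

    /** The base class picks the matching cluster (and, analogously, the RestClient). */
    ElasticsearchContainer container() {
        return enableSecurity() ? ES_SECURE_CONTAINER : ES_CONTAINER;
    }

    private static ElasticsearchContainer createElasticsearchContainer() {
        return new ElasticsearchContainer(IMAGE);
    }

    private static ElasticsearchContainer createElasticsearchSecureContainer() {
        // assumed security setup; a real secure cluster may also need SSL certificates
        return new ElasticsearchContainer(IMAGE).withPassword("s3cret");
    }
}
```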
Re: [PR] [hotfix] A couple of hotfixes [flink]
pnowojski commented on code in PR #24868: URL: https://github.com/apache/flink/pull/24868#discussion_r1618893857 ## flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java: ## @@ -638,29 +637,15 @@ public boolean equals(Object o) { } Transformation that = (Transformation) o; - -if (bufferTimeout != that.bufferTimeout) { -return false; -} -if (id != that.id) { -return false; -} -if (parallelism != that.parallelism) { -return false; -} -if (!name.equals(that.name)) { -return false; -} -return outputType != null ? outputType.equals(that.outputType) : that.outputType == null; +return Objects.equals(bufferTimeout, that.bufferTimeout) +&& Objects.equals(id, that.id) +&& Objects.equals(parallelism, that.parallelism) +&& Objects.equals(name, that.name) +&& Objects.equals(outputType, that.outputType); } @Override public int hashCode() { -int result = id; -result = 31 * result + name.hashCode(); -result = 31 * result + (outputType != null ? outputType.hashCode() : 0); -result = 31 * result + parallelism; -result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32)); -return result; +return Objects.hash(id, name, outputType, parallelism, bufferTimeout); Review Comment: Thanks for pointing this out, but I don't see how this should matter in our case. I have never seen any `Objects.hash` call anywhere being an issue in Flink. Also, `Transformation` is not used on any critical path, but during job submission, where there are tons of much heavier operations happening. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] A couple of hotfixes [flink]
snuyanzin commented on code in PR #24868: URL: https://github.com/apache/flink/pull/24868#discussion_r1618885971 ## flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java: ## @@ -638,29 +637,15 @@ public boolean equals(Object o) { } Transformation that = (Transformation) o; - -if (bufferTimeout != that.bufferTimeout) { -return false; -} -if (id != that.id) { -return false; -} -if (parallelism != that.parallelism) { -return false; -} -if (!name.equals(that.name)) { -return false; -} -return outputType != null ? outputType.equals(that.outputType) : that.outputType == null; +return Objects.equals(bufferTimeout, that.bufferTimeout) +&& Objects.equals(id, that.id) +&& Objects.equals(parallelism, that.parallelism) +&& Objects.equals(name, that.name) +&& Objects.equals(outputType, that.outputType); } @Override public int hashCode() { -int result = id; -result = 31 * result + name.hashCode(); -result = 31 * result + (outputType != null ? outputType.hashCode() : 0); -result = 31 * result + parallelism; -result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32)); -return result; +return Objects.hash(id, name, outputType, parallelism, bufferTimeout); Review Comment: Not sure how critical it is here; however, it seems `Objects#hash` for more than one arg could cost more. More details at https://bugs.openjdk.org/browse/JDK-8332840 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
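In miniature, the trade-off JDK-8332840 discusses: the varargs `Objects.hash` allocates an `Object[]` and boxes every primitive on each call, while the manual 31-based combination allocates nothing.

```java
import java.util.Objects;

class HashCostSketch {
    int id;
    long bufferTimeout;

    int viaObjectsHash() {
        // allocates new Object[2] and boxes id (Integer) and bufferTimeout (Long)
        return Objects.hash(id, bufferTimeout);
    }

    int manual() {
        // allocation-free: same 31-based scheme the replaced hashCode() used
        int result = id;
        result = 31 * result + Long.hashCode(bufferTimeout);
        return result;
    }
}
```

Whether that matters depends on how hot the call site is, which is exactly the point being debated above.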
Re: [PR] [hotfix] A couple of hotfixes [flink]
flinkbot commented on PR #24868: URL: https://github.com/apache/flink/pull/24868#issuecomment-2137398488

## CI report:

* dd51527c7f8d44e49f39f8dd27c6fac2f34d65da UNKNOWN

Bot commands: the @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix] A couple of hotfixes [flink]
pnowojski opened a new pull request, #24868: URL: https://github.com/apache/flink/pull/24868 ## What is the purpose of the change A couple of hotfixes ## Brief change log Please check individual commit messages ## Verifying this change Covered by existing tests and no functional changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? **(not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]
czy006 commented on PR #3332: URL: https://github.com/apache/flink-cdc/pull/3332#issuecomment-2137368517 > Fix [FLINK-35173](https://issues.apache.org/jira/browse/FLINK-35173) MysqlDebeziumTimeConverter miss timezone convert to timestamp. [hotfix-bp-3.1](https://github.com/apache/flink-cdc/pull/3380) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]
czy006 opened a new pull request, #3380: URL: https://github.com/apache/flink-cdc/pull/3380 Hotfix for [FLINK-35173](https://issues.apache.org/jira/browse/FLINK-35173): MysqlDebeziumTimeConverter misses the timezone conversion to timestamp. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]
czy006 closed pull request #3379: [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp URL: https://github.com/apache/flink-cdc/pull/3379 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]
czy006 opened a new pull request, #3379: URL: https://github.com/apache/flink-cdc/pull/3379 Backport to 3.1 of the fix for [FLINK-35173](https://issues.apache.org/jira/browse/FLINK-35173): MysqlDebeziumTimeConverter misses the timezone conversion to timestamp. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][statebackend/forst] Normalize ForSt option names and working dir [flink]
masteryhx closed pull request #24854: [hotfix][statebackend/forst] Normalize ForSt option names and working dir URL: https://github.com/apache/flink/pull/24854 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
Zakelly commented on PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2137299531

Here's the full log before the pipeline expired. [jenkins.tar.gz](https://github.com/apache/flink-benchmarks/files/15484257/jenkins.tar.gz)

It compiles & installs Flink before running the benchmarks via `/mnt/jenkins/tools/hudson.tasks.Maven_MavenInstallation/M3/bin/mvn -Dflink.version=1.20-SNAPSHOT clean install exec:exec -Penable-async-profiler -DskipTests -Drat.skip=true '-Dbenchmarks=org.apache.flink.state.benchmark.*' '-DbenchmarkExcludes=org.apache.flink.benchmark.full.*' -DexecutableJava=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-4.0.2.al8.x86_64/bin/java -DasyncProfilerLib=/opt/async-profiler-3.0-linux-x64/lib/libasyncProfiler.so -DprofResultPath=./profile-result`

It would be great if you could provide some options that could help speed this up.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-35311 Kafka part required for Flink Apicurio Avro support. Prototype for review [flink-connector-kafka]
nictownsend commented on code in PR #99: URL: https://github.com/apache/flink-connector-kafka/pull/99#discussion_r1618477190 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java: ## @@ -55,6 +61,34 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema producedTypeInfo; private final boolean upsertMode; +private static Method deserializeWithAdditionalPropertiesMethod = null; +protected static final String IS_KEY = "IS_KEY"; + +protected static final String HEADERS = "HEADERS"; + +static { +initializeMethod(); +} + +protected static void initializeMethod() { Review Comment: Probably worth some javadoc here to explain that this is for backwards compatibility (versions of flink that don't have the method) ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java: ## @@ -127,11 +168,62 @@ public void deserialize(ConsumerRecord record, Collector record, +Map additionalParameters, +Collector collector, +boolean isKey) +throws IOException { +if (deserializeWithAdditionalPropertiesMethod == null) { +if (isKey) { +keyDeserialization.deserialize(record.key(), collector); +} else { +valueDeserialization.deserialize(record.value(), collector); +} + +} else { +additionalParameters.put(IS_KEY, isKey); +try { +if (isKey) { +deserializeWithAdditionalPropertiesMethod.invoke( +keyDeserialization, record.key(), additionalParameters, collector); +} else { +deserializeWithAdditionalPropertiesMethod.invoke( +valueDeserialization, record.value(), additionalParameters, collector); +} +} catch (IllegalAccessException e) { Review Comment: Syntax may be cleaner with `|` as you're handling both exceptions in the same way ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java: ## @@ -36,10 +42,36 @@ public class KafkaDeserializationSchemaWrapper implements KafkaDeserializatio private static final long serialVersionUID = 2651665280744549932L; +private static Method deserializeWithAdditionalPropertiesMethod = null; + +protected static final String IS_KEY = "IS_KEY"; + +protected static final String HEADERS = "HEADERS"; + +static { Review Comment: Why is https://github.com/apache/flink-connector-kafka/pull/99/files?file-filters%5B%5D=.java=true#diff-0a1bb10356e97bd2de4e3e8a257cf8c94290dd0aadb133d9288b6d1bc60b5994R73 different in terms of the `static` block? You use a static block here, but a static method in DynamicKafkaDeserializationSchema. However, I like the idea of an interface method `initializeMethod` that returns `Method` rather than `void` - the typing gives safety that `intializeMethod` returns the Method from a class structure that has Deserialization as the parent. 
I do appreciate that the typing doesn't guarantee the method returned is one with the correct parameters - but I think catching the runtime exception is as good as trying to validate the method before using it https://www.tutorialspoint.com/javareflect/javareflect_method_getparametertypes.htm ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java: ## @@ -74,25 +113,63 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS @Override public ProducerRecord serialize( RowData consumedRow, KafkaSinkContext context, Long timestamp) { + +// keeping the metadataHeaders maps for keys and values separate so these maps +// are output only for the methods; avoiding changing the inputs. +// define the input map +Map inputMap = new HashMap<>(); + +// define the output maps, it is 2 levels deep in case we need to add new information in +// the future +Map outputKeyMap = new HashMap<>(); +Map outputValueMap = new HashMap<>(); + +inputMap.put(TOPIC_NAME, topic); + // shortcut in case no input projection is required if (keySerialization == null && !hasMetadata) { -final byte[] valueSerialized = valueSerialization.serialize(consumedRow); -return new ProducerRecord<>( -topic, -extractPartition( -consumedRow, -null, -valueSerialized, -
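A hedged sketch of the reflective backwards-compatibility pattern this review is discussing: look the optional method up once, and fall back to the old code path on older Flink versions where it is absent. The method name and parameter types are assumptions modelled on the diff's field name, not the PR's exact signature:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;

import java.lang.reflect.Method;
import java.util.Map;

// Look the optional method up once per JVM; callers check isAvailable() and
// fall back to the classic two-argument deserialize(...) when it is null.
final class ReflectiveCompatSketch {

    private static final Method DESERIALIZE_WITH_PROPS = lookup();

    private static Method lookup() {
        try {
            // assumed name/signature, inferred from the diff's field name
            return DeserializationSchema.class.getMethod(
                    "deserializeWithAdditionalProperties",
                    byte[].class, Map.class, Collector.class);
        } catch (NoSuchMethodException e) {
            return null; // older Flink: the method simply does not exist
        }
    }

    static boolean isAvailable() {
        return DESERIALIZE_WITH_PROPS != null;
    }
}
```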
Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]
Zakelly commented on PR #24374: URL: https://github.com/apache/flink/pull/24374#issuecomment-2137277271 I'll merge this once the CI green. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-15753][web] Displaying non-numeric metrics in Web UI dashboard [flink]
flinkbot commented on PR #24867: URL: https://github.com/apache/flink/pull/24867#issuecomment-2137215585 ## CI report: * a91eedf8891c31ea59064821ff124ca6c473da6d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-15753][web] Displaying non-numeric metrics in Web UI dashboard [flink]
yuxiqian opened a new pull request, #24867: URL: https://github.com/apache/flink/pull/24867 ## What is the purpose of the change Currently, Flink allows creating gauge metrics with various types [[1]](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#gauge). However, the Web UI doesn't support showing any type other than numerics and always displays `NaN`, which isn't very helpful. This change lets the dashboard display the raw metric value when conversion to a number isn't applicable. ## Brief change log - Adds raw value display as a fallback when receiving non-numeric metric values - Changes the "Chart / Numeric" label to "Chart / Literal" because not all metrics are of a numeric type ## Verifying this change This change is trivial work without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
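For context on why a non-numeric value can reach the dashboard in the first place: Flink's `Gauge` is generic, so a gauge may legally return a `String` (or any other type). A minimal sketch against the metrics API; the metric name is made up:

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class NonNumericGaugeExample {

    public static void register(MetricGroup group) {
        // A perfectly legal gauge whose value is a String; before this change
        // the Web UI rendered such values as NaN.
        group.gauge("lastExternalizedCheckpointPath", (Gauge<String>) () -> "hdfs:///chk-42");
    }
}
```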
Re: [PR] Draft: [FLINK-35199][table] Support the execution of create materialized table in full refresh mode [flink]
flinkbot commented on PR #24866: URL: https://github.com/apache/flink/pull/24866#issuecomment-2137173809 ## CI report: * b0eb4ec03134eea9c5c2574f3cfd9e0e44a496f3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Draft: [FLINK-35199][table] Support the execution of create materialized table in full refresh mode [flink]
hackergin opened a new pull request, #24866: URL: https://github.com/apache/flink/pull/24866 …le in full refresh mode ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35399][doc] Add documents for batch job recovery from jobMaster failover. [flink]
zhuzhurk commented on code in PR #24855: URL: https://github.com/apache/flink/pull/24855#discussion_r1618689544
## docs/content.zh/docs/ops/batch/recovery_from_job_master_failure.md: ## @@ -77,13 +77,17 @@ ...so that as much pre-failure progress as possible can be recovered after a job master failover, avoiding re-running ### Enabling sources to support progress recovery after job master failover -Currently, only the new source (FLIP-27) supports progress recovery for batch jobs. To enable this, the new source's SplitEnumerator needs to implement the SupportsBatchSnapshot interface: +Currently, only the new source (FLIP-27) supports progress recovery for batch jobs. To enable this, the new source's +{{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java" name="SplitEnumerator" >}} +needs to be able to take state snapshots in batch scenarios, and implement Review Comment: "take state snapshots" -> "take state snapshots (with checkpointId = -1 at this point)" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]
spoon-lz commented on PR #24374: URL: https://github.com/apache/flink/pull/24374#issuecomment-2137116772 @Zakelly @masteryhx Before this PR could be merged, I found a new conflict with `MaterializedTableManager.java` on the master branch. I have resolved the conflict, so it can be merged now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [1.18][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]
pnowojski commented on PR #24865: URL: https://github.com/apache/flink/pull/24865#issuecomment-2137108109 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [1.19][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]
pnowojski commented on PR #24864: URL: https://github.com/apache/flink/pull/24864#issuecomment-2137107967 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
SamBarker commented on PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2137002719 > And I find the benchmark runs extremely slow with the profiler on. The ETA is up to 8 days for full set of our daily run. So I guess this is best only used for single tests that are triggered manually, right? Yikes, that's a bit sad. Running a single test (`SerializationFrameworkMiniBenchmarks.serializerHeavyString`) locally I get a 10-minute estimate with or without the profiler, and a runtime that seems comparable. If I widen the set of benchmarks I get the same ETA with or without the profiler. So I'm slightly suspicious that the excludes or default includes behaviour is being changed, but without knowing what command line you're seeing that from it's hard to know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [minor][cdc] Suppress false alarm in flink config loader [flink-cdc]
leonardBang merged PR #3292: URL: https://github.com/apache/flink-cdc/pull/3292 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26808][1.19] Limit FileUploadHandler to multipart routes [flink]
uce commented on PR #24859: URL: https://github.com/apache/flink/pull/24859#issuecomment-2136967772 I fixed the checkstyle error -- we seem to have changed our checkstyle configuration between 1.19 and 1.20. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35348][table] Introduce refresh materialized table rest api [flink]
lsyldliu merged PR #24844: URL: https://github.com/apache/flink/pull/24844 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
SamBarker commented on code in PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618517159 ## benchmark.sh: ## @@ -0,0 +1,54 @@ +#!/usr/bin/env bash Review Comment: resolved with https://github.com/apache/flink-benchmarks/pull/90/commits/74e4f4f56dab07c70e03690279f96a8f168830f4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35461][config] Improve Runtime Configuration for Flink 2.0 [flink]
TanYuxin-tyx commented on code in PR #24853: URL: https://github.com/apache/flink/pull/24853#discussion_r1618468471
## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java: ## @@ -373,11 +373,18 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration( BoundedBlockingSubpartitionType blockingSubpartitionType = getBlockingSubpartitionType(configuration); -boolean batchShuffleCompressionEnabled = - configuration.get(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED); CompressionCodec compressionCodec = configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC); +boolean batchShuffleCompressionEnabled; +if (compressionCodec == CompressionCodec.NONE) { Review Comment: If the codec is NONE while the old config `BATCH_SHUFFLE_COMPRESSION_ENABLED` is true, do we need to print a warning log to deal with this case?
## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java: ## @@ -31,6 +31,7 @@ * The {@link ConfigOption configuration options} for job execution. Those are stream specific * options. See also {@link org.apache.flink.configuration.PipelineOptions}. Review Comment: I noticed that other deprecated options add the comment `This option is deprecated in 1.20 and will be removed in 2.0`; do we need to add that comment here as well?
## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -425,6 +446,21 @@ public enum CompressionCodec { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); +/** The timeout for requesting buffers for each channel. */ +@Documentation.ExcludeFromDocumentation( +"This option is purely implementation related, and may be removed as the implementation changes.") +public static final ConfigOption NETWORK_BUFFERS_REQUEST_TIMEOUT = +key("taskmanager.network.memory.exclusive-buffers-request-timeout") Review Comment: IIRC, this config key change was meant to remove the concept of `exclusive`, yet the key still contains `exclusive` after this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
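A sketch of the warning the first comment asks for, assuming the resolution logic keeps its current shape; the class and method names here are illustrative, not the actual Flink code:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CompressionConfigSketch {

    private static final Logger LOG = LoggerFactory.getLogger(CompressionConfigSketch.class);

    enum CompressionCodec { NONE, LZ4, LZO, ZSTD }

    // Derive the effective flag from the new codec option, warning when the
    // deprecated boolean contradicts it instead of silently ignoring it.
    static boolean resolveCompressionEnabled(CompressionCodec codec, boolean deprecatedEnabledFlag) {
        if (codec == CompressionCodec.NONE) {
            if (deprecatedEnabledFlag) {
                LOG.warn(
                        "Shuffle compression codec is NONE but the deprecated enable flag is "
                                + "set to true; compression stays disabled. Please remove the "
                                + "deprecated option.");
            }
            return false;
        }
        return true;
    }
}
```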
Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]
GOODBOY008 commented on code in PR #24730: URL: https://github.com/apache/flink/pull/24730#discussion_r1618505598
## flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.TestDataGenerators; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.io.FileUtils; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CsvBulkWriterIT { + +@TempDir File outDir; + +@Test +public void testNoDataIsWrittenBeforeFlush() throws Exception { + +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +env.enableCheckpointing(100); +env.setRestartStrategy(RestartStrategies.noRestart()); + +// Workaround serialization limitations +File outDirRef = new File(outDir.getAbsolutePath()); + +FileSink sink = +FileSink.forBulkFormat( +new org.apache.flink.core.fs.Path(outDir.getAbsolutePath()), +out -> { +FSDataOutputStreamWrapper outputStreamWrapper = +new FSDataOutputStreamWrapper(out); +return new CsvBulkWriterWrapper<>( +CsvBulkWriter.forPojo(Pojo.class, outputStreamWrapper), +outputStreamWrapper, +outDirRef); +}) +.build(); + +List integers = Arrays.asList(new Pojo(1), new Pojo(2)); +DataGeneratorSource generatorSource = +TestDataGenerators.fromDataWithSnapshotsLatch( +integers, TypeInformation.of(Pojo.class)); +env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "").sinkTo(sink); +env.execute(); +assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2", "1", "2"); +} + +private static class CsvBulkWriterWrapper implements BulkWriter { + +private static int callCounter = 0; + +private static int unFlushedCounter = 0; Review Comment: Your suggestion looks clearer and is easier to understand. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
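For readers who want the Jackson behavior this PR toggles in isolation: `SerializationFeature.FLUSH_AFTER_WRITE_VALUE` (enabled by default) makes a `SequenceWriter` flush the underlying stream after every record, which is exactly the per-record flushing the change avoids. A minimal standalone sketch, not the Flink `CsvBulkWriter` itself:

```java
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class CsvFlushSketch {

    @JsonPropertyOrder({"name", "num"})
    public static class Row {
        public String name;
        public int num;

        public Row(String name, int num) {
            this.name = name;
            this.num = num;
        }
    }

    public static void main(String[] args) throws IOException {
        CsvMapper mapper = new CsvMapper();
        // The change under review: stop flushing the target stream per record.
        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);

        CsvSchema schema = mapper.schemaFor(Row.class);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (SequenceWriter writer = mapper.writer(schema).writeValues(out)) {
            writer.write(new Row("a", 1)); // buffered, no longer flushed per record
            writer.write(new Row("b", 2));
            writer.flush(); // records reach the stream on explicit flush (or close)
        }
        System.out.println(out);
    }
}
```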
Re: [PR] [FLINK-35399][doc] Add documents for batch job recovery from jobMaster failover. [flink]
zhuzhurk commented on code in PR #24855: URL: https://github.com/apache/flink/pull/24855#discussion_r1618464994
## docs/content.zh/docs/ops/batch/recovery_from_job_master_failure.md: ## @@ -75,6 +75,19 @@ ...so that as much pre-failure progress as possible can be recovered after a JobMaster failover, avoiding re-running - [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" >}}#job-event-store-write-buffer-size): the size of the JobEventStore's write buffer. Once the buffer is full, its contents are flushed to the external filesystem. +### Enabling sources to support progress recovery after job master failover + +Currently, only the new source (FLIP-27) supports progress recovery for batch jobs. To enable this, the new source's SplitEnumerator needs to implement the SupportsBatchSnapshot interface: Review Comment: "To enable this, the new source's SplitEnumerator needs to implement the SupportsBatchSnapshot interface:" -> "The new source's SplitEnumerator needs to support snapshotState(checkpointId=-1) in batch scenarios and implement the [SupportsBatchSnapshot]({{< ref "xxx" >}}#yyy) interface, so that the pre-failure state can be restored after a restart." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1618480332 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.source.FromElementsSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.Row; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +class Generator implements FromElementsSource.ElementsSupplier { +private static final long serialVersionUID = -8455653458083514261L; +private final List elements; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +InternalGenerator gen = +new InternalGenerator( +numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +List elements = new ArrayList<>(); +gen.forEachRemaining(elements::add); +return new Generator(elements); +} Review Comment: You're right - I got a bit carried away. We shouldn't optimize it any further. But let's keep the changes as they are now since you applied it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35399][doc] Add documents for batch job recovery from jobMaster failover. [flink]
zhuzhurk commented on code in PR #24855: URL: https://github.com/apache/flink/pull/24855#discussion_r1618437083 ## docs/content/docs/ops/batch/recovery_from_job_master_failure.md: ## @@ -0,0 +1,110 @@ +--- +title: "Recovery job progress from job master failures" +weight: 4 +type: docs +aliases: + +- /docs/ops/batch/recovery_from_job_master_failure.html +- /docs/ops/batch/recovery_from_job_master_failure + +--- + + +# Batch jobs progress recovery from job master failures + +## Background + +Previously, if the JobMaster fails and is terminated, one of the following two situations will occur: + +- If high availability (HA) is disabled, the job will fail. +- If HA is enabled, a JobMaster failover will happen and the job will be restarted. Streaming jobs can resume from the + latest successful checkpoints. Batch jobs, however, do not have checkpoints and have to start over from the beginning, + losing all previously made progress. This represents a significant regression for long-running batch jobs. + +To address this issue, a batch job recovery mechanism is introduced to enable batch jobs to recover as much progress as +possible after a JobMaster failover, avoiding the need to rerun tasks that have already been finished. + +To implement this feature, a JobEventStore component is introduced to record state change events of the JobMaster +(such as ExecutionGraph, OperatorCoordinator, etc.) to an external filesystem. During the crash and subsequent restart +of the JobMaster, TaskManagers will retain the intermediate result data produced by the job and attempt to reconnect +continuously. Once the JobMaster restarts, it will re-establish connections with TaskManagers and recover the job state +based on the retained intermediate results and the events previously recorded in the JobEventStore, thereby resuming +the job's execution progress. + +## Usage + +This section explains how to enable recovery of batch jobs from JobMaster failures, how to tune it, and how to develop +sources to work with batch jobs progress recovery. + +### How to enable batch jobs progress recovery from job master failures + +- Enable cluster high availability: + + To enable the recovery of batch jobs from JobMaster failures, it is essential to first ensure that cluster + high availability (HA) is enabled. Flink supports HA services backed by ZooKeeper or Kubernetes. + More details of the configuration can be found in the [High Availability]({{< ref "docs/deployment/ha/overview#high-availability" >}}) page. +- Configure [execution.batch.job-recovery.enabled]({{< ref "docs/deployment/config" >}}#execution-batch-job-recovery-enabled): true + +Note that currently only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) +supports this feature. And Flink batch jobs will use this scheduler by default unless another scheduler is explicitly configured. + +### Optimization + +To enable batch jobs to recover as much progress as possible after a JobMaster failover, and avoid rerunning tasks +that have already been finished, you can configure the following options for optimization: + +- [execution.batch.job-recovery.snapshot.min-pause]({{< ref "docs/deployment/config" >}}#execution-batch-job-recovery-snapshot-min-pause): + This setting determines the minimum pause time allowed between snapshots for the OperatorCoordinator and ShuffleMaster. + This parameter could be adjusted based on the expected I/O load of your cluster and the tolerable amount of state regression. 
+ Reduce this interval if smaller state regressions are preferred and a higher I/O load is acceptable. +- [execution.batch.job-recovery.previous-worker.recovery.timeout]({{< ref "docs/deployment/config" >}}#execution-batch-job-recovery-previous-worker-recovery-timeout): + This setting determines the timeout duration allowed for Shuffle workers to reconnect. During the recovery process, Flink + requests the retained intermediate result data information from the Shuffle Master. If the timeout is reached, + Flink will use all the acquired intermediate result data to recover the state. +- [job-event.store.write-buffer.flush-interval]({{< ref "docs/deployment/config" >}}#job-event-store-write-buffer-flush-interval): + This setting determines the flush interval for the JobEventStore's write buffers. +- [job-event.store.write-buffer.size]({{< ref "docs/deployment/config" >}}#job-event-store-write-buffer-size): This + setting determines the write buffer size in the JobEventStore. When the buffer is full, its contents are flushed to the external + filesystem. + +### Enable batch jobs progress recovery for sources + +Currently, only the new source (FLIP-27) supports progress recovery for batch jobs. To enable this feature, the SplitEnumerator of a new source (FLIP-27) needs to implement the SupportsBatchSnapshot interface: + +public interface SupportsBatchSnapshot {}
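A minimal sketch of what the quoted documentation asks of a connector. Only `SupportsBatchSnapshot` is taken from the doc text; the enumerator class and its state type are simplified stand-ins for the real `SplitEnumerator` contract:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Marker interface from the documentation above. */
interface SupportsBatchSnapshot {}

class BatchRecoverableEnumerator implements SupportsBatchSnapshot {

    private final Deque<String> unassignedSplits = new ArrayDeque<>();

    /**
     * For batch job recovery the scheduler takes snapshots with checkpointId == -1,
     * so the implementation must not require a valid streaming checkpoint id.
     */
    public Deque<String> snapshotState(long checkpointId) {
        return new ArrayDeque<>(unassignedSplits);
    }
}
```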
Re: [PR] Enable async profiler [flink-benchmarks]
Zakelly commented on PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2136859786 And I find the benchmark runs extremely slow with the profiler on. The ETA is up to 8 days for full set of our daily run. So I guess this is best only used for single tests that are triggered manually, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1618451567 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.source.FromElementsSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.Row; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +class Generator implements FromElementsSource.ElementsSupplier { +private static final long serialVersionUID = -8455653458083514261L; +private final List elements; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +InternalGenerator gen = +new InternalGenerator( +numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +List elements = new ArrayList<>(); +gen.forEachRemaining(elements::add); +return new Generator(elements); +} Review Comment: I applied your suggestion as I like it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
Zakelly commented on code in PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618445050 ## benchmark.sh: ## @@ -0,0 +1,54 @@ +#!/usr/bin/env bash Review Comment: `chmod +x` to make this executable? ## pom.xml: ## @@ -67,6 +67,7 @@ under the License. org.apache.flink.benchmark.full.*,org.apache.flink.state.benchmark.*,org.apache.flink.scheduler.benchmark.* .* java + Review Comment: How about adding a property for the profiling result path? I'd suggest a relative path under the current directory as the default value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1618443253 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.source.FromElementsSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.Row; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L; +private final List elements; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +InternalGenerator gen = +new InternalGenerator( +numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +List elements = new ArrayList<>(); +gen.forEachRemaining(elements::add); +return new Generator(elements); +} Review Comment: I did not want to edit the original logic of the generator itself too much, but your suggestion makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [1.19][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]
flinkbot commented on PR #24864: URL: https://github.com/apache/flink/pull/24864#issuecomment-2136816444 ## CI report: * ab8e3b5d59abf674a8cb72ea87df211cb7f0787a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [1.18][FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]
flinkbot commented on PR #24865: URL: https://github.com/apache/flink/pull/24865#issuecomment-2136816894 ## CI report: * cb9b44b4f7fa394884d2b4821ca5ed3cd93f0b70 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]
pnowojski merged PR #24857: URL: https://github.com/apache/flink/pull/24857 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
pnowojski closed pull request #24784: [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… URL: https://github.com/apache/flink/pull/24784 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35461][config] Improve Runtime Configuration for Flink 2.0 [flink]
Sxnan commented on code in PR #24853: URL: https://github.com/apache/flink/pull/24853#discussion_r1618380520 ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -428,6 +455,21 @@ public enum CompressionCodec { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); +/** The timeout for requesting buffers for each channel. */ +@Documentation.ExcludeFromDocumentation( +"This option is purely implementation related, and may be removed as the implementation changes.") +public static final ConfigOption NETWORK_BUFFERS_REQUEST_TIMEOUT = Review Comment: Removing the old `ConfigOption` right away will cause the code that directly uses the `ConfigOption` to fail. It is better to mark it deprecated in 1.20 and remove it in 2.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35430][cdc-connector][kafka] Passed the zoneId to JsonSerializationSchema. [flink-cdc]
joyCurry30 commented on PR #3359: URL: https://github.com/apache/flink-cdc/pull/3359#issuecomment-2136672889 > Hi @joyCurry30, could you please create another PR to backport this to `release-3.1`? We're preparing for the next patch release, and it would be nice to include this bugfix. https://github.com/apache/flink-cdc/pull/3377 Hi, cc plz. @yuxiqian -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][FLINK-35430][cdc-connector][kafka] Passed the zoneId to JsonSerializationSchema. [flink-cdc]
joyCurry30 opened a new pull request, #3377: URL: https://github.com/apache/flink-cdc/pull/3377 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35325][cdc-connector][paimon]Support for specifying column order. [flink-cdc]
joyCurry30 commented on PR #3323: URL: https://github.com/apache/flink-cdc/pull/3323#issuecomment-2136669682 > Hi @joyCurry30, could you please backport it to `release-3.1` branch too? https://github.com/apache/flink-cdc/pull/3376 cc plz. @yuxiqian -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table. [flink-cdc]
joyCurry30 opened a new pull request, #3376: URL: https://github.com/apache/flink-cdc/pull/3376 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]
yuxiqian opened a new pull request, #3375: URL: https://github.com/apache/flink-cdc/pull/3375 Cherry-picked from #3348. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
showuon commented on code in PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618289766 ## README.md: ## @@ -64,6 +64,20 @@ java -jar target/benchmarks.jar "" -lp java -jar target/benchmarks.jar "org.apache.flink.state.benchmark.*" -p "backendType=ROCKSDB" ``` +## Generating Flame graphs + +JMH has support for enabling profilers when running benchmarks, in particular [async profiler](https://github.com/async-profiler/async-profiler) which can generate flame graphs of the call stack. However, async profiler requires linking with native code, which needs to be downloaded manually, which means its location is user- and architecture-specific. To enable async profiler support, run the benchmarks as follows: + +with maven +``` + java -jar target/benchmarks.jar -rf csv "" -DasyncProfilerLib= Review Comment: nit: additional space at the beginning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
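For reference, the same async-profiler integration can also be enabled programmatically through JMH's `Runner` API rather than the command line; the include pattern and library path below are placeholders:

```java
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class ProfiledBenchmarkRunner {

    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include("org.apache.flink.state.benchmark.*")
                // libPath must point at the platform-specific native library,
                // which is why it cannot be bundled and has to be configured.
                .addProfiler(AsyncProfiler.class,
                        "libPath=/path/to/libasyncProfiler.so;output=flamegraph")
                .build();
        new Runner(options).run();
    }
}
```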
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
yuxiqian commented on PR #3233: URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2136650327 Hi @loserwang1024, could you please backport this patch to `release-3.1` branch so that it could be released with CDC 3.1.1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35325][cdc-connector][paimon]Support for specifying column order. [flink-cdc]
yuxiqian commented on PR #3323: URL: https://github.com/apache/flink-cdc/pull/3323#issuecomment-2136645319 Hi @joyCurry30, could you please backport it to `release-3.1` branch too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35430][cdc-connector][kafka] Passed the zoneId to JsonSerializationSchema. [flink-cdc]
yuxiqian commented on PR #3359: URL: https://github.com/apache/flink-cdc/pull/3359#issuecomment-2136644955 Hi @joyCurry30, could you please create another PR to backport this to `release-3.1`? We're preparing for the next patch release, and it would be nice to include this bugfix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp [flink-cdc]
yuxiqian commented on PR #3332: URL: https://github.com/apache/flink-cdc/pull/3332#issuecomment-2136642062 Hi @czy006, could you please open another PR to backport this patch to `release-3.1` branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][FLINK-35295][mysql] Improve jdbc connection pool initialization failure message [flink-cdc]
yuxiqian opened a new pull request, #3374: URL: https://github.com/apache/flink-cdc/pull/3374 Cherry-picked from #3293. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][FLINK-35294][mysql] Use source config to check if the filter should be applied in timestamp starting mode [flink-cdc]
yuxiqian opened a new pull request, #3373: URL: https://github.com/apache/flink-cdc/pull/3373 Cherry-picked from #3291. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned checkpoint with custom partitioner [flink]
ldadima commented on PR #24857: URL: https://github.com/apache/flink/pull/24857#issuecomment-2136570818 Thanks for the changes. LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][FLINK-35464] Fixes operator state backwards compatibility from CDC 3.0.x [flink-cdc]
yuxiqian opened a new pull request, #3370: URL: https://github.com/apache/flink-cdc/pull/3370 This closes [FLINK-35441](https://issues.apache.org/jira/browse/FLINK-35441) and [FLINK-35464](https://issues.apache.org/jira/browse/FLINK-35464). See #3369 for more details. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source. [flink-connector-jdbc]
RocMarshal opened a new pull request, #122: URL: https://github.com/apache/flink-connector-jdbc/pull/122 [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source. - Only for English edition pages. - [ ] The translation is planned to be done in a new Jira. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]
ldadima commented on PR #24784: URL: https://github.com/apache/flink/pull/24784#issuecomment-2136510387 > Thanks for the update @ldadima . I have fixed up a couple of things in #24857 so let me close this PR in favour of that one. Could you take a look at my version? I checked the MR. I agree with the changes to the ITCase. In my case I forgot about the uid, which is why the map vertex didn't work for me in the rescale case. Thanks for the changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
SamBarker commented on PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#issuecomment-2136485697 @Zakelly any news on the jenkins env? I'm largely hoping it's a no-op there right now. @pnowojski README updated and executable bit set. Is there anything else you're looking for? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Enable async profiler [flink-benchmarks]
SamBarker commented on code in PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90#discussion_r1618157436 ## benchmark.sh: ## @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +JAVA_ARGS=() +JMH_ARGS=() +BINARY="java" +BENCHMARK_PATTERN= + +while getopts ":j:c:b:e:p:a:m:h" opt; do + case $opt in +j) JAVA_ARGS+=("${OPTARG}") +;; +c) CLASSPATH_ARG="${OPTARG}" +;; +b) BINARY="${OPTARG}" +;; +p) PROFILER_ARG="${OPTARG:+-prof ${OPTARG}}" +# conditional prefixing inspired by https://stackoverflow.com/a/40771884/1389220 +;; +a) JMH_ARGS+=("${OPTARG}") +;; +e) BENCHMARK_EXCLUDES="${OPTARG:+-e ${OPTARG}}" +;; +m) BENCHMARK_PATTERN="${OPTARG}" + echo "parsing -m" +;; +h) + 1>&2 cat << EOF +usage: TODO +EOF + exit 1 +;; +\?) echo "Invalid option -$opt ${OPTARG}" >&2 +exit 1 +;; + esac +done +shift "$(($OPTIND -1))" Review Comment: I had a go at doing it in python https://github.com/SamBarker/flink-benchmarks/blob/generate-flamegraphs-py/run-benchmarks.py life got quite funky trying to pass `--add-opens` to something that actually understood long arguments. I can merge that into this PR if preferred but I'm not sure it actually improves things here. The other option might be to look at [ap-loader](https://github.com/jvm-profiling-tools/ap-loader) which would allow us to ensure that the native libraries are available. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35461][config] Improve Runtime Configuration for Flink 2.0 [flink]
xintongsong commented on code in PR #24853: URL: https://github.com/apache/flink/pull/24853#discussion_r1618145159 ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -316,7 +316,11 @@ public enum CompressionCodec { /** * Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking * shuffle. + * + * @deprecated The hash-based blocking shuffle is deprecated in 1.20 and will be totally removed + * in 2.0. */ +@Deprecated @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) Review Comment: This can also be removed, because deprecated config options won't appear in docs. ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -428,6 +455,21 @@ public enum CompressionCodec { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); +/** The timeout for requesting buffers for each channel. */ +@Documentation.ExcludeFromDocumentation( +"This option is purely implementation related, and may be removed as the implementation changes.") +public static final ConfigOption NETWORK_BUFFERS_REQUEST_TIMEOUT = Review Comment: The old `ConfigOption` can be removed as it is already covered by `withDeprecatedKeys` of the new option. ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -316,7 +316,11 @@ public enum CompressionCodec { /** * Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking * shuffle. + * + * @deprecated The hash-based blocking shuffle is deprecated in 1.20 and will be totally removed + * in 2.0. */ +@Deprecated @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) Review Comment: Same for other deprecated options with documentation related annotations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
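A sketch of the `withDeprecatedKeys` pattern both reviewers refer to, using Flink's `ConfigOptions` builder; the new key name and the default value here are illustrative:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

public class ShuffleOptionsSketch {

    // The new option still resolves values written under the old key, so the
    // legacy ConfigOption constant can be deprecated and eventually dropped.
    public static final ConfigOption<Duration> NETWORK_BUFFERS_REQUEST_TIMEOUT =
            ConfigOptions.key("taskmanager.network.memory.buffers-request-timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30))
                    .withDeprecatedKeys(
                            "taskmanager.network.memory.exclusive-buffers-request-timeout")
                    .withDescription("The timeout for requesting buffers for each channel.");
}
```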
Re: [PR] [hotfix][docs-zh] maven should require v3.8.6, not higher [flink]
flinkbot commented on PR #24863: URL: https://github.com/apache/flink/pull/24863#issuecomment-2136449270 ## CI report: * 723de4273c06f3974e3a21808e6c5cf8ed6f7b84 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix][docs-zh] maven should require v3.8.6 [flink]
showuon opened a new pull request, #24863: URL: https://github.com/apache/flink/pull/24863 ## What is the purpose of the change We require Maven v3.8.6 (not higher) to build Flink; otherwise, the build fails with the error: `Detected Maven Version: 3.9.6 is not in the allowed range [3.8.6,3.8.6].` ## Brief change log Fix the incorrect requirement in the Chinese maven.md. ## Verifying this change Only the doc is changed. ## Does this pull request potentially affect one of the following parts: Only the doc is changed. ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]
robobario commented on code in PR #24730: URL: https://github.com/apache/flink/pull/24730#discussion_r1618053167

## flink-formats/flink-csv/src/test/resources/log4j2-test.properties:

```
@@ -0,0 +1,28 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO
```

Review Comment: Looking at other modules and the comment above, I think this should be checked in with the level set to `OFF`.

## flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java:

```
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+    @TempDir File outDir;
+
+    @Test
+    public void testNoDataIsWrittenBeforeFlush() throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Workaround serialization limitations
+        File outDirRef = new File(outDir.getAbsolutePath());
+
+        FileSink sink =
+                FileSink.forBulkFormat(
+                                new org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+                                out -> {
+                                    FSDataOutputStreamWrapper outputStreamWrapper =
+                                            new FSDataOutputStreamWrapper(out);
+                                    return new CsvBulkWriterWrapper<>(
+                                            CsvBulkWriter.forPojo(Pojo.class, outputStreamWrapper),
+                                            outputStreamWrapper,
+                                            outDirRef);
+                                })
+                        .build();
+
+        List integers = Arrays.asList(new Pojo(1), new Pojo(2));
+        DataGeneratorSource generatorSource =
+                TestDataGenerators.fromDataWithSnapshotsLatch(
+                        integers, TypeInformation.of(Pojo.class));
+        env.fromSource(generatorSource,
```
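For background on what the PR title refers to: `FLUSH_AFTER_WRITE_VALUE` is a standard Jackson `SerializationFeature` that, when enabled (the default), flushes the target stream after every `writeValue()` call, i.e. one flush per record. The following is a minimal, self-contained sketch of the underlying Jackson behavior using plain `jackson-databind` — it illustrates the general mechanism, not the PR's actual `CsvBulkWriter` change, and the class name is illustrative:

```java
import java.io.ByteArrayOutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.SerializationFeature;

public class FlushPerRecordDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        ObjectMapper mapper = new ObjectMapper();
        // With FLUSH_AFTER_WRITE_VALUE enabled (Jackson's default), each
        // write() below would flush the underlying stream once per record.
        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);

        SequenceWriter writer = mapper.writer().writeValues(out);
        writer.write(new int[] {1, 2, 3}); // stays in Jackson's internal buffer
        writer.write(new int[] {4, 5, 6}); // still buffered, no flush per record

        System.out.println("bytes visible before flush: " + out.size());
        writer.flush(); // for small payloads, data reaches 'out' only now
        System.out.println("bytes visible after flush:  " + out.size());
    }
}
```

This is exactly the property the test above verifies from the outside: by wrapping the `FSDataOutputStream`, it can assert that no bytes reach the file before an explicit flush.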
Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]
lvyanquan commented on code in PR #3348: URL: https://github.com/apache/flink-cdc/pull/3348#discussion_r1618087973

## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java:

```
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer;
+import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.USERNAME;
+
+/** IT tests for {@link StarRocksMetadataApplier}. */
+public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeClass
+    public static void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Before
+    public void initializeDatabase() {
+        executeSql(
+                String.format(
+                        "CREATE DATABASE IF NOT EXISTS `%s`;",
+                        StarRocksContainer.STARROCKS_DATABASE_NAME));
+        LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+    }
+
+    @After
+    public void destroyDatabase() {
+        executeSql(String.format("DROP DATABASE %s;", StarRocksContainer.STARROCKS_DATABASE_NAME));
+        LOG.info("Database {} destroyed.", StarRocksContainer.STARROCKS_DATABASE_NAME);
+    }
+
+    private List generateAddColumnEvents(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
```
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1618086605

## flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java:

```
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+public class JobStatusChangedListenerITCase {
+    private static List statusChangedEvents = new ArrayList<>();
+
+    @Test
+    void testJobStatusChanged() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                JOB_STATUS_CHANGED_LISTENERS,
+                Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
+        try (StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+            List sourceValues = Arrays.asList("a", "b", "c");
+            List resultValues = new ArrayList<>();
+            try (CloseableIterator iterator =
+                    env.fromCollection(sourceValues).executeAndCollect()) {
+                while (iterator.hasNext()) {
+                    resultValues.add(iterator.next());
+                }
+            }
+            assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new String[0]));
+        }
+        assertThat(statusChangedEvents.size()).isEqualTo(3);
+        assertThat(statusChangedEvents.get(0).jobId())
+                .isEqualTo(statusChangedEvents.get(1).jobId());
+        assertThat(statusChangedEvents.get(0).jobName())
+                .isEqualTo(statusChangedEvents.get(1).jobName());
+
+        assertThat(statusChangedEvents.get(1).jobId())
+                .isEqualTo(statusChangedEvents.get(2).jobId());
+        assertThat(statusChangedEvents.get(1).jobName())
+                .isEqualTo(statusChangedEvents.get(2).jobName());
+
+        statusChangedEvents.forEach(
+                event -> {
+                    if (event instanceof DefaultJobExecutionStatusEvent) {
+                        JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
+                        assertThat(
+                                (status.oldStatus() == JobStatus.CREATED
+                                        && status.newStatus() == JobStatus.RUNNING)
```

Review Comment: Good suggestion. Added test cases accordingly. It actually helped me discover that the job status changed listener should also be added for the MiniClusterExecutor.
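For readers unfamiliar with the listener API this test exercises: a factory class is registered by name under `DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS`, and the listener it produces receives a callback for each status transition. A minimal sketch of what a `TestingJobStatusChangedListenerFactory`-style implementation could look like — the interface and method names follow the imports visible in this PR, but treat the exact factory `Context` shape as an assumption:

```java
import org.apache.flink.core.execution.JobStatusChangedEvent;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerFactory;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Records every event so a test can assert on the transition sequence afterwards. */
public class RecordingJobStatusChangedListenerFactory
        implements JobStatusChangedListenerFactory {

    // Static so the test can read it after the job finishes, mirroring
    // the statusChangedEvents list in JobStatusChangedListenerITCase above.
    public static final List<JobStatusChangedEvent> EVENTS = new CopyOnWriteArrayList<>();

    @Override
    public JobStatusChangedListener createListener(Context context) {
        // onEvent(JobStatusChangedEvent) is the listener's single callback,
        // so a method reference that appends to the list is sufficient here.
        return EVENTS::add;
    }
}
```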
Re: [PR] [FLINK-35415][base] Fix compatibility with Flink 1.19 [flink-cdc]
lvyanquan commented on code in PR #3348: URL: https://github.com/apache/flink-cdc/pull/3348#discussion_r1618083749

## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java:

```
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
+import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
+import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
+
+/** IT tests for {@link DorisMetadataApplier}. */
+@RunWith(Parameterized.class)
+public class DorisMetadataApplierITCase extends DorisSinkTestBase {
+    private static final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final int DATABASE_OPERATION_TIMEOUT_SECONDS = 5;
+
+    private final boolean batchMode;
+
+    public DorisMetadataApplierITCase(boolean batchMode) {
+        this.batchMode = batchMode;
+    }
+
+    @Parameters(name = "batchMode: {0}")
+    public static Iterable data() {
+        return Arrays.asList(true, false);
+    }
+
+    @BeforeClass
+    public static void before() {
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(3000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Before
+    public void initializeDatabase() {
+        createDatabase(DorisContainer.DORIS_DATABASE_NAME);
+
+        // waiting for table to be created
+        DORIS_CONTAINER.waitForLog(
+                String.format(".*createDb dbName = %s,.*\\s", DorisContainer.DORIS_DATABASE_NAME),
+                1,
+                DATABASE_OPERATION_TIMEOUT_SECONDS);
```
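The Doris and StarRocks ITs in this PR follow the same pattern: build a `Schema`, emit a `CreateTableEvent`, then feed schema-change events through the pipeline and assert on the resulting table. A condensed sketch of the event-construction half, based on the types imported above — the `ColumnWithPosition` wrapper and the exact event constructors are my assumptions about the flink-cdc API, not code from the PR:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

class SchemaEventSketch {
    /** Builds a create-table event followed by an add-column event for it. */
    static List<Event> createTableThenAddColumn(TableId tableId) {
        Schema schema =
                Schema.newBuilder()
                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
                        .build();
        // Assumed constructor: AddColumnEvent takes the table id plus a list of
        // ColumnWithPosition wrappers (position defaulting to "last").
        AddColumnEvent.ColumnWithPosition newColumn =
                new AddColumnEvent.ColumnWithPosition(
                        new PhysicalColumn("number", DataTypes.DOUBLE(), null));
        return Arrays.asList(
                new CreateTableEvent(tableId, schema),
                new AddColumnEvent(tableId, Collections.singletonList(newColumn)));
    }
}
```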
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]
X-czh commented on PR #24845: URL: https://github.com/apache/flink/pull/24845#issuecomment-2136325346 @reswqa Hi, could you help review this when you have time? Thanks.
Re: [PR] [FLINK-35454] Allow connector classes to depend on internal Flink util classes [flink]
JingGe merged PR #24843: URL: https://github.com/apache/flink/pull/24843
Re: [PR] [BP-1.17][FLINK-34379][table] Fix adding catalogtable logic [flink]
flinkbot commented on PR #24862: URL: https://github.com/apache/flink/pull/24862#issuecomment-2136180667 ## CI report: * 3e9ada3635c286697f25410f3e7a4ceb81f34220 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [BP-1.18][FLINK-34379][table] Fix adding catalogtable logic [flink]
flinkbot commented on PR #24861: URL: https://github.com/apache/flink/pull/24861#issuecomment-2136175334 ## CI report: * 6dae529f1f7578c3666315aa1b0ea04e3b7b97ff UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.17][FLINK-34379][table] Fix adding catalogtable logic [flink]
jeyhunkarimov opened a new pull request, #24862: URL: https://github.com/apache/flink/pull/24862 This is the backport-to-1.17 PR for https://github.com/apache/flink/commit/87b7193846090897b2feabf716ee5378bcd7585b
[PR] [BP-1.18][FLINK-34379][table] Fix adding catalogtable logic [flink]
jeyhunkarimov opened a new pull request, #24861: URL: https://github.com/apache/flink/pull/24861 This is the backport-to-1.18 PR for https://github.com/apache/flink/commit/87b7193846090897b2feabf716ee5378bcd7585b
Re: [PR] [FLINK-34379][table] Fix adding catalogtable logic [flink]
flinkbot commented on PR #24860: URL: https://github.com/apache/flink/pull/24860#issuecomment-2136170701 ## CI report: * b8993db5e7956bd8e0c9466df3b378a18a126148 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34379][table] Fix adding catalogtable logic [flink]
jeyhunkarimov opened a new pull request, #24860: URL: https://github.com/apache/flink/pull/24860 This is the backport-to-1.19 PR for commit 87b7193846090897b2feabf716ee5378bcd7585b
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1617923919

## flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java:

```
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+public class JobStatusChangedListenerITCase {
+    private static List statusChangedEvents = new ArrayList<>();
+
+    @Test
+    void testJobStatusChanged() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                JOB_STATUS_CHANGED_LISTENERS,
+                Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
+        try (StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+            List sourceValues = Arrays.asList("a", "b", "c");
+            List resultValues = new ArrayList<>();
+            try (CloseableIterator iterator =
+                    env.fromCollection(sourceValues).executeAndCollect()) {
+                while (iterator.hasNext()) {
+                    resultValues.add(iterator.next());
+                }
+            }
+            assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new String[0]));
+        }
+        assertThat(statusChangedEvents.size()).isEqualTo(3);
+        assertThat(statusChangedEvents.get(0).jobId())
+                .isEqualTo(statusChangedEvents.get(1).jobId());
+        assertThat(statusChangedEvents.get(0).jobName())
+                .isEqualTo(statusChangedEvents.get(1).jobName());
+
+        assertThat(statusChangedEvents.get(1).jobId())
+                .isEqualTo(statusChangedEvents.get(2).jobId());
+        assertThat(statusChangedEvents.get(1).jobName())
+                .isEqualTo(statusChangedEvents.get(2).jobName());
+
+        statusChangedEvents.forEach(
+                event -> {
+                    if (event instanceof DefaultJobExecutionStatusEvent) {
+                        JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
+                        assertThat(
+                                (status.oldStatus() == JobStatus.CREATED
+                                        && status.newStatus() == JobStatus.RUNNING)
```

Review Comment: Can we add tests that check for the `FAILING`/`FAILED` and the `CANCELLING`/`CANCELED` transitions?
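For reference, the failure-path coverage requested above would take roughly the following shape — a hypothetical assertion helper mirroring the CREATED/RUNNING checks in the test, with the job construction that actually triggers the failure elided; `FailurePathAssertions` and its method are illustrative names, not code from the PR:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobExecutionStatusEvent;
import org.apache.flink.core.execution.JobStatusChangedEvent;

class FailurePathAssertions {
    /** Asserts every execution event is a legal step on the failure path
     *  (RUNNING -> FAILING -> FAILED in Flink's job status state machine). */
    static void assertFailureTransitions(List<JobStatusChangedEvent> events) {
        events.forEach(
                event -> {
                    if (event instanceof JobExecutionStatusEvent) {
                        JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
                        assertThat(
                                        (status.oldStatus() == JobStatus.RUNNING
                                                        && status.newStatus() == JobStatus.FAILING)
                                                || (status.oldStatus() == JobStatus.FAILING
                                                        && status.newStatus() == JobStatus.FAILED))
                                .isTrue();
                    }
                });
    }
}
```

The cancellation case would follow the same shape with `CANCELLING`/`CANCELED`.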
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1617917890

## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:

```
@@ -153,7 +173,18 @@ private CompletableFuture submitAndGetJobClientFuture(
                                     return jobId;
                                 }))
                 .thenApplyAsync(
-                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
+                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
+                .whenCompleteAsync(
+                        (jobClient, throwable) -> {
+                            if (throwable == null) {
+                                PipelineExecutorUtils.notifyJobStatusListeners(
+                                        pipeline, jobGraph, jobStatusChangedListeners);
+                            } else {
+                                LOG.error(
+                                        "Fail to submit job graph to application cluster",
```

Review Comment: nit: Fail => Failed
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1606207458

## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:

```
@@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor<
                 ClusterID, ClientFactory extends ClusterClientFactory>
         implements CacheSupportedPipelineExecutor {
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
     private final ClientFactory clusterClientFactory;
+    private final Configuration configuration;
+    private final List jobStatusChangedListeners;

-    public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+    public AbstractSessionClusterExecutor(
+            @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) {
         this.clusterClientFactory = checkNotNull(clusterClientFactory);
+        this.configuration = configuration;
+        this.jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        this.getClass().getClassLoader(), configuration, executorService);
```

Review Comment: We basically need to load the job status changed listeners from flink libs or plugins here. Yes, the thread context class loader makes more sense.

## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:

```
@@ -153,7 +173,14 @@ private CompletableFuture submitAndGetJobClientFuture(
                                     return jobId;
                                 }))
                 .thenApplyAsync(
-                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
+                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
+                .whenCompleteAsync(
+                        (jobClient, throwable) -> {
+                            if (throwable == null) {
```

Review Comment: Discussed with @davidradl offline. The throwable cannot be rethrown from within `whenCompleteAsync`, so we log the exception rather than silently swallowing it. https://stackoverflow.com/questions/71668871/completablefuture-whencompleteasync-does-not-let-me-re-throw-an-exception

## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:

```
@@ -355,7 +368,7 @@ public CompletableFuture requestJobResult(@Nonnull JobID jobId) {
     }

     @Override
-    public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) {
```

Review Comment: With Gyula's suggestion, we don't need to change the API now.

## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:

```
@@ -454,6 +467,24 @@ public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) {
                                             receiver, error);
                                 } else {
+                                    RuntimeExecutionMode executionMode =
+                                            jobGraph.getJobConfiguration()
+                                                    .get(ExecutionOptions.RUNTIME_MODE);
+                                    if (jobStatusChangedListeners.size() > 0) {
+                                        jobStatusChangedListeners.forEach(
+                                                listener ->
+                                                        listener.onEvent(
+                                                                new DefaultJobCreatedEvent(
+                                                                        jobGraph.getJobID(),
+                                                                        jobGraph.getName(),
+                                                                        pipeline == null
+                                                                                ? null
+                                                                                : ((StreamGraph) pipeline)
+                                                                                        .getLineageGraph(),
+                                                                        executionMode)));
+                                    }
+
                                     LOG.info(
                                             "Successfully submitted job '{}' ({}) to '{}'.",
```
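The `whenCompleteAsync` limitation discussed above is worth spelling out: the callback's return value is ignored and it cannot surface a new checked exception to the caller, so a side effect such as logging is the pragmatic reaction. A self-contained illustration of that behavior (class name is illustrative):

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {
    public static void main(String[] args) {
        CompletableFuture<String> submission =
                CompletableFuture.supplyAsync(
                        () -> {
                            throw new IllegalStateException("submission failed");
                        });

        // whenCompleteAsync observes the outcome but cannot turn the Throwable
        // into a new checked exception; the original exception still propagates
        // to downstream stages and to join()/get() unchanged.
        CompletableFuture<String> observed =
                submission.whenCompleteAsync(
                        (value, throwable) -> {
                            if (throwable != null) {
                                // The only safe reaction here is a side effect,
                                // e.g. logging -- mirroring the PR's choice.
                                System.err.println("observed failure: " + throwable);
                            }
                        });

        // Downstream code still sees the failure and can recover from it.
        observed.exceptionally(t -> "fallback").thenAccept(System.out::println).join();
    }
}
```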
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2136055216 Thanks for the feedback @vahmed-hamdy. I replied to all the feedback except the documentation item, which is still in progress. Is there a package where I am supposed to add the documentation for this connector?
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on code in PR #24426: URL: https://github.com/apache/flink/pull/24426#discussion_r1617850492

## .github/workflows/nightly.yml:

```
@@ -94,3 +94,46 @@ jobs:
         s3_bucket: ${{ secrets.IT_CASE_S3_BUCKET }}
         s3_access_key: ${{ secrets.IT_CASE_S3_ACCESS_KEY }}
         s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }}
+
+  build_python_wheels:
+    name: "Build Python Wheels on ${{ matrix.os_name }}"
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - os: ubuntu-latest
+            os_name: linux
+          - os: macos-latest
+            os_name: macos
+    steps:
+      - name: "Checkout the repository"
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+          persist-credentials: false
+      - name: "Stringify workflow name"
+        uses: "./.github/actions/stringify"
+        id: stringify_workflow
+        with:
+          value: ${{ github.workflow }}
+      - name: "Build python wheels for ${{ matrix.os_name }}"
+        uses: pypa/cibuildwheel@v2.16.5
```

Review Comment: Great, updated ✔️
Re: [PR] [FLINK-26808][1.19] Limit FileUploadHandler to multipart routes [flink]
flinkbot commented on PR #24859: URL: https://github.com/apache/flink/pull/24859#issuecomment-2136017635 ## CI report: * da45a7843f59b1fb2070b2de55cbd4214e05ae0f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-26808][1.19] Limit FileUploadHandler to multipart routes [flink]
uce opened a new pull request, #24859: URL: https://github.com/apache/flink/pull/24859 See https://github.com/apache/flink/pull/24856 for the PR description. This is a backport for 1.19. The commits applied cleanly.
Re: [PR] [FLINK-26808] Limit FileUploadHandler to multipart routes [flink]
uce commented on PR #24856: URL: https://github.com/apache/flink/pull/24856#issuecomment-2136005466 I was too trigger-happy and forgot to fix up the `review` commit before merging.