Re: [PR] [FLINK-33776] [Test] only close the cluster once to make the test `ClientHeartbeatTest` faster [flink]
TestBoost commented on PR #23910: URL: https://github.com/apache/flink/pull/23910#issuecomment-1851472434 Hi reswqa, Thank you so much for your reply! I just changed the title, but I don't know if it's suitable. Yes, right now the test cluster is started before every test method. However, there is no need to start a cluster per test, because these tests do not affect each other and can run against the same cluster. Thanks, TestBoost -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
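The per-class lifecycle described above can be modeled with a small self-contained sketch. The class and method names below are illustrative stand-ins (not Flink's actual `MiniCluster` test code); the point is just that a shared cluster is started and closed once instead of once per test:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the two lifecycles discussed in the PR:
// starting a cluster before every test vs. once for the whole class.
class ClusterLifecycleSketch {
    static final AtomicInteger starts = new AtomicInteger();

    static class FakeCluster implements AutoCloseable {
        FakeCluster() { starts.incrementAndGet(); } // models miniCluster start
        void submitJob(String name) { /* jobs are independent of each other */ }
        @Override public void close() { /* models miniCluster.close() */ }
    }

    // One cluster per test method (the old behavior).
    static int runPerTest(List<String> jobs) {
        starts.set(0);
        for (String job : jobs) {
            try (FakeCluster c = new FakeCluster()) { c.submitJob(job); }
        }
        return starts.get();
    }

    // One shared cluster, closed once after all tests (the PR's approach).
    static int runPerClass(List<String> jobs) {
        starts.set(0);
        try (FakeCluster c = new FakeCluster()) {
            for (String job : jobs) { c.submitJob(job); }
        }
        return starts.get();
    }

    public static void main(String[] args) {
        List<String> jobs = List.of("jobA", "jobB", "jobC");
        System.out.println(runPerTest(jobs));  // 3 cluster starts
        System.out.println(runPerClass(jobs)); // 1 cluster start
    }
}
```

In JUnit terms, this corresponds to moving cluster start/stop from per-test setup/teardown into once-per-class hooks, which is valid only because the jobs do not interfere with each other.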
Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]
Myasuka commented on code in PR #23509: URL: https://github.com/apache/flink/pull/23509#discussion_r1423537299 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java: ## @@ -72,6 +73,7 @@ public MemoryBackendCheckpointStorageAccess( JobID jobId, @Nullable Path checkpointsBaseDirectory, @Nullable Path defaultSavepointLocation, +boolean createCheckpointSubDirs, Review Comment: How about introducing another constructor that defaults `createCheckpointSubDirs` to `true`? If so, we can avoid touching a lot of test code in this PR. ## flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java: ## @@ -141,6 +141,12 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur */ private final int writeBufferSize; +/** + * Switch to create the checkpoint sub-directory with the name of the jobId. A value of 'undefined' means + * not yet configured, in which case the default will be used. + */ +private TernaryBoolean createCheckpointSubDirs = TernaryBoolean.UNDEFINED; Review Comment: I wonder whether we really need to introduce an undefined Boolean here in such a way. Currently, we only have one way to set this value, within a private constructor: ~~~java this.createCheckpointSubDirs = original.createCheckpointSubDirs.resolveUndefined( configuration.get(CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIS)); ~~~ However, there is no public constructor or setter to set `createCheckpointSubDirs` on the `original` one; that is to say, we will always get a `TernaryBoolean.UNDEFINED` `createCheckpointSubDirs` in the original.
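Both review comments can be illustrated with a minimal re-creation of the semantics involved. Flink's real `TernaryBoolean` lives in `org.apache.flink.util`; the simplified enum and class below are stand-ins, not Flink's code:

```java
// Sketch of the two suggestions above, under the assumption that
// TernaryBoolean#resolveUndefined falls back to the given default only
// when the value was never explicitly configured.
class CheckpointSubDirSketch {

    enum Ternary {
        TRUE, FALSE, UNDEFINED;

        // Mirrors the resolveUndefined semantics discussed in the comment.
        boolean resolveUndefined(boolean defaultValue) {
            return this == UNDEFINED ? defaultValue : this == TRUE;
        }
    }

    // First comment: an extra constructor defaulting the new flag to true
    // keeps existing call sites (and tests) untouched.
    static class StorageAccess {
        final boolean createCheckpointSubDirs;

        StorageAccess(boolean createCheckpointSubDirs) {
            this.createCheckpointSubDirs = createCheckpointSubDirs;
        }

        StorageAccess() {
            this(true); // old behavior: always create the job-id sub-directory
        }
    }

    public static void main(String[] args) {
        // Second comment: if nothing ever sets the field, it stays UNDEFINED,
        // so resolveUndefined(config) always returns the configured value and
        // the ternary carries no extra information.
        Ternary neverSet = Ternary.UNDEFINED;
        System.out.println(neverSet.resolveUndefined(false)); // config value wins
        System.out.println(new StorageAccess().createCheckpointSubDirs);
    }
}
```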
Re: [PR] [FLINK-33736][Scheduler] Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier [flink]
flinkbot commented on PR #23911: URL: https://github.com/apache/flink/pull/23911#issuecomment-1851449909 ## CI report: * 4f20956497f2d5b1255419eb5f31d03737c9a597 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]
1996fanrui commented on code in PR #23247: URL: https://github.com/apache/flink/pull/23247#discussion_r1423553344 ## flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java: ## @@ -184,7 +184,7 @@ public class RestartStrategyOptions { public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER = ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier") .doubleType() -.defaultValue(2.0) +.defaultValue(1.2) Review Comment: The commit related to the default values has been moved to a new [separate PR](https://github.com/apache/flink/pull/23911); we can follow it there, thanks~ Also, if this PR (FLINK-32895) receives no other comments within 72 hours, I will merge it, thanks.
[jira] [Updated] (FLINK-33736) Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier
[ https://issues.apache.org/jira/browse/FLINK-33736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33736: --- Labels: pull-request-available (was: ) > Update default value of exponential-delay.max-backoff and > exponential-delay.backoff-multiplier > -- > > Key: FLINK-33736 > URL: https://issues.apache.org/jira/browse/FLINK-33736 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Update default value of exponential-delay.max-backoff from 5min to 1min. > Update default value of exponential-delay.backoff-multiplier from 2.0 to 1.2. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33714) Update documentation about the usage of RuntimeContext#getExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-33714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33714. -- Resolution: Fixed master(1.19) via 3531998adad20f3904d1421a35a69fef44b2b69f. > Update documentation about the usage of RuntimeContext#getExecutionConfig > - > > Key: FLINK-33714 > URL: https://issues.apache.org/jira/browse/FLINK-33714 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.19.0 >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
[PR] [FLINK-33736][Scheduler] Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier [flink]
1996fanrui opened a new pull request, #23911: URL: https://github.com/apache/flink/pull/23911 ## What is the purpose of the change See [FLIP-364: Improve the exponential-delay restart-strategy](https://cwiki.apache.org/confluence/x/uJqzDw) . This PR includes the subtask1 of FLIP-364. ## Brief change log - [FLINK-33736][Scheduler] Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? Java doc and config doc are updated, and updated a little official web doc related to `restart-strategy.exponential-delay.attempts-before-reset-backoff`.
[jira] [Updated] (FLINK-33714) Update documentation about the usage of RuntimeContext#getExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-33714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33714: --- Affects Version/s: 1.19.0 > Update documentation about the usage of RuntimeContext#getExecutionConfig > - > > Key: FLINK-33714 > URL: https://issues.apache.org/jira/browse/FLINK-33714 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.19.0 >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
[jira] [Updated] (FLINK-33714) Update documentation about the usage of RuntimeContext#getExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-33714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-33714: --- Fix Version/s: 1.19.0 > Update documentation about the usage of RuntimeContext#getExecutionConfig > - > > Key: FLINK-33714 > URL: https://issues.apache.org/jira/browse/FLINK-33714 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
Re: [PR] [FLINK-33714][doc] Update the documentation about the usage of RuntimeContext#getExecutionConfig. [flink]
reswqa merged PR #23906: URL: https://github.com/apache/flink/pull/23906
Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]
1996fanrui commented on code in PR #23247: URL: https://github.com/apache/flink/pull/23247#discussion_r1423542955 ## flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java: ## @@ -184,7 +184,7 @@ public class RestartStrategyOptions { public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER = ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier") .doubleType() -.defaultValue(2.0) +.defaultValue(1.2) Review Comment: Thanks @zhuzhurk for the quick feedback! I have updated it on the `dev` mailing list. Let us wait for more feedback for a while. Also, I will pause FLINK-33736 first; therefore, I will remove the first commit from this PR and continue working on the rest of the JIRA tickets for FLIP-364. [1] https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw
[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process
[ https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795627#comment-17795627 ] Hangxiang Yu commented on FLINK-26585: -- [~jingge] I saw 1.18.1 is releasing. Could this PR be included in 1.18.1 or the next 1.18.2? It could help a lot in the case [~czchen] mentioned. > State Processor API: Loading a state set buffers the whole state set in > memory before starting to process > - > > Key: FLINK-26585 > URL: https://issues.apache.org/jira/browse/FLINK-26585 > Project: Flink > Issue Type: Improvement > Components: API / State Processor >Affects Versions: 1.13.0, 1.14.0, 1.15.0 >Reporter: Matthias Schwalbe >Assignee: Matthias Schwalbe >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: MultiStateKeyIteratorNoStreams.java > > > * When loading a state, MultiStateKeyIterator loads and buffers the whole > state in memory before it even processes a single data point > ** This is absolutely no problem for small state (hence the unit tests work > fine) > ** MultiStateKeyIterator ctor sets up a java Stream that iterates all state > descriptors and flattens all datapoints contained within > ** The java.util.stream.Stream#flatMap function causes the buffering of the > whole data set when enumerated later on > ** See call stack [1] > *** In our case this is 150e6 data points (> 1GiB just for the pointers to > the data, let alone the data itself ~30GiB) > ** I'm not aware of any instrumentation of Stream in order to avoid the > problem, hence > ** I coded an alternative implementation of MultiStateKeyIterator that > avoids using java Stream, > ** I can contribute our implementation (MultiStateKeyIteratorNoStreams) > [1] > Streams call stack: > hasNext:77, RocksStateKeysIterator > (org.apache.flink.contrib.streaming.state.iterator) > next:82, RocksStateKeysIterator > (org.apache.flink.contrib.streaming.state.iterator) > forEachRemaining:116, Iterator (java.util) >
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util) > forEach:580, ReferencePipeline$Head (java.util.stream) > accept:270, ReferencePipeline$7$1 (java.util.stream) > # Stream<R> flatMap(final Function<? super T, ? extends Stream<? extends R>> var1) > accept:373, ReferencePipeline$11$1 (java.util.stream) > # Stream<T> peek(final Consumer<? super T> var1) > accept:193, ReferencePipeline$3$1 (java.util.stream) > # Stream<R> map(final Function<? super T, ? extends R> var1) > tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util) > lambda$initPartialTraversalState$0:294, > StreamSpliterators$WrappingSpliterator (java.util.stream) > getAsBoolean:-1, 1528195520 > (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57) > fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator > (java.util.stream) > doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator > (java.util.stream) > tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream) > hasNext:681, Spliterators$1Adapter (java.util) > hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input) > hasNext:162, KeyedStateReaderOperator$NamespaceDecorator > (org.apache.flink.state.api.input.operator) > reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input) > invoke:191, DataSourceTask (org.apache.flink.runtime.operators) > doRun:776, Task (org.apache.flink.runtime.taskmanager) > run:563, Task (org.apache.flink.runtime.taskmanager) > run:748, Thread (java.lang)
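The buffering the stack trace above describes (`fillBuffer` calling `forEach` on the inner stream) can be reproduced in isolation: when a `flatMap`-ed stream is consumed through its `iterator()`, the entire inner stream of the current outer element is materialized before a single element is handed out. The sizes below are illustrative:

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Demonstrates the eager inner-stream buffering behind the issue: asking the
// iterator for ONE element drains the whole first inner stream into a buffer.
class FlatMapBufferingDemo {
    static int innerElementsPulledForOneNext(int innerSize) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it =
                Stream.of("outer1", "outer2")
                        .flatMap(o -> IntStream.range(0, innerSize)
                                .boxed()
                                .peek(i -> pulled.incrementAndGet()))
                        .iterator();
        it.next();           // request a single element ...
        return pulled.get();  // ... yet the whole first inner stream was consumed
    }

    public static void main(String[] args) {
        System.out.println(innerElementsPulledForOneNext(1000));
    }
}
```

With 150e6 data points flattened behind one `flatMap`, this is exactly why a hand-written iterator (the attached `MultiStateKeyIteratorNoStreams`) avoids the memory blow-up: it advances one element at a time instead of letting the spliterator buffer the inner stream.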
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423504221 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) +.withDescription("Max allowed GC pressure during scaling operations"); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(0.9) Review Comment: I see your point about the heap usage with a large number of TMs and it makes sense. I don't think most jobs would be affected by this issue but we could increase the default threshold to 95% to be on the safe side. Regarding the follow up, please go ahead with the ticket :) and feel free to work on it!
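A hedged sketch of how threshold options like the two above are typically applied: scaling is vetoed while the observed GC pressure or average heap usage exceeds the configured limits. The class and method names here are illustrative, not the operator's actual API:

```java
// Hypothetical guard applying the memory.gc-pressure.threshold and
// memory.heap-usage.threshold options from the diff above. Both metrics
// are assumed to be ratios in [0, 1].
class MemoryGuard {
    final double gcPressureThreshold; // e.g. 0.3 = 30% of time spent in GC
    final double heapUsageThreshold;  // e.g. 0.9, or 0.95 per the discussion

    MemoryGuard(double gcPressureThreshold, double heapUsageThreshold) {
        this.gcPressureThreshold = gcPressureThreshold;
        this.heapUsageThreshold = heapUsageThreshold;
    }

    // Scaling is allowed only while both observed metrics stay at or
    // below their thresholds.
    boolean allowScaling(double observedGcPressure, double observedHeapUsage) {
        return observedGcPressure <= gcPressureThreshold
                && observedHeapUsage <= heapUsageThreshold;
    }

    public static void main(String[] args) {
        MemoryGuard guard = new MemoryGuard(0.3, 0.95);
        System.out.println(guard.allowScaling(0.10, 0.80)); // healthy: allowed
        System.out.println(guard.allowScaling(0.40, 0.80)); // GC pressure too high
    }
}
```

Raising the heap-usage default from 0.9 to 0.95, as suggested, simply widens the second comparison so jobs with many TMs (where averaged heap usage runs higher) are not blocked unnecessarily.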
Re: [PR] [FLINK-30702] Add Elasticsearch dialect [flink-connector-jdbc]
grzegorz8 commented on code in PR #67: URL: https://github.com/apache/flink-connector-jdbc/pull/67#discussion_r1420783379 ## flink-connector-jdbc/pom.xml: ## @@ -38,10 +38,12 @@ under the License. 2.12 2.12.7 3.23.1 + 2.15.2 42.5.1 21.8.0.0 418 1.12.10 + 8.11.1 Review Comment: @MartijnVisser Hi! What should be done to check whether adding this dependency (even as "provided") is allowed? The same question for enabling the Elastic trial for testing purposes. Should I create a ticket somewhere? Write to elastic_lice...@elastic.co? What do you suggest? Or should we already close the PR and the corresponding ticket due to the incompatible license? The license is not listed in Category X: https://www.apache.org/legal/resolved.html#category-x (nor in any other category). Even assuming the Elastic License is "Category X", I think we are still safe, since we do not violate the following rules mentioned in the document: _"They may not be distributed"_ and _"You may rely on them when they support an optional feature"_.
Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]
liyubin117 commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1423393030 ## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ## @@ -297,10 +295,104 @@ void testVariableLengthDataGeneration() throws Exception { anyCauseMatches( ValidationException.class, String.format( -"Only supports specifying '%s' option for variable-length types (varchar, string, varbinary, bytes). The type of field %s is not within this range.", +"Only supports specifying '%s' option for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field '%s' is not within this range.", DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4"))); } +@Test +void testVariableLengthDataType() throws Exception { +DescriptorProperties descriptor = new DescriptorProperties(); +final int rowsNumber = 200; +descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); +descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + +List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +assertThat(results).hasSize(rowsNumber); + +for (RowData row : results) { +assertThat(row.getString(2).toString()).hasSize(30); +assertThat(row.getBinary(3)).hasSize(20); +assertThat(row.getString(4).toString()) + .hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT); +} + +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.LENGTH, +25); +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS + ".f4." 
+ DataGenConnectorOptionsUtil.LENGTH, +); + +results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); + +for (RowData row : results) { +assertThat(row.getString(2).toString()).hasSize(25); +assertThat(row.getString(4).toString()).hasSize(); +} + +assertThatThrownBy( +() -> { +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS ++ ".f3." ++ DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS ++ ".f3." ++ DataGenConnectorOptionsUtil.LENGTH, +21); + +runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +}) +.satisfies( +anyCauseMatches( +ValidationException.class, +"Custom length '21' for variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined in the schema.")); +} + +@Test +void testFixedLengthDataType() throws Exception { +DescriptorProperties descriptor = new DescriptorProperties(); +final int rowsNumber = 200; +descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); +descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + +List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +assertThat(results).hasSize(rowsNumber); + +for (RowData row : results) { +assertThat(row.getString(0).toString()).hasSize(50); +assertThat(row.getBinary(1)).hasSize(40); +} + +assertThatThrownBy( +() -> { +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS ++ ".f0." ++ DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS +
Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]
zhuzhurk commented on code in PR #23247: URL: https://github.com/apache/flink/pull/23247#discussion_r1423468479 ## flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java: ## @@ -184,7 +184,7 @@ public class RestartStrategyOptions { public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER = ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier") .doubleType() -.defaultValue(2.0) +.defaultValue(1.2) Review Comment: 1.5 sounds good to me. Yet I think we need to send the updates to the FLIP discussion ML and open another vote for it, in case it turns out to be a surprise for those who have reviewed and voted on the FLIP.
Re: [PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]
xuyangzhong commented on code in PR #23886: URL: https://github.com/apache/flink/pull/23886#discussion_r1423461099 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateEventTimeRestoreTest.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecWindowAggregate}. */ +public class WindowAggregateEventTimeRestoreTest extends RestoreTestBase { + +public WindowAggregateEventTimeRestoreTest() { +super(StreamExecWindowAggregate.class); +} + +@Override +public List programs() { +return Arrays.asList( +WindowAggregateTestPrograms.GROUP_TUMBLE_WINDOW_EVENT_TIME, Review Comment: Just wondering why the prefix 'GROUP_' is added to each test?
## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateTestPrograms.java: ## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowAggregate}. 
*/ +public class WindowAggregateTestPrograms { + +static final Row[] BEFORE_DATA = { +Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", "a"), +Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:03", 2, 2d, 2f, new BigDecimal("2.22"), "Comment#1", "a"), +Row.of("2020-10-10 00:00:04", 5, 5d, 5f, new BigDecimal("5.55"), null, "a"), +Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"), +// out of order +Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", "b"), +Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), "Comment#2", "a"), +// late event +Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), "Hi", "a"), +Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", "b"), +Row.of("2020-10-10 00:00:32", 7, 7d, 7f, new BigDecimal("7.77"), null, null), +Row.of("2020-10-10 00:00:34", 1, 3d, 3f, new BigDecimal("3.33"), "Comment#3", "b") +}; + +static final Row[] AFTER_DATA = { +Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), "Comment#4", "a"), +Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), "Comment#5", "d"), +Row.of("2020-10-10 00:00:43", 12, 5d, 5f, new BigDecimal("6.44"), "Comment#6", "c"), +Row.of("2020-10-10 00:00:44", 13, 6d, 6f, new BigDecimal("7.44"), "Comment#7", "d") +}; + +static final SourceTestStep SOURCE = +
[jira] [Comment Edited] (FLINK-33780) Support to store default catalog in CatalogStore
[ https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795574#comment-17795574 ] Yubin Li edited comment on FLINK-33780 at 12/12/23 5:42 AM: [~hackergin] Hi, we could make modifications in the `CatalogManager` constructor as follows: !image-2023-12-12-11-09-53-075.png|width=798,height=406! DefaultCatalog is the current catalog of a session, including SqlGateway. `default_catalog` is like `d2` under the context `use catalog d2`; the only difference is that it is created implicitly by Flink. We should treat them equally for semantic consistency, WDYT? !image-2023-12-12-13-42-04-762.png|width=422,height=184! was (Author: liyubin117): [~hackergin] Hi, we could make modifications in the `CatalogManager` constructor as follows: !image-2023-12-12-11-09-53-075.png|width=1165,height=593! DefaultCatalog is the current catalog of a session, including SqlGateway. `default_catalog` is like `d2` under the context `use catalog d2`; the only difference is that it is created implicitly by Flink. We should treat them equally for semantic consistency, WDYT? > Support to store default catalog in CatalogStore > > > Key: FLINK-33780 > URL: https://issues.apache.org/jira/browse/FLINK-33780 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: Yubin Li >Priority: Major > Attachments: image-2023-12-11-13-47-29-623.png, > image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png, > image-2023-12-12-13-42-04-762.png > > > Flink initially creates a default catalog which is included in the > `Map catalogs`, but is not stored in the CatalogStore. > After conducting a thorough investigation, I've determined that the necessary > modification can be made within the `CatalogManager`.
Re: [PR] only close the cluster once to make the tests faster [flink]
flinkbot commented on PR #23910: URL: https://github.com/apache/flink/pull/23910#issuecomment-1851322851 ## CI report: * b0fd64a2a24dc1cd50ea2fb5ae4f3d4036e2b85a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] only close the cluster once to make the tests faster [flink]
TestBoost opened a new pull request, #23910: URL: https://github.com/apache/flink/pull/23910 ## What is the purpose of the change This change makes `miniCluster.close()` run only once after all tests finish, and makes the tests in the test class `ClientHeartbeatTest` run faster. There are three tests in this test class. They submit different jobs through the cluster and assert the running status of `miniCluster` or the job status of the submitted job. These tests submit jobs that are not related to each other. We also tried running these tests in different orders, and they always pass. Besides, the test class `ClientHeartbeatTest` is similar to the test class `org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest`. However, the test class `LeaderChangeClusterComponentsTest` only closes the `miniCluster` after all tests finish. It is a good choice to apply a similar approach to the test class `ClientHeartbeatTest`. The test runtime can change from `9.9113 s` to `8.9771 s` after applying this change. ## Brief change log - This pull request changes the code such that `miniCluster.close()` runs once after all tests finish rather than after every test. ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. This pull request is just modifying tests.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
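The lifecycle change described in the PR can be sketched with a toy harness. This is illustrative only: `FakeCluster` is a stand-in for Flink's `MiniCluster`, not the actual `ClientHeartbeatTest` code. Moving teardown from per-test to per-class (analogous to switching from JUnit's `@BeforeEach`/`@AfterEach` to `@BeforeAll`/`@AfterAll`) closes the cluster exactly once for the whole class:

```java
import java.util.List;
import java.util.function.Consumer;

// Toy harness contrasting the two teardown strategies. FakeCluster is a
// stand-in for Flink's MiniCluster; counters track lifecycle calls.
public class ClusterLifecycleSketch {

    static class FakeCluster {
        static int starts = 0;
        static int closes = 0;
        void start() { starts++; }
        void close() { closes++; }
    }

    // Old behavior: start/close around every test (like @BeforeEach/@AfterEach).
    static void runPerTest(List<Consumer<FakeCluster>> tests) {
        for (Consumer<FakeCluster> test : tests) {
            FakeCluster cluster = new FakeCluster();
            cluster.start();
            test.accept(cluster);
            cluster.close();
        }
    }

    // New behavior: one shared cluster for the class (like @BeforeAll/@AfterAll).
    static void runShared(List<Consumer<FakeCluster>> tests) {
        FakeCluster cluster = new FakeCluster();
        cluster.start();
        for (Consumer<FakeCluster> test : tests) {
            test.accept(cluster);
        }
        cluster.close();
    }

    public static void main(String[] args) {
        // Three independent tests, as in ClientHeartbeatTest.
        List<Consumer<FakeCluster>> tests = List.of(c -> {}, c -> {}, c -> {});
        runPerTest(tests);
        System.out.println("per-test closes: " + FakeCluster.closes); // 3
        FakeCluster.starts = 0;
        FakeCluster.closes = 0;
        runShared(tests);
        System.out.println("shared closes: " + FakeCluster.closes); // 1
    }
}
```

The shared lifecycle is safe precisely because the tests do not mutate shared cluster state, which is the assumption the PR description argues for.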
Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]
1996fanrui commented on code in PR #23247: URL: https://github.com/apache/flink/pull/23247#discussion_r1423360642 ## flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java: ## @@ -184,7 +184,7 @@ public class RestartStrategyOptions { public static final ConfigOption RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER = ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier") .doubleType() -.defaultValue(2.0) +.defaultValue(1.2) Review Comment: Thanks @mxm for the feedback! `1.5` is fine for me, and I'd like to cc @zhuzhurk, who proposed changing these 2 default values as well! Max and Mason gave some feedback after voting in the [user mail list](https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw), and Max and I had an offline discussion yesterday. Max thinks 1.2 is a little small, i.e. aggressive (the delay time is too short). Here is the reason: - Every time the job restarts, it makes a bunch of calls to the Kubernetes API, e.g. read/write to config maps, create task managers. - When his production jobs had the default fixed-delay (1s) restart strategy turned on, a Kubernetes cluster became unstable. Following is the relationship between restart attempts and retry delay time: - The `delay-time` will reach 1 min after 12 attempts when `backoff-multiplier` is 1.5 - The `delay-time` will reach 1 min after 24 attempts when `backoff-multiplier` is 1.2 Hey @zhuzhurk, what do you think about setting `1.5` as the default value? If you agree, I will update the FLIP, report back to the [user mail list](https://lists.apache.org/thread/6glz0d57r8gtpzq4f71vf9066c5x6nyw), and move this PR forward. ![image](https://github.com/apache/flink/assets/38427477/642c57e0-b415-4326-af05-8b506c5fbb3a)
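The attempt counts quoted above can be reproduced with a small calculation. This is a sketch of the arithmetic only, not Flink's implementation; it assumes the exponential-delay strategy's default 1 s initial backoff and a delay of `initial * multiplier^(n-1)` before attempt `n`:

```java
// Illustrative check of the 12-vs-24 attempt numbers quoted in the review
// comment, assuming a 1 s initial backoff (the exponential-delay default).
public class BackoffSketch {

    /** Returns the first attempt whose delay reaches targetSeconds. */
    static int attemptsToReach(double initialSeconds, double multiplier, double targetSeconds) {
        int attempt = 1;
        double delay = initialSeconds;
        while (delay < targetSeconds) {
            delay *= multiplier;
            attempt++;
        }
        return attempt;
    }

    public static void main(String[] args) {
        System.out.println(attemptsToReach(1.0, 1.5, 60.0)); // 12
        System.out.println(attemptsToReach(1.0, 1.2, 60.0)); // 24
    }
}
```

So with a 1.5 multiplier the delay passes one minute on the 12th attempt, while with 1.2 it takes until the 24th, matching the table in the comment.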
Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]
liyubin117 commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1423393030 ## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ## @@ -297,10 +295,104 @@ void testVariableLengthDataGeneration() throws Exception { anyCauseMatches( ValidationException.class, String.format( -"Only supports specifying '%s' option for variable-length types (varchar, string, varbinary, bytes). The type of field %s is not within this range.", +"Only supports specifying '%s' option for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field '%s' is not within this range.", DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4"))); } +@Test +void testVariableLengthDataType() throws Exception { +DescriptorProperties descriptor = new DescriptorProperties(); +final int rowsNumber = 200; +descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); +descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + +List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +assertThat(results).hasSize(rowsNumber); + +for (RowData row : results) { +assertThat(row.getString(2).toString()).hasSize(30); +assertThat(row.getBinary(3)).hasSize(20); +assertThat(row.getString(4).toString()) + .hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT); +} + +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.LENGTH, +25); +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS + ".f4." 
+ DataGenConnectorOptionsUtil.LENGTH, +); + +results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); + +for (RowData row : results) { +assertThat(row.getString(2).toString()).hasSize(25); +assertThat(row.getString(4).toString()).hasSize(); +} + +assertThatThrownBy( +() -> { +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS ++ ".f3." ++ DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS ++ ".f3." ++ DataGenConnectorOptionsUtil.LENGTH, +21); + +runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +}) +.satisfies( +anyCauseMatches( +ValidationException.class, +"Custom length '21' for variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined in the schema.")); +} + +@Test +void testFixedLengthDataType() throws Exception { +DescriptorProperties descriptor = new DescriptorProperties(); +final int rowsNumber = 200; +descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); +descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + +List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +assertThat(results).hasSize(rowsNumber); + +for (RowData row : results) { +assertThat(row.getString(0).toString()).hasSize(50); +assertThat(row.getBinary(1)).hasSize(40); +} + +assertThatThrownBy( +() -> { +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS ++ ".f0." ++ DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS +
Re: [PR] [hotfix][build] Release branch should use SNAPSHOT version [flink-connector-pulsar]
leonardBang merged PR #66: URL: https://github.com/apache/flink-connector-pulsar/pull/66
Re: [PR] [hotfix][build] Release branch should use SNAPSHOT version [flink-connector-pulsar]
leonardBang commented on PR #66: URL: https://github.com/apache/flink-connector-pulsar/pull/66#issuecomment-1851280119 CI passed, merging...
Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]
liyubin117 commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1423385053 ## flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java: ## @@ -141,15 +143,15 @@ private DataGeneratorContainer createContainer( } private void validateFieldOptions(String name, DataType type, ReadableConfig options) { -ConfigOption lenOption = +ConfigOption varLenOption = key(DataGenConnectorOptionsUtil.FIELDS + "." + name + "." + DataGenConnectorOptionsUtil.VAR_LEN) .booleanType() .defaultValue(false); -options.getOptional(lenOption) +options.getOptional(varLenOption) Review Comment: The prior name was not clear enough, so I used a new one instead.
Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]
LadyForest commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1423376727 ## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ## @@ -297,10 +295,104 @@ void testVariableLengthDataGeneration() throws Exception { anyCauseMatches( ValidationException.class, String.format( -"Only supports specifying '%s' option for variable-length types (varchar, string, varbinary, bytes). The type of field %s is not within this range.", +"Only supports specifying '%s' option for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field '%s' is not within this range.", DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4"))); } +@Test +void testVariableLengthDataType() throws Exception { +DescriptorProperties descriptor = new DescriptorProperties(); +final int rowsNumber = 200; +descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); +descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + +List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +assertThat(results).hasSize(rowsNumber); + +for (RowData row : results) { +assertThat(row.getString(2).toString()).hasSize(30); +assertThat(row.getBinary(3)).hasSize(20); +assertThat(row.getString(4).toString()) + .hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT); +} + +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.LENGTH, +25); +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS + ".f4." 
+ DataGenConnectorOptionsUtil.LENGTH, +); + +results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); + +for (RowData row : results) { +assertThat(row.getString(2).toString()).hasSize(25); +assertThat(row.getString(4).toString()).hasSize(); +} + +assertThatThrownBy( +() -> { +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS ++ ".f3." ++ DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS ++ ".f3." ++ DataGenConnectorOptionsUtil.LENGTH, +21); + +runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +}) +.satisfies( +anyCauseMatches( +ValidationException.class, +"Custom length '21' for variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined in the schema.")); +} + +@Test +void testFixedLengthDataType() throws Exception { +DescriptorProperties descriptor = new DescriptorProperties(); +final int rowsNumber = 200; +descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); +descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + +List results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); +assertThat(results).hasSize(rowsNumber); + +for (RowData row : results) { +assertThat(row.getString(0).toString()).hasSize(50); +assertThat(row.getBinary(1)).hasSize(40); +} + +assertThatThrownBy( +() -> { +descriptor.putString( +DataGenConnectorOptionsUtil.FIELDS ++ ".f0." ++ DataGenConnectorOptionsUtil.KIND, +DataGenConnectorOptionsUtil.RANDOM); +descriptor.putLong( +DataGenConnectorOptionsUtil.FIELDS +
Re: [PR] [FLINK-33781][table] Cleanup usage of deprecated TableConfig#ctor [flink]
liuyongvs commented on PR #23897: URL: https://github.com/apache/flink/pull/23897#issuecomment-1851260132 > LGTM, I have a minor comment: do we need to remove the deprecated constructor method `public TableConfig() {}` now? I think we should not remove it now, as some users or connectors may still use it. We can remove it in Flink 2.0. What do you think? @lsyldliu
Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]
1996fanrui commented on code in PR #23247: URL: https://github.com/apache/flink/pull/23247#discussion_r1423367354 ## flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java: ## @@ -222,6 +223,19 @@ public class RestartStrategyOptions { code(RESTART_STRATEGY.key()), code("exponential-delay")) .build()); +@Documentation.OverrideDefault("infinite") +public static final ConfigOption RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS = + ConfigOptions.key("restart-strategy.exponential-delay.attempts-before-reset-backoff") +.intType() +.defaultValue(Integer.MAX_VALUE) +.withDescription( +Description.builder() +.text( +"The number of times that Flink retries the execution before the job is declared as failed " ++ "before reset the backoff to its initial value if %s has been set to %s.", Review Comment: Thanks @RocMarshal for the review! Updated~
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
reswqa commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1851249351 We should wait for https://github.com/apache/flink/pull/23876 to be merged (this won't take long, as it has already been approved by 3 votes). After that, CI should be able to pass.
Re: [PR] [FLINK-33672] Use MapState.entries() instead of keys() and get() in over window [flink]
Zakelly commented on PR #23855: URL: https://github.com/apache/flink/pull/23855#issuecomment-1851243905 Thanks @fsk119
[jira] [Commented] (FLINK-26586) FileSystem uses unbuffered read I/O
[ https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795577#comment-17795577 ] Hangxiang Yu commented on FLINK-26586: -- [~Matthias Schwalbe] Sure. Just feel free to contribute it. Already assigned to you, please go ahead. > FileSystem uses unbuffered read I/O > --- > > Key: FLINK-26586 > URL: https://issues.apache.org/jira/browse/FLINK-26586 > Project: Flink > Issue Type: Improvement > Components: API / State Processor, Connectors / FileSystem, Runtime > / Checkpointing >Affects Versions: 1.13.0, 1.14.0 >Reporter: Matthias Schwalbe >Assignee: Matthias Schwalbe >Priority: Major > Attachments: BufferedFSDataInputStreamWrapper.java, > BufferedLocalFileSystem.java > > > - I found out that, at least when using LocalFileSystem on a windows system, > read I/O to load a savepoint is unbuffered, > - See example stack [1] > - i.e. in order to load only a long in a serializer, it needs to go into > kernel mode 8 times and load the 8 bytes one by one > - I coded a BufferedFSDataInputStreamWrapper that allows to opt-in buffered > reads on any FileSystem implementation > - In our setting savepoint load is now 30 times faster > - I’ve once seen a Jira ticket as to improve savepoint load time in general > (lost the link unfortunately), maybe this approach can help with it > - not sure if HDFS has got the same problem > - I can contribute my implementation of a BufferedFSDataInputStreamWrapper > which can be integrated in any > [1] unbuffered reads stack: > read:207, FileInputStream (java.io) > read:68, LocalDataInputStream (org.apache.flink.core.fs.local) > read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs) > read:42, ForwardingInputStream (org.apache.flink.runtime.util) > readInt:390, DataInputStream (java.io) > deserialize:80, BytePrimitiveArraySerializer > (org.apache.flink.api.common.typeutils.base.array) > next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator > 
(org.apache.flink.runtime.state.restore) > next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator > (org.apache.flink.runtime.state.restore) > restoreKVStateData:147, RocksDBFullRestoreOperation > (org.apache.flink.contrib.streaming.state.restore) -- This message was sent by Atlassian Jira (v8.20.10#820010)
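The per-byte reads in the stack trace above can be demonstrated with plain JDK streams. This is a minimal illustration, not Flink code: `CountingStream` stands in for `LocalDataInputStream`, and each counted call models one trip to the underlying source (a kernel call when the source is a `FileInputStream`). `DataInputStream.readInt()` issues four single-byte `read()` calls unless a buffer sits in between:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Counts how often the underlying stream is hit; a stand-in for the
// unbuffered LocalDataInputStream from the stack trace above.
public class UnbufferedReadDemo {

    static class CountingStream extends InputStream {
        final InputStream delegate;
        int calls = 0;
        CountingStream(InputStream delegate) { this.delegate = delegate; }
        @Override public int read() throws IOException { calls++; return delegate.read(); }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return delegate.read(b, off, len);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {0, 0, 0, 42}; // a single big-endian int

        // Unbuffered: readInt() hits the source once per byte.
        CountingStream raw = new CountingStream(new ByteArrayInputStream(data));
        int value = new DataInputStream(raw).readInt();
        System.out.println(value + " via " + raw.calls + " reads"); // 42 via 4 reads

        // Buffered: one bulk read fills the buffer, the rest is served from memory.
        CountingStream counted = new CountingStream(new ByteArrayInputStream(data));
        new DataInputStream(new BufferedInputStream(counted)).readInt();
        System.out.println("buffered: " + counted.calls + " read");  // buffered: 1 read
    }
}
```

Wrapping the stream once, as the proposed `BufferedFSDataInputStreamWrapper` does, collapses the per-byte calls into occasional bulk reads, which is consistent with the reported 30x savepoint-load speedup.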
[jira] [Assigned] (FLINK-26586) FileSystem uses unbuffered read I/O
[ https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-26586: Assignee: Matthias Schwalbe > FileSystem uses unbuffered read I/O > --- > > Key: FLINK-26586 > URL: https://issues.apache.org/jira/browse/FLINK-26586 > Project: Flink > Issue Type: Improvement > Components: API / State Processor, Connectors / FileSystem, Runtime > / Checkpointing >Affects Versions: 1.13.0, 1.14.0 >Reporter: Matthias Schwalbe >Assignee: Matthias Schwalbe >Priority: Major > Attachments: BufferedFSDataInputStreamWrapper.java, > BufferedLocalFileSystem.java > > > - I found out that, at least when using LocalFileSystem on a windows system, > read I/O to load a savepoint is unbuffered, > - See example stack [1] > - i.e. in order to load only a long in a serializer, it needs to go into > kernel mode 8 times and load the 8 bytes one by one > - I coded a BufferedFSDataInputStreamWrapper that allows to opt-in buffered > reads on any FileSystem implementation > - In our setting savepoint load is now 30 times faster > - I’ve once seen a Jira ticket as to improve savepoint load time in general > (lost the link unfortunately), maybe this approach can help with it > - not sure if HDFS has got the same problem > - I can contribute my implementation of a BufferedFSDataInputStreamWrapper > which can be integrated in any > [1] unbuffered reads stack: > read:207, FileInputStream (java.io) > read:68, LocalDataInputStream (org.apache.flink.core.fs.local) > read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs) > read:42, ForwardingInputStream (org.apache.flink.runtime.util) > readInt:390, DataInputStream (java.io) > deserialize:80, BytePrimitiveArraySerializer > (org.apache.flink.api.common.typeutils.base.array) > next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator > (org.apache.flink.runtime.state.restore) > next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator > 
(org.apache.flink.runtime.state.restore) > restoreKVStateData:147, RocksDBFullRestoreOperation > (org.apache.flink.contrib.streaming.state.restore) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33798) Automatically clean up rocksdb logs when the task failover.
[ https://issues.apache.org/jira/browse/FLINK-33798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795576#comment-17795576 ] Hangxiang Yu commented on FLINK-33798: -- Thanks [~fanrui] for pinging me here. I think you are right. The behaviour after relocating is not consistent with before. We could make it. Thanks [~liming] for reporting this, and we could go ahead. > Automatically clean up rocksdb logs when the task failover. > --- > > Key: FLINK-33798 > URL: https://issues.apache.org/jira/browse/FLINK-33798 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Ming Li >Assignee: Ming Li >Priority: Major > > Since FLINK-24785 relocates rocksdb log, multiple rocksdb logs will be > created under the flink log directory, but they are not cleaned up during > task failover, resulting in a large number of rocksdb logs under the flink > log directory.
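A possible shape of the proposed cleanup, as a hedged sketch only: the method name `cleanupRocksDbLogs`, the `instancePrefix` parameter, and the `*.log` glob are illustrative assumptions, not Flink's actual failover hook or its real log-file naming scheme.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class RocksDbLogCleanupSketch {

    // Delete leftover RocksDB log files for one task instance on failover.
    // The naming scheme here is an assumption for illustration purposes.
    static void cleanupRocksDbLogs(Path logDir, String instancePrefix) throws IOException {
        try (DirectoryStream<Path> stream =
                Files.newDirectoryStream(logDir, instancePrefix + "*.log")) {
            for (Path logFile : stream) {
                Files.deleteIfExists(logFile); // stale log from the previous attempt
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flink-log-dir");
        Files.createFile(dir.resolve("job1_rocksdb.log")); // stale, should go
        Files.createFile(dir.resolve("jobmanager.log"));   // unrelated, must stay
        cleanupRocksDbLogs(dir, "job1");
        System.out.println(Files.exists(dir.resolve("job1_rocksdb.log"))); // false
        System.out.println(Files.exists(dir.resolve("jobmanager.log")));   // true
    }
}
```

The point of scoping the deletion by a per-instance prefix is to keep the cleanup consistent with the pre-FLINK-24785 behaviour, where the logs lived inside the RocksDB working directory and disappeared with it.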
[jira] [Comment Edited] (FLINK-33780) Support to store default catalog in CatalogStore
[ https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795574#comment-17795574 ] Yubin Li edited comment on FLINK-33780 at 12/12/23 3:22 AM: [~hackergin] Hi, we could make modifications in the `CatalogManager` construct function as follows: !image-2023-12-12-11-09-53-075.png|width=1165,height=593! DefaultCatalog is the current catalog of a session including SqlGateway, `default_catalog` is like `d2` under the context `use catalog d2`, the only difference is that it is created implicitly by Flink, we should treat them equally for semantic consistency, WDYT? was (Author: liyubin117): [~hackergin] Hi, we could make modifications in the `CatalogManager` construct function as follows: !image-2023-12-12-11-09-53-075.png|width=1165,height=593! DefaultCatalog is the current catalog of a session including SqlGateway, `default_catalog` is like `d2` under the context `use catalog d2`, only the difference is that it is created implicitly by Flink, we should treat them equally for semantic consistency, WDYT? > Support to store default catalog in CatalogStore > > > Key: FLINK-33780 > URL: https://issues.apache.org/jira/browse/FLINK-33780 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: Yubin Li >Priority: Major > Attachments: image-2023-12-11-13-47-29-623.png, > image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png > > > Flink initially creates a default catalog which is included in the > `Map catalogs`, but is not stored in the CatalogStore. > After conducting thorough investigation, I've determined that the necessary > modification can be made within the `CatalogManager`.
[jira] [Assigned] (FLINK-33716) Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView
[ https://issues.apache.org/jira/browse/FLINK-33716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan reassigned FLINK-33716: - Assignee: Jacky Lau > Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView > -- > > Key: FLINK-33716 > URL: https://issues.apache.org/jira/browse/FLINK-33716 > Project: Flink > Issue Type: Sub-task >Reporter: Jane Chan >Assignee: Jacky Lau >Priority: Major > > {code:java} > ExpressionTestBase > HiveTableSinkITCase > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33716) Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView
[ https://issues.apache.org/jira/browse/FLINK-33716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795573#comment-17795573 ] Jane Chan commented on FLINK-33716: --- [~jackylau] assigned to you. > Cleanup the usage of deprecated StreamTableEnvironment#createTemporaryView > -- > > Key: FLINK-33716 > URL: https://issues.apache.org/jira/browse/FLINK-33716 > Project: Flink > Issue Type: Sub-task >Reporter: Jane Chan >Assignee: Jacky Lau >Priority: Major > > {code:java} > ExpressionTestBase > HiveTableSinkITCase > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33717) Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream
[ https://issues.apache.org/jira/browse/FLINK-33717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan reassigned FLINK-33717: - Assignee: Jacky Lau > Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream > - > > Key: FLINK-33717 > URL: https://issues.apache.org/jira/browse/FLINK-33717 > Project: Flink > Issue Type: Sub-task >Reporter: Jane Chan >Assignee: Jacky Lau >Priority: Major > > {code:java} > PythonScalarFunctionOperatorTestBase > AvroTypesITCase {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33717) Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream
[ https://issues.apache.org/jira/browse/FLINK-33717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795572#comment-17795572 ] Jane Chan commented on FLINK-33717: --- Hi, [~jackylau]. Thanks for the interest, assigned to you. > Cleanup the usage of deprecated StreamTableEnvironment#fromDataStream > - > > Key: FLINK-33717 > URL: https://issues.apache.org/jira/browse/FLINK-33717 > Project: Flink > Issue Type: Sub-task >Reporter: Jane Chan >Priority: Major > > {code:java} > PythonScalarFunctionOperatorTestBase > AvroTypesITCase {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33780) Support to store default catalog in CatalogStore
[ https://issues.apache.org/jira/browse/FLINK-33780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-33780: - Attachment: image-2023-12-12-11-09-53-075.png > Support to store default catalog in CatalogStore > > > Key: FLINK-33780 > URL: https://issues.apache.org/jira/browse/FLINK-33780 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: Yubin Li >Priority: Major > Attachments: image-2023-12-11-13-47-29-623.png, > image-2023-12-11-14-14-10-002.png, image-2023-12-12-11-09-53-075.png > > > Flink initially creates a default catalog which is included in the > `Map catalogs`, but is not stored in the CatalogStore. > After conducting thorough investigation, I've determined that the necessary > modification can be made within the `CatalogManager`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423346604 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) +.withDescription("Max allowed GC pressure during scaling operations"); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(0.9) Review Comment: > Also keep in mind that this is the average heap usage. With 90% average usage you are extremely likely to be close to out of heap in most cases. Thanks @gyfora for the clarification! I guess it's not the average heap usage, and I want to check with you first. In the `ScalingExecutor#isJobUnderMemoryPressure` method, we check whether `evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage()` > `conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)`. Intuitively `getAverage` looks like the average, but its calculation is divided into two steps: - Step 1: `ScalingMetrics#computeGlobalMetrics` collects the `HEAP_USAGE` at each time point as `heapUsed.getMax() / heapMax.getMax()`. - IIUC, the `heapUsed` is an `AggregatedMetric`; when a job has 1000 taskmanagers, if the heapUsed for 999 TMs is very low and only one TM is high, we still consider `heapUsed` high at this time. - Step 2: `ScalingMetricEvaluator#evaluateGlobalMetrics` computes the `HEAP_USAGE` based on the `metricHistory`. - The `metricHistory` is therefore composed of the TMs with the highest heapUsage at a large number of time points. 
Strictly speaking, both steps have some problems: - Step 1: Java GC is executed lazily, not immediately. - When TM heapUsage is high, it may be that GC has not been triggered yet, which does not mean that the memory pressure is high. - Especially if the heapUsage is high for only one TM or a small number of TMs. - Step 2: Since the data from the first step is unreliable, the average value in the second step is unreliable as well. > GC metrics will only be available in Flink 1.19. I'm not sure whether we can sum all GC times as the total GC time? Before 1.19, there are detailed GC times for each collector. > This is a very good point and happens often. I think we could definitely build this logic on top of the newly introduced metrics + scaling history as a follow up. It would probably be a very good addition. (but definitely out of scope for this PR) That makes sense. As I understand it: it's better to revert a scaling if the job is unhealthy after scaling down. Memory pressure is one type of unhealthiness; checkpoint failures or CPU pressure may indicate an unhealthy job as well. Would you mind if I create one JIRA and pick it up? Thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
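The concern about step 1 above can be illustrated with a small stand-alone sketch (this is hypothetical illustration code, not the autoscaler's actual implementation; it only mirrors the `heapUsed.getMax() / heapMax.getMax()` computation described in the comment):

```java
import java.util.Arrays;

public class HeapUsageSketch {

    // Step 1 stand-in: one sample per collection time, defined by the single
    // busiest TM (mirrors heapUsed.getMax() / heapMax.getMax()).
    static double maxBasedSample(double[] perTmUsedMb, double perTmMaxMb) {
        double maxUsed = Arrays.stream(perTmUsedMb).max().orElse(0.0);
        return maxUsed / perTmMaxMb;
    }

    // Fleet-wide average usage at the same point in time, for comparison.
    static double fleetAverage(double[] perTmUsedMb, double perTmMaxMb) {
        return Arrays.stream(perTmUsedMb).average().orElse(0.0) / perTmMaxMb;
    }

    public static void main(String[] args) {
        // 1000 TMs with 1 GB heaps: 999 nearly idle, one close to full.
        double[] used = new double[1000];
        Arrays.fill(used, 100.0);
        used[0] = 950.0;

        System.out.println(maxBasedSample(used, 1000.0)); // 0.95: looks like memory pressure
        System.out.println(fleetAverage(used, 1000.0));   // ~0.10: the fleet is mostly idle
    }
}
```

With a single hot TM, the max-based sample crosses the 0.9 threshold even though average fleet usage is around 10%, which is the distortion the comment describes.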
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795571#comment-17795571 ] xiaogang zhou commented on FLINK-33728: --- Hi [~mapohl], thanks for the comment above. Sorry for my poor written English :P, but I think your re-clarification is exactly what I am proposing. I'd like to introduce a lazy re-initialization of the watch mechanism, which tolerates a disconnection of the watch until a new pod is requested. And I think your concern is how we detect a TM loss without an active watcher. I have tested my change in a real K8s environment. With a disconnected watcher, I killed a TM pod. After no more than 50s, the task restarted with an exception: {code:java} // code placeholder java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id flink-6168d34cf9d3a5d31ad8bb02bce6a370-taskmanager-1-8 timed out. at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1306) at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitC {code} Moreover, I think YARN also does not have a watcher mechanism, so Flink scheduled on YARN also relies on a heartbeat timeout mechanism? 
And an active rewatching strategy can really cause great pressure on the API server, especially in early versions without resource version zero set in the watch-list request. > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met a massive production problem when Kubernetes etcd responded slowly. > After Kubernetes recovered an hour later, thousands of Flink jobs using > KubernetesResourceManagerDriver rewatched upon receiving > ResourceVersionTooOld, which caused great pressure on the API server and made > the API server fail again... > > I am not sure whether it is necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method. > > We can just neglect the disconnection of the watching process and try to rewatch > once a new requestResource is called. And we can leverage the akka heartbeat > timeout to discover TM failures, just like YARN mode does.
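The lazy re-watch idea proposed above can be sketched as follows (hypothetical class and method names for illustration; the real driver is `KubernetesResourceManagerDriver`): the watch error callback only records that the watch is gone, and the watch is re-established the next time a pod is actually requested.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LazyWatchSketch {
    private final AtomicBoolean watchClosed = new AtomicBoolean(false);
    int rewatchCount = 0; // exposed only so the behavior is observable in this sketch

    // Called from the watch error callback; deliberately does NOT rewatch here.
    public void onWatchError(Throwable cause) {
        watchClosed.set(true);
    }

    // Called when a new worker pod is requested; rewatches only if needed.
    public void requestWorkerPod() {
        if (watchClosed.compareAndSet(true, false)) {
            rewatch();
        }
        // ... submit the pod creation request here ...
    }

    private void rewatch() {
        rewatchCount++; // real code would re-create the Kubernetes pod watch
    }
}
```

With this shape, a storm of watch errors (e.g. during an etcd outage) triggers at most one re-watch, and only when the job actually needs a new TM; lost TMs are still detected via the heartbeat timeout, as in the stack trace quoted above.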
Re: [PR] [hotfix][test] Migrate JsonRowDeserializationSchemaTest/JsonRowSerializationSchemaTest to Junit5 and Assertj [flink]
fsk119 merged PR #23882: URL: https://github.com/apache/flink/pull/23882
[jira] [Closed] (FLINK-33672) Use MapState.entries() instead of keys() and get() in over window
[ https://issues.apache.org/jira/browse/FLINK-33672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-33672. - Fix Version/s: 1.19.0 Resolution: Fixed > Use MapState.entries() instead of keys() and get() in over window > - > > Key: FLINK-33672 > URL: https://issues.apache.org/jira/browse/FLINK-33672 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > In code logic related to over windows, such as > org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction > {code:java} > private transient MapState<Long, List<RowData>> inputState; > public void onTimer( > long timestamp, > KeyedProcessFunction.OnTimerContext ctx, > Collector<RowData> out) > throws Exception { > //... > Iterator<Long> iter = inputState.keys().iterator(); > //... > while (iter.hasNext()) { > Long elementKey = iter.next(); > if (elementKey < limit) { > // element key outside of window. Retract values > List<RowData> elementsRemove = inputState.get(elementKey); > // ... > } > } > //... > } {code} > As we can see, there is a combination of key iteration and getting the value for > each iterated key from inputState. However, for RocksDB, the key iteration calls > entry iteration, which means we could actually replace it with entry iteration > without introducing any extra overhead. As a result, we could save a > function call of get() by using getValue() of the iterated entry at very low cost.
[jira] [Commented] (FLINK-33672) Use MapState.entries() instead of keys() and get() in over window
[ https://issues.apache.org/jira/browse/FLINK-33672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795569#comment-17795569 ] Shengkai Fang commented on FLINK-33672: --- Merged into master: 080119cca53d9890257982b6a74a7d6f913253c2 > Use MapState.entries() instead of keys() and get() in over window > - > > Key: FLINK-33672 > URL: https://issues.apache.org/jira/browse/FLINK-33672 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > > In code logic related to over windows, such as > org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction > {code:java} > private transient MapState<Long, List<RowData>> inputState; > public void onTimer( > long timestamp, > KeyedProcessFunction.OnTimerContext ctx, > Collector<RowData> out) > throws Exception { > //... > Iterator<Long> iter = inputState.keys().iterator(); > //... > while (iter.hasNext()) { > Long elementKey = iter.next(); > if (elementKey < limit) { > // element key outside of window. Retract values > List<RowData> elementsRemove = inputState.get(elementKey); > // ... > } > } > //... > } {code} > As we can see, there is a combination of key iteration and getting the value for > each iterated key from inputState. However, for RocksDB, the key iteration calls > entry iteration, which means we could actually replace it with entry iteration > without introducing any extra overhead. As a result, we could save a > function call of get() by using getValue() of the iterated entry at very low cost.
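The refactoring described in the issue can be illustrated with a plain `java.util.Map` standing in for Flink's `MapState` (an assumption made for this sketch; `MapState.entries()` offers the same entry-iteration shape): a single pass over entries replaces the key iteration plus a per-key `get()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OverWindowSketch {

    // Before: iterate keys, then look each value up again -> one get() per expired key.
    static List<String> retractWithKeysAndGet(Map<Long, List<String>> inputState, long limit) {
        List<String> retracted = new ArrayList<>();
        for (Long elementKey : inputState.keySet()) {
            if (elementKey < limit) {
                retracted.addAll(inputState.get(elementKey)); // extra lookup per key
            }
        }
        return retracted;
    }

    // After: iterate entries; getValue() costs nothing once the entry is materialized.
    static List<String> retractWithEntries(Map<Long, List<String>> inputState, long limit) {
        List<String> retracted = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : inputState.entrySet()) {
            if (entry.getKey() < limit) {
                retracted.addAll(entry.getValue()); // no second state access
            }
        }
        return retracted;
    }
}
```

Both variants return the same result; per the issue description, for RocksDB-backed state the key iterator already walks entries internally, so the entries() variant removes the per-key state access without adding any new cost.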
Re: [PR] [FLINK-33672] Use MapState.entries() instead of keys() and get() in over window [flink]
fsk119 merged PR #23855: URL: https://github.com/apache/flink/pull/23855
Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]
TanYuxin-tyx commented on PR #23771: URL: https://github.com/apache/flink/pull/23771#issuecomment-1851182058 @lsyldliu Thanks for helping review.
[jira] [Commented] (FLINK-33612) The table plan of hybrid shuffle may introduce additional blocking edges occasionally
[ https://issues.apache.org/jira/browse/FLINK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795564#comment-17795564 ] dalongliu commented on FLINK-33612: --- merged via master branch: 9cac80be18c6aff0cebdfe706327c1693822e884 > The table plan of hybrid shuffle may introduce additional blocking edges > occasionally > - > > Key: FLINK-33612 > URL: https://issues.apache.org/jira/browse/FLINK-33612 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > > To enhance the performance of hybrid shuffle, it is imperative to address the > inconsistency between hybrid shuffle mode and blocking shuffle mode in > certain query plans of TPC-DS (such as q88.sql, q14a.sql, q14b.sql, etc). > In hybrid shuffle mode, these plans introduce additional blocking shuffle > edges and result in increased shuffle times, potentially impacting overall > efficiency.
Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]
lsyldliu merged PR #23771: URL: https://github.com/apache/flink/pull/23771
Re: [PR] Bump org.apache.commons:commons-compress from 1.22 to 1.24.0 [flink-connector-pulsar]
tisonkun merged PR #65: URL: https://github.com/apache/flink-connector-pulsar/pull/65
Re: [PR] [hotfix][build] Bump version to 4.2-SNAPSHOT [flink-connector-pulsar]
tisonkun merged PR #67: URL: https://github.com/apache/flink-connector-pulsar/pull/67
Re: [PR] [hotfix][build] Bump version to 4.2-SNAPSHOT [flink-connector-pulsar]
boring-cyborg[bot] commented on PR #67: URL: https://github.com/apache/flink-connector-pulsar/pull/67#issuecomment-1851152475 Awesome work, congrats on your first merged pull request!
Re: [PR] [build] Bump version to 4.2-SNAPSHOT [flink-connector-pulsar]
tisonkun commented on code in PR #67: URL: https://github.com/apache/flink-connector-pulsar/pull/67#discussion_r1423296836 ## flink-connector-pulsar-e2e-tests/pom.xml: ## @@ -23,7 +23,7 @@ under the License. org.apache.flink flink-connector-pulsar-parent - 4.1-SNAPSHOT + 4.2-SNAPSHOT Review Comment: @snuyanzin This may need a mailing list discussion. I personally support using `4.2.0-SNAPSHOT`, but the whole Flink repos have used `X.Y-SNAPSHOT` for a long time and I don't know whether other scripts depend on this format. cc @zentol For this patch, I tend to keep the current flavor. ## docs/data/pulsar.yml: ## @@ -16,7 +16,7 @@ # limitations under the License. -version: 4.1.0 +version: 4.2-SNAPSHOT Review Comment: ```suggestion version: 4.1.0 ``` ## docs/data/pulsar.yml: ## @@ -16,7 +16,7 @@ # limitations under the License. -version: 4.1.0 +version: 4.2-SNAPSHOT Review Comment: ```suggestion version: 4.1.0 ```
Re: [PR] [hotfix][build] Release branch should use SNAPSHOT version [flink-connector-pulsar]
tisonkun commented on code in PR #66: URL: https://github.com/apache/flink-connector-pulsar/pull/66#discussion_r1423296026 ## docs/data/pulsar.yml: ## @@ -16,7 +16,7 @@ # limitations under the License. -version: 4.1.0 +version: 4.1-SNAPSHOT Review Comment: ```suggestion version: 4.1.0 ```
Re: [PR] [build][hotfix] Release branch should use SNAPSHOT version [flink-connector-pulsar]
tisonkun commented on code in PR #66: URL: https://github.com/apache/flink-connector-pulsar/pull/66#discussion_r1423293161 ## docs/data/pulsar.yml: ## @@ -16,7 +16,7 @@ # limitations under the License. -version: 4.1.0 +version: 4.1-SNAPSHOT Review Comment: Ditto https://github.com/apache/flink-connector-pulsar/pull/67#discussion_r1422300397 Please keep it as 4.1.0
Re: [PR] [hotfix][docs] config key for parquet int64 option [flink]
flinkbot commented on PR #23909: URL: https://github.com/apache/flink/pull/23909#issuecomment-1851138706 ## CI report: * b72dd38e4c835667830025114edd647c40224429 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [hotfix][docs] config key for parquet int64 option [flink]
tweise opened a new pull request, #23909: URL: https://github.com/apache/flink/pull/23909 Corrects an error from https://github.com/apache/flink/pull/23900
[jira] [Closed] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin closed FLINK-33361. --- > Add Java 17 compatibility to Flink Kafka connector > -- > > Key: FLINK-33361 > URL: https://issues.apache.org/jira/browse/FLINK-33361 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.1, kafka-3.1.0 >Reporter: Martijn Visser >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.1.0 > > > When currently trying to {{mvn clean install -Dflink.version=1.18.0 > -Dscala-2.12 -Prun-end-to-end-tests > -DdistDir=/Users/mvisser/Developer/flink-1.18.0 > -Dflink.convergence.phase=install > -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors > like: > {code:java} > [INFO] > [INFO] Results: > [INFO] > [ERROR] Errors: > [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore > [ERROR] Run 1: Exception while creating StreamOperatorStateContext. > [ERROR] Run 2: Exception while creating StreamOperatorStateContext. > [ERROR] Run 3: Exception while creating StreamOperatorStateContext. > [ERROR] Run 4: Exception while creating StreamOperatorStateContext. > [ERROR] Run 5: Exception while creating StreamOperatorStateContext. > [ERROR] Run 6: Exception while creating StreamOperatorStateContext. > [ERROR] Run 7: Exception while creating StreamOperatorStateContext. > [ERROR] Run 8: Exception while creating StreamOperatorStateContext. > [ERROR] Run 9: Exception while creating StreamOperatorStateContext. > [INFO] > [ERROR] > FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » > Runtime > [ERROR] FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » > Checkpoint C... > [ERROR] FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » > Checkpoint Cou... > [ERROR] UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » > UncheckedIO jav... 
> {code} > Example stacktrace: > {code:java} > Test > testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest) > failed with: > java.io.UncheckedIOException: java.io.IOException: Serializing the source > elements failed: java.lang.reflect.InaccessibleObjectException: Unable to > make field private final java.lang.Object[] java.util.Arrays$ArrayList.a > accessible: module java.base does not "opens java.util" to unnamed module > @45b4c3a9 > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146) > at > org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118) > at > org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434) > at > org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402) > at > org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40) > at > org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) > at > 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328) >
[jira] [Commented] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795516#comment-17795516 ] Sergey Nuyanzin commented on FLINK-33361: - Merged as [825052f55754e401176083c121ffaf38362b7a26|https://github.com/apache/flink-connector-kafka/commit/825052f55754e401176083c121ffaf38362b7a26] > Add Java 17 compatibility to Flink Kafka connector > -- > > Key: FLINK-33361 > URL: https://issues.apache.org/jira/browse/FLINK-33361 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.1, kafka-3.1.0 >Reporter: Martijn Visser >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.1.0 > > > When currently trying to {{mvn clean install -Dflink.version=1.18.0 > -Dscala-2.12 -Prun-end-to-end-tests > -DdistDir=/Users/mvisser/Developer/flink-1.18.0 > -Dflink.convergence.phase=install > -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors > like: > {code:java} > [INFO] > [INFO] Results: > [INFO] > [ERROR] Errors: > [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore > [ERROR] Run 1: Exception while creating StreamOperatorStateContext. > [ERROR] Run 2: Exception while creating StreamOperatorStateContext. > [ERROR] Run 3: Exception while creating StreamOperatorStateContext. > [ERROR] Run 4: Exception while creating StreamOperatorStateContext. > [ERROR] Run 5: Exception while creating StreamOperatorStateContext. > [ERROR] Run 6: Exception while creating StreamOperatorStateContext. > [ERROR] Run 7: Exception while creating StreamOperatorStateContext. > [ERROR] Run 8: Exception while creating StreamOperatorStateContext. > [ERROR] Run 9: Exception while creating StreamOperatorStateContext. > [INFO] > [ERROR] > FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » > Runtime > [ERROR] FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » > Checkpoint C... 
> [ERROR] FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » > Checkpoint Cou... > [ERROR] UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » > UncheckedIO jav... > {code} > Example stacktrace: > {code:java} > Test > testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest) > failed with: > java.io.UncheckedIOException: java.io.IOException: Serializing the source > elements failed: java.lang.reflect.InaccessibleObjectException: Unable to > make field private final java.lang.Object[] java.util.Arrays$ArrayList.a > accessible: module java.base does not "opens java.util" to unnamed module > @45b4c3a9 > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146) > at > org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118) > at > org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434) > at > org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402) > at > org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53) > at > 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40) > at > org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839) > at >
[jira] [Assigned] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-33361: --- Assignee: Sergey Nuyanzin > Add Java 17 compatibility to Flink Kafka connector > -- > > Key: FLINK-33361 > URL: https://issues.apache.org/jira/browse/FLINK-33361 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.1, kafka-3.1.0 >Reporter: Martijn Visser >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > When currently trying to {{mvn clean install -Dflink.version=1.18.0 > -Dscala-2.12 -Prun-end-to-end-tests > -DdistDir=/Users/mvisser/Developer/flink-1.18.0 > -Dflink.convergence.phase=install > -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors > like: > {code:java} > [INFO] > [INFO] Results: > [INFO] > [ERROR] Errors: > [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore > [ERROR] Run 1: Exception while creating StreamOperatorStateContext. > [ERROR] Run 2: Exception while creating StreamOperatorStateContext. > [ERROR] Run 3: Exception while creating StreamOperatorStateContext. > [ERROR] Run 4: Exception while creating StreamOperatorStateContext. > [ERROR] Run 5: Exception while creating StreamOperatorStateContext. > [ERROR] Run 6: Exception while creating StreamOperatorStateContext. > [ERROR] Run 7: Exception while creating StreamOperatorStateContext. > [ERROR] Run 8: Exception while creating StreamOperatorStateContext. > [ERROR] Run 9: Exception while creating StreamOperatorStateContext. > [INFO] > [ERROR] > FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » > Runtime > [ERROR] FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » > Checkpoint C... > [ERROR] FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » > Checkpoint Cou... > [ERROR] UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » > UncheckedIO jav... 
[jira] [Resolved] (FLINK-33361) Add Java 17 compatibility to Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-33361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-33361. - Fix Version/s: kafka-3.1.0 Resolution: Fixed > Add Java 17 compatibility to Flink Kafka connector > -- > > Key: FLINK-33361 > URL: https://issues.apache.org/jira/browse/FLINK-33361 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.0.1, kafka-3.1.0 >Reporter: Martijn Visser >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.1.0 > > > When currently trying to {{mvn clean install -Dflink.version=1.18.0 > -Dscala-2.12 -Prun-end-to-end-tests > -DdistDir=/Users/mvisser/Developer/flink-1.18.0 > -Dflink.convergence.phase=install > -Dlog4j.configurationFile=tools/ci/log4j.properties}} this fails with errors > like: > {code:java} > [INFO] > [INFO] Results: > [INFO] > [ERROR] Errors: > [ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore > [ERROR] Run 1: Exception while creating StreamOperatorStateContext. > [ERROR] Run 2: Exception while creating StreamOperatorStateContext. > [ERROR] Run 3: Exception while creating StreamOperatorStateContext. > [ERROR] Run 4: Exception while creating StreamOperatorStateContext. > [ERROR] Run 5: Exception while creating StreamOperatorStateContext. > [ERROR] Run 6: Exception while creating StreamOperatorStateContext. > [ERROR] Run 7: Exception while creating StreamOperatorStateContext. > [ERROR] Run 8: Exception while creating StreamOperatorStateContext. > [ERROR] Run 9: Exception while creating StreamOperatorStateContext. > [INFO] > [ERROR] > FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » > Runtime > [ERROR] FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » > Checkpoint C... > [ERROR] FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » > Checkpoint Cou... 
> [ERROR] UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » > UncheckedIO jav... > {code} > Example stacktrace: > {code:java} > Test > testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest) > failed with: > java.io.UncheckedIOException: java.io.IOException: Serializing the source > elements failed: java.lang.reflect.InaccessibleObjectException: Unable to > make field private final java.lang.Object[] java.util.Arrays$ArrayList.a > accessible: module java.base does not "opens java.util" to unnamed module > @45b4c3a9 > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146) > at > org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118) > at > org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434) > at > org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402) > at > org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53) > at > org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40) > at > 
org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839) > at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590) > at >
Re: [PR] [FLINK-33361][Connectors/Kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]
boring-cyborg[bot] commented on PR #68: URL: https://github.com/apache/flink-connector-kafka/pull/68#issuecomment-1850970248 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33361][Connectors/Kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]
snuyanzin merged PR #68: URL: https://github.com/apache/flink-connector-kafka/pull/68 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33361][Connectors/Kafka] Add Java 17 compatibility to Flink Kafka connector [flink-connector-kafka]
snuyanzin commented on code in PR #68: URL: https://github.com/apache/flink-connector-kafka/pull/68#discussion_r1423178960 ## pom.xml: ## @@ -78,6 +78,13 @@ under the License. 2.17.1 flink-connector-kafka-parent + + + +-XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${flink.connector.module.config} Review Comment: IIRC flags are configured in `flink-conf.yaml` at `env.java.opts.all`, and all flags used here are already present there. That means that currently there is nothing to do. In theory such a necessity might appear in the future, e.g. because of some specific dependencies; however, I think it should be handled only once it is required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33800) Allow passing parameters to database via jdbc url
[ https://issues.apache.org/jira/browse/FLINK-33800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33800: --- Labels: pull-request-available (was: ) > Allow passing parameters to database via jdbc url > - > > Key: FLINK-33800 > URL: https://issues.apache.org/jira/browse/FLINK-33800 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: jdbc-3.1.1 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > Currently it does not allow to pass extra properties e.g. > an attempt to connect to > {{jdbc:postgresql://...?sslmode=require}} > fails with > {noformat} > Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: > Failed to fetchResults. > at > org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229) > at > org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83) > ... 48 more > Caused by: > org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to > execute the operation b70b5cf7-7068-4eb6-83a4-78aed36dbd35. 
> at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414) > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) > at java.base/java.util.concurrent.FutureTask.run(Unknown Source) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) > at java.base/java.util.concurrent.FutureTask.run(Unknown Source) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > {noformat} > because of the logic at > {{org.apache.flink.connector.jdbc.catalog.JdbcCatalogUtils#validateJdbcUrl}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33800][JDBC/Connector] Allow passing parameters to database via jdbc url [flink-connector-jdbc]
snuyanzin opened a new pull request, #83: URL: https://github.com/apache/flink-connector-jdbc/pull/83 The idea is pretty straightforward: on one side there is a JDBC URL, which is what most database providers normally supply to connect with. On the other side, for probably historical reasons, there is a separate property for the default database which is used to build the default URL. To not break the former behaviour, it is possible to specify the database in the JDBC URL (with or without extra parameters) only if it is the same as the one in the dedicated default-database property. In fact, I tend to think the dedicated property should be marked as deprecated, since all the info could be extracted from the JDBC URL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
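The split the PR relies on can be illustrated with a minimal sketch. This is not the connector's actual `JdbcCatalogUtils` code; the class and method names below are hypothetical, showing only how a JDBC URL could be separated into its base part and extra query parameters so that a suffix like `sslmode=require` no longer trips URL validation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JdbcUrlSketch {

    // Returns the URL up to (but excluding) the '?' that starts the query part.
    static String baseUrl(String jdbcUrl) {
        int q = jdbcUrl.indexOf('?');
        return q < 0 ? jdbcUrl : jdbcUrl.substring(0, q);
    }

    // Collects key=value pairs after the '?' into a map; empty map if none.
    static Map<String, String> queryParams(String jdbcUrl) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = jdbcUrl.indexOf('?');
        if (q < 0) {
            return params;
        }
        for (String pair : jdbcUrl.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            params.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return params;
    }

    public static void main(String[] args) {
        String url = "jdbc:postgresql://localhost:5432/mydb?sslmode=require";
        System.out.println(baseUrl(url));     // jdbc:postgresql://localhost:5432/mydb
        System.out.println(queryParams(url)); // {sslmode=require}
    }
}
```

With such a split, the validation logic can compare only the base URL against the default-database property while passing the extra parameters through to the driver.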
Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]
ryanvanhuuksloot commented on code in PR #727: URL: https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423102408 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java: ## @@ -93,24 +96,45 @@ private static HasMetadata getIngress( if (ingressInNetworkingV1(client)) { return new IngressBuilder() .withNewMetadata() +.withLabels(spec.getIngress().getLabels()) .withAnnotations(spec.getIngress().getAnnotations()) .withName(objectMeta.getName()) .withNamespace(objectMeta.getNamespace()) .endMetadata() .withNewSpec() .withIngressClassName(spec.getIngress().getClassName()) +.withTls(spec.getIngress().getTls()) .withRules(getIngressRule(objectMeta, spec, effectiveConfig)) .endSpec() .build(); } else { +List ingressTLS = Review Comment: Pulled it outside of the return because otherwise it was even more impossible to read -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]
ryanvanhuuksloot commented on code in PR #727: URL: https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423103105 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressTlsSpec.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.spec; + +import org.apache.flink.annotation.Experimental; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** Ingress spec. */ +@Experimental +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class IngressTlsSpec { Review Comment: No particular reason - didn't think about it. Swapped to the kubernetes spec. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]
ryanvanhuuksloot commented on code in PR #727: URL: https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1423102109 ## docs/content/docs/custom-resource/reference.md: ## @@ -96,6 +96,8 @@ This page serves as a full reference for FlinkDeployment custom resource definit | template | java.lang.String | Ingress template for the JobManager service. | | className | java.lang.String | Ingress className for the Flink deployment. | | annotations | java.util.Map | Ingress annotations. | +| labels | java.util.Map | Ingress labels. | +| tls | java.util.List | Ingress tls. | Review Comment: Like you said in your review - I think it is fine to only show v1 here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33800) Allow passing parameters to database via jdbc url
Sergey Nuyanzin created FLINK-33800: --- Summary: Allow passing parameters to database via jdbc url Key: FLINK-33800 URL: https://issues.apache.org/jira/browse/FLINK-33800 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: jdbc-3.1.1 Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin Currently it is not possible to pass extra properties, e.g. an attempt to connect to {{jdbc:postgresql://...?sslmode=require}} fails with {noformat} Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults. at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229) at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83) ... 48 more Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation b70b5cf7-7068-4eb6-83a4-78aed36dbd35. at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) {noformat} because of the logic at {{org.apache.flink.connector.jdbc.catalog.JdbcCatalogUtils#validateJdbcUrl}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061343 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -297,6 +301,28 @@ private void computeTargetDataRate( } } +@VisibleForTesting +protected static Map evaluateGlobalMetrics( +SortedMap metricHistory) { +var latest = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics(); +var out = new HashMap(); + +var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN); +var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN); + +out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure)); +out.put( +HEAP_USAGE, +new EvaluatedScalingMetric( +lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, metricHistory))); +return out; +} + +private static double getAverageGlobalMetric( +ScalingMetric metric, SortedMap metricsHistory) { +return getAverage(metric, null, metricsHistory); +} + public static double getAverage( ScalingMetric metric, JobVertexID jobVertexId, Review Comment: will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061010 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) +.withDescription("Max allowed GC pressure during scaling operations"); + +public static final ConfigOption HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(0.9) Review Comment: 1. You are right but I still think the current logic is valuable because GC metrics will only be available in Flink 1.19. With the heap usage based logic we can also support older Flink versions. Also keep in mind that this is the average heap usage. With 90% average usage you are extremely likely to be close to out of heap in most cases. 2. This is a very good point and happens often. I think we could definitely build this logic on top of the newly introduced metrics + scaling history as a follow up. It would probably be a very good addition. (but definitely out of scope for this PR) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
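The two thresholds under discussion (`memory.gc-pressure.threshold` defaulting to 0.3, `memory.heap-usage.threshold` defaulting to 0.9) gate scaling decisions. The following is a minimal sketch of that gating logic, not the operator's actual implementation; the method name `blockScaling` is hypothetical:

```java
public class MemoryPressureCheck {
    // Defaults from the AutoScalerOptions diff in this PR.
    static final double GC_PRESSURE_THRESHOLD = 0.3;
    static final double HEAP_USAGE_THRESHOLD = 0.9;

    // Scaling is blocked when taskmanagers spend too much time in GC, or the
    // average heap usage over the metric window is close to the heap limit.
    static boolean blockScaling(double gcPressure, double avgHeapUsage) {
        return gcPressure > GC_PRESSURE_THRESHOLD || avgHeapUsage > HEAP_USAGE_THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(blockScaling(0.05, 0.75)); // false: healthy job
        System.out.println(blockScaling(0.40, 0.50)); // true: GC pressure too high
        System.out.println(blockScaling(0.05, 0.95)); // true: heap nearly full on average
    }
}
```

As the review notes, using the *average* heap usage matters here: a 90% average, unlike a momentary spike, strongly suggests the job is genuinely close to running out of heap.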
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423057412 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java: ## @@ -78,19 +91,87 @@ protected Map queryAggregatedVertexMetrics( EmptyRequestBody.getInstance()) .get(); -return responseBody.getMetrics().stream() -.collect( -Collectors.toMap( -m -> metrics.get(m.getId()), -m -> m, -(m1, m2) -> -new AggregatedMetric( -m1.getId() + " merged with " + m2.getId(), -Math.min(m1.getMin(), m2.getMin()), -Math.max(m1.getMax(), m2.getMax()), -// Average can't be computed -Double.NaN, -m1.getSum() + m2.getSum(; +return aggregateByFlinkMetric(metrics, responseBody); } } + +protected Map queryTmMetrics(Context ctx) throws Exception { +try (var restClient = ctx.getRestClusterClient()) { +var metricNames = getTmMetricNames(restClient, ctx); +var metricNameMapping = new HashMap(); + +REQUIRED_TM_METRICS.forEach( +fm -> { +var name = +fm.findAny(metricNames) +.orElseThrow( +() -> +new RuntimeException( +"Could not find required TM metric " ++ fm.name())); +metricNameMapping.put(name, fm); +}); + +TOTAL_GC_TIME_PER_SEC +.findAny(metricNames) +.ifPresent( +m -> { +LOG.debug("GC metrics found"); +metricNameMapping.put(m, TOTAL_GC_TIME_PER_SEC); +}); + +var queriedMetrics = +new HashMap<>(queryAggregatedTmMetrics(restClient, metricNameMapping)); +availableTmMetricNames.put(ctx.getJobKey(), metricNames); Review Comment: Actually I just realised that TM metric names are fixed, so we know them beforehand and no need to cache it, we can simply hardcode it. I will work on this tomorrow -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423056535 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -65,7 +65,16 @@ public enum ScalingMetric { SCALE_DOWN_RATE_THRESHOLD(false), /** Expected true processing rate after scale up. */ -EXPECTED_PROCESSING_RATE(false); +EXPECTED_PROCESSING_RATE(false), + +/** + * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between + * 0 (no time in GC) and 1 (100% time in GC). + */ +GC_PRESSURE(false), + +/** Percentage of max heap used (between 0 and 1). */ +HEAP_USAGE(true); Review Comment: good point, I will improve that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33795] Add new config to forbid autoscaling in certain periods of a day [flink-kubernetes-operator]
mxm commented on code in PR #728: URL: https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1422946292 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Stabilization period in which no new scaling will be executed"); +public static final ConfigOption> FORBID_PERIOD = +autoScalerConfig("forbid.periods") +.stringType() +.asList() +.defaultValues() + .withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods")) +.withDescription( +"A (semicolon-separated) list of certain times of the day during which autoscaling is forbidden, 10:00:00-11:00:00;21:30:00-22:30:00 for example"); Review Comment: This was also my first thought seeing this. Ideally, we would use some available standard for time spans. I think this also has to include the time zone. What about different times at different days of the week / month? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
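The proposed `forbid.periods` format (`10:00:00-11:00:00;21:30:00-22:30:00`) can be parsed with a short sketch like the one below. This is an illustration only, not the PR's implementation, and it deliberately ignores the open review questions (time zones, day-of-week rules); the class and method names are hypothetical:

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

public class ForbidPeriods {

    static class Period {
        final LocalTime start;
        final LocalTime end;

        Period(LocalTime start, LocalTime end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(LocalTime t) {
            return !t.isBefore(start) && !t.isAfter(end);
        }
    }

    // Parses "10:00:00-11:00:00;21:30:00-22:30:00" into a list of periods.
    static List<Period> parse(String config) {
        List<Period> periods = new ArrayList<>();
        for (String range : config.split(";")) {
            String[] parts = range.split("-");
            periods.add(new Period(LocalTime.parse(parts[0]), LocalTime.parse(parts[1])));
        }
        return periods;
    }

    static boolean scalingForbidden(List<Period> periods, LocalTime now) {
        for (Period p : periods) {
            if (p.contains(now)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Period> periods = parse("10:00:00-11:00:00;21:30:00-22:30:00");
        System.out.println(scalingForbidden(periods, LocalTime.of(10, 30))); // true
        System.out.println(scalingForbidden(periods, LocalTime.of(12, 0)));  // false
    }
}
```

A real implementation would also need to decide which time zone `LocalTime.now()` is evaluated in, which is exactly the concern raised in the review.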
[jira] [Commented] (FLINK-33373) Capture build scans on ge.apache.org to benefit from deep build insights
[ https://issues.apache.org/jira/browse/FLINK-33373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795475#comment-17795475 ] Clay Johnson commented on FLINK-33373: -- Hi [~mapohl], it was nice to connect with you on the ASF Infrastructure roundtable the other day. I see that we had some conversation on the ticket here, but let me know if there are any other questions you have about this! > Capture build scans on ge.apache.org to benefit from deep build insights > > > Key: FLINK-33373 > URL: https://issues.apache.org/jira/browse/FLINK-33373 > Project: Flink > Issue Type: Improvement > Components: Build System / CI >Reporter: Clay Johnson >Assignee: Clay Johnson >Priority: Minor > Labels: pull-request-available > > This improvement will enhance the functionality of the Flink build by > publishing build scans to [ge.apache.org|https://ge.apache.org/], hosted by > the Apache Software Foundation and run in partnership between the ASF and > Gradle. This Develocity instance has all features and extensions enabled and > is freely available for use by the Apache Flink project and all other Apache > projects. > On this Develocity instance, Apache Flink will have access not only to all of > the published build scans but other aggregate data features such as: > * Dashboards to view all historical build scans, along with performance > trends over time > * Build failure analytics for enhanced investigation and diagnosis of build > failures > * Test failure analytics to better understand trends and causes around slow, > failing, and flaky tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm opened a new pull request, #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729 The previous PR in #725 made the legacy autoscaler config keys "fallback" keys to prevent logging a deprecation WARN message on every reconciliation loop in the operator. Turns out, fallback keys also log a warning. This change moves to migrating all config keys with the legacy "kubernetes.operator." prefix before any autoscaler ConfigOptions are used. This ensures no warnings will be logged but the older keys can still be used. The new keys always have precedence over the old keys. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
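The migration described above (strip the legacy prefix, let the new key win when both are set) can be sketched roughly as follows. This is not the operator's actual code; the class name and the assumption that the legacy form is simply the new key with a "kubernetes.operator." prefix are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class LegacyKeyMigration {
    static final String LEGACY_PREFIX = "kubernetes.operator.";

    // Copies each legacy-prefixed key to its unprefixed form unless the new
    // key is already set; per the PR description, the new key always wins.
    static Map<String, String> migrate(Map<String, String> config) {
        Map<String, String> out = new HashMap<>(config);
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getKey().startsWith(LEGACY_PREFIX)) {
                String newKey = e.getKey().substring(LEGACY_PREFIX.length());
                out.putIfAbsent(newKey, e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("kubernetes.operator.job.autoscaler.enabled", "true");
        cfg.put("job.autoscaler.enabled", "false"); // new key already set, wins
        cfg.put("kubernetes.operator.job.autoscaler.target.utilization", "0.7");
        Map<String, String> migrated = migrate(cfg);
        System.out.println(migrated.get("job.autoscaler.enabled"));            // false
        System.out.println(migrated.get("job.autoscaler.target.utilization")); // 0.7
    }
}
```

Because the migration runs before any ConfigOption is read, no fallback or deprecated-key lookup happens later, which is what avoids the per-reconciliation WARN logging.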
[jira] [Commented] (FLINK-33793) java.lang.NoSuchMethodError when checkpointing in Google Cloud Storage
[ https://issues.apache.org/jira/browse/FLINK-33793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795464#comment-17795464 ] Chris Nauroth commented on FLINK-33793: --- I expect this will be fixed by FLINK-33603. CC: [~jjayadeep] > java.lang.NoSuchMethodError when checkpointing in Google Cloud Storage > -- > > Key: FLINK-33793 > URL: https://issues.apache.org/jira/browse/FLINK-33793 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 > Environment: Flink 1.18 >Reporter: ChangZhuo Chen (陳昌倬) >Priority: Major > > We have the following exception when checkpointing in Flink 1.18 + Google > Cloud Storage. The same code works well in Flink 1.17: > > {{2023-12-11 07:45:28,861 ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'jobmanager-io-thread-5' produced an uncaught exception. Stopping the > process...}} > {{java.lang.NoSuchMethodError: 'com.google.common.collect.ImmutableMap > com.google.common.collect.ImmutableMap$Builder.buildOrThrow()'}} > {{ at > com.google.cloud.storage.UnifiedOpts$Opts.getRpcOptions(UnifiedOpts.java:2096) > ~[?:?]}} > {{ at > com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:624) ~[?:?]}} > {{ at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:90) > ~[?:?]}} > {{ at > org.apache.flink.fs.gs.storage.GSBlobStorageImpl.writeBlob(GSBlobStorageImpl.java:64) > ~[?:?]}} > {{ at > org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.createWriteChannel(GSRecoverableFsDataOutputStream.java:229) > ~[?:?]}} > {{ at > org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:152) > ~[?:?]}} > {{ at > org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:135) > ~[?:?]}} > {{ at > org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:128) > ~[?:?]}} > {{ at > 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:73) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at java.io.DataOutputStream.writeInt(Unknown Source) ~[?:?]}} > {{ at > org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:98) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:335) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1404) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1303) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1195) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) > ~[flink-dist-1.18.0.jar:1.18.0]}} > {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) ~[?:?]}} > {{ at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) ~[?:?]}} > {{ at java.lang.Thread.run(Unknown Source) [?:?]}} > > The issue has been reported in GitHub > [https://github.com/apache/flink/pull/22281#issuecomment-1728553794.] 
> However, it is still not fixed yet in 1.18.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11. [flink]
cnauroth commented on PR #22281: URL: https://github.com/apache/flink/pull/22281#issuecomment-1850568860 > > There is now a patch available here at #23469. > > will it fix this issue? @yigress and @czchen , my apologies, I think I referenced the wrong pull request in my last comment. I think #23489 is the one that's relevant. That one is still open. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33799) Add e2e's for tls enabled operator
Tony Garrard created FLINK-33799: Summary: Add e2e's for tls enabled operator Key: FLINK-33799 URL: https://issues.apache.org/jira/browse/FLINK-33799 Project: Flink Issue Type: Technical Debt Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Reporter: Tony Garrard Fix For: kubernetes-operator-1.8.0 It would be good to create some E2E tests to ensure a TLS-enabled Flink operator works, so that we don't break anything in the future -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments
[ https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795454#comment-17795454 ] Gyula Fora commented on FLINK-33633: In my personal opinion, creating the role without the binding doesn't really simplify anything; it may just complicate the process, because you need to know exactly what role the operator is going to create. > Automatic creation of RBAC for instances of Flink Deployments > - > > Key: FLINK-33633 > URL: https://issues.apache.org/jira/browse/FLINK-33633 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Tony Garrard >Priority: Not a Priority > > Currently users have to manually create RBAC e.g. the flink service account. > When operator is watching all namespaces; creation of a FlinkDeployment in a > specific namespace may fail if the kube admin has failed to create the > required RBAC. To improve usability the operator could be coded to > automatically create these rbac resources in the instance namespace if not > present -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33633) Automatic creation of RBAC for instances of Flink Deployments
[ https://issues.apache.org/jira/browse/FLINK-33633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795452#comment-17795452 ] Ryan van Huuksloot commented on FLINK-33633: Would there be any appetite to have the operator create just a ClusterRole for Flink? We can then leave the RoleBinding to the specific deployment to avoid scope creep. It at least removes one extra resource per deployment. I think this would be safe. > Automatic creation of RBAC for instances of Flink Deployments > - > > Key: FLINK-33633 > URL: https://issues.apache.org/jira/browse/FLINK-33633 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Tony Garrard >Priority: Not a Priority > > Currently users have to manually create RBAC e.g. the flink service account. > When operator is watching all namespaces; creation of a FlinkDeployment in a > specific namespace may fail if the kube admin has failed to create the > required RBAC. To improve usability the operator could be coded to > automatically create these rbac resources in the instance namespace if not > present -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795451#comment-17795451 ] Matthias Pohl commented on FLINK-33728: --- Thanks for creating this Jira issue, [~zhoujira86]. AFAIU, you're proposing the lazy initialization of the watcher after a connection error occurred that left the resourceVersion in an outdated state (i.e. the resourceVersion which is used by the k8s client doesn't match any pod in the k8s cluster). Re-initialization of the watcher wouldn't happen when the error is detected but when Flink realizes that the TM is gone and initiates a new TM pod. Correct me if I'm wrong here, but isn't the watcher watching multiple pods (all TM pods belonging to the Flink cluster), and can't the {{KubernetesTooOldResourceVersionException}} be triggered by an error coming from a single pod? If that's the case, not re-initializing the watcher right away would leave us hanging for other pods' lifecycle events, wouldn't it? We would lose the ability to detect the deletion of other pods. But I guess that's what you mean in your comment above with "delete pod can allow us detect pod failure more quickly, but we can also discover it by detecting the lost of akka heartbeat timeout."?! > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met a massive production problem when Kubernetes etcd responded slowly. > After Kubernetes recovered an hour later, thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched when receiving > ResourceVersionTooOld, which caused great pressure on the API server and made > the API server fail again... 
> > I am not sure whether it is necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method. > > We can just ignore the disconnection of the watching process and try to rewatch > once a new requestResource is called. And we can leverage the akka heartbeat > timeout to discover TM failures, just like YARN mode does. -- This message was sent by Atlassian Jira (v8.20.10#820010)
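The lazy re-watch idea under discussion can be sketched in a few lines: the error callback only flags the watch as stale, and the watch is re-created when the next resource request arrives. This is a hypothetical, self-contained illustration; the names `LazyRewatch`, `onWatchError`, and `requestResource` are stand-ins, not the actual `KubernetesResourceManagerDriver` API.

```java
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a lazily re-created watch; not Flink's actual driver code. */
public class LazyRewatch {

    /** Minimal stand-in for a Kubernetes watch handle. */
    interface Watch extends AutoCloseable {
        @Override
        void close();
    }

    private final Supplier<Watch> watchFactory;
    private final AtomicBoolean stale = new AtomicBoolean(false);
    private Watch current;
    int creations = 0; // visible for the demo below

    LazyRewatch(Supplier<Watch> watchFactory) {
        this.watchFactory = watchFactory;
    }

    void start() {
        current = create();
    }

    private Watch create() {
        creations++;
        return watchFactory.get();
    }

    /** Would be invoked from the watch error callback, e.g. on ResourceVersionTooOld. */
    void onWatchError() {
        stale.set(true); // do not rewatch immediately
    }

    /** Would be invoked when a new TM pod is requested: rewatch only if needed. */
    void requestResource() {
        if (stale.compareAndSet(true, false)) {
            if (current != null) {
                current.close();
            }
            current = create();
        }
        // ... then proceed with the actual pod request
    }

    public static void main(String[] args) {
        Supplier<Watch> factory = () -> new Watch() {
            @Override
            public void close() {}
        };
        LazyRewatch driver = new LazyRewatch(factory);
        driver.start();
        driver.onWatchError();
        driver.onWatchError();    // repeated errors do not trigger repeated rewatches
        driver.requestResource(); // exactly one rewatch happens here
        System.out.println(driver.creations); // prints 2 (initial watch + one rewatch)
    }
}
```

Note the trade-off Matthias raises still applies: between the error and the next `requestResource()` call, pod lifecycle events for the other TM pods would go unobserved.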
Re: [PR] [FLINK-33796] Add ability to customize java version for python ci in connectors [flink-connector-shared-utils]
snuyanzin commented on PR #30: URL: https://github.com/apache/flink-connector-shared-utils/pull/30#issuecomment-1850502662 @pvary yep, sure, here is the link: https://github.com/snuyanzin/flink-connector-kafka/actions/runs/7170817788 Sorry I didn't put it in earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33697) FLIP-386: Support adding custom metrics in Recovery Spans
[ https://issues.apache.org/jira/browse/FLINK-33697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-33697: --- Description: h1. Motivation [FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]. The intention here is to add a capability for state backends to attach custom attributes during recovery to recovery spans. For example RocksDBIncrementalRestoreOperation could report both remote download time and time to actually clip/ingest the RocksDB instances after rescaling. was: h1. Motivation FLIP-386 is building on top of [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]. The intention here is to add a capability for state backends to attach custom attributes during recovery to recovery spans. For example RocksDBIncrementalRestoreOperation could report both remote download time and time to actually clip/ingest the RocksDB instances after rescaling. > FLIP-386: Support adding custom metrics in Recovery Spans > - > > Key: FLINK-33697 > URL: https://issues.apache.org/jira/browse/FLINK-33697 > Project: Flink > Issue Type: New Feature > Components: Runtime / Metrics, Runtime / State Backends >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.19.0 > > > h1. Motivation > [FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of > [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]. > The intention here is to add a capability for state backends to attach > custom attributes during recovery to recovery spans. 
For example > RocksDBIncrementalRestoreOperation could report both remote download time and > time to actually clip/ingest the RocksDB instances after rescaling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33796] Add ability to customize java version for python ci in connectors [flink-connector-shared-utils]
pvary commented on PR #30: URL: https://github.com/apache/flink-connector-shared-utils/pull/30#issuecomment-1850450046 @snuyanzin: Do we have a PR where this job has been tried out and is working? For testing my changes, I started using them on my own repo to verify that everything is fine. https://github.com/pvary/flink-connector-kafka/actions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33454] [Operator] Add IngressTlsSpec to support TLS within the managed Ingress, also add Label Passthrough [flink-kubernetes-operator]
gyfora commented on code in PR #727: URL: https://github.com/apache/flink-kubernetes-operator/pull/727#discussion_r1422763184 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/IngressTlsSpec.java: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.spec; + +import org.apache.flink.annotation.Experimental; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** Ingress spec. */ +@Experimental +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class IngressTlsSpec { Review Comment: Why don't we use ` io.fabric8.kubernetes.api.model.networking.v1.IngressTLS` here directly? We can still convert back to the beta -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33795] Add new config to forbid autoscaling in certain periods of a day [flink-kubernetes-operator]
gyfora commented on code in PR #728: URL: https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1422702163 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java: ## @@ -56,4 +60,30 @@ public void testVertexExclusion() { Set.of(v1.toString(), v2.toString(), v3.toString()), new HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS))); } + +@Test +public void testForbidPeriods() { Review Comment: Should be `Forbidden` ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ## @@ -96,6 +97,13 @@ public void scale(Context ctx) throws Exception { return; } +if (!AutoScalerUtils.verifyForbidPeriods(ctx.getConfiguration())) { Review Comment: Config validation should be in the validator module, not here. ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java: ## @@ -599,6 +602,73 @@ public void testMetricCollectionDuringStabilization() throws Exception { assertEquals(2, stateStore.getCollectedMetrics(context).size()); } +@Test +public void testMetricCollectionDuringForbidden() throws Exception { Review Comment: I think this test should be part of the `JobAutoscalerImplTest` and then it can be simplified a lot. 
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Stabilization period in which no new scaling will be executed"); +public static final ConfigOption> FORBID_PERIOD = +autoScalerConfig("forbid.periods") +.stringType() +.asList() +.defaultValues() + .withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods")) Review Comment: deprecated key should not be added ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -72,6 +72,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Stabilization period in which no new scaling will be executed"); +public static final ConfigOption> FORBID_PERIOD = +autoScalerConfig("forbid.periods") +.stringType() +.asList() +.defaultValues() + .withDeprecatedKeys(deprecatedOperatorConfigKey("forbid.periods")) +.withDescription( +"A (semicolon-separated) list of certain times of the day during which autoscaling is forbidden, 10:00:00-11:00:00;21:30:00-22:30:00 for example"); Review Comment: Where did you get this syntax from? What are the potential alternatives? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -114,6 +115,7 @@ public CollectedMetricHistory updateMetrics( var topology = getJobTopology(ctx, stateStore, jobDetailsInfo); var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); final boolean isStabilizing = now.isBefore(stableTime); +final boolean isForbidding = AutoScalerUtils.inForbidPeriod(conf, now); Review Comment: Everywhere the word `Forbid` should be replaced with `Forbidden` or `Blocked` to be correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
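For reference, the `10:00:00-11:00:00;21:30:00-22:30:00` syntax quoted in the review above could be parsed along these lines. This is a hedged sketch with illustrative names (`BlockedPeriods`, `inBlockedPeriod`), not the PR's actual implementation; it assumes inclusive, same-day ranges.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

/** Illustrative parser for a semicolon-separated list of blocked time-of-day ranges. */
public class BlockedPeriods {

    static final class Period {
        final LocalTime start;
        final LocalTime end;

        Period(LocalTime start, LocalTime end) {
            if (end.isBefore(start)) {
                throw new IllegalArgumentException("end before start: " + start + "-" + end);
            }
            this.start = start;
            this.end = end;
        }

        boolean contains(LocalTime t) {
            // Inclusive on both ends; same-day ranges only in this sketch.
            return !t.isBefore(start) && !t.isAfter(end);
        }
    }

    static List<Period> parse(String spec) {
        List<Period> periods = new ArrayList<>();
        if (spec == null || spec.isEmpty()) {
            return periods;
        }
        for (String range : spec.split(";")) {
            String[] parts = range.split("-");
            if (parts.length != 2) {
                throw new IllegalArgumentException("invalid range: " + range);
            }
            periods.add(new Period(LocalTime.parse(parts[0].trim()), LocalTime.parse(parts[1].trim())));
        }
        return periods;
    }

    static boolean inBlockedPeriod(String spec, LocalTime now) {
        for (Period p : parse(spec)) {
            if (p.contains(now)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String spec = "10:00:00-11:00:00;21:30:00-22:30:00";
        System.out.println(inBlockedPeriod(spec, LocalTime.of(10, 30))); // true
        System.out.println(inBlockedPeriod(spec, LocalTime.of(12, 0)));  // false
    }
}
```

Ranges that cross midnight (e.g. `23:00:00-01:00:00`) would need extra handling that this sketch deliberately rejects via the end-before-start check.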
Re: [PR] [FLINK-31631][FileSystems] Upgrade GCS connector to 2.2.11. [flink]
czchen commented on PR #22281: URL: https://github.com/apache/flink/pull/22281#issuecomment-1850343759 @cnauroth The `java.lang.NoSuchMethodError` issue [remains in 1.18.0](https://issues.apache.org/jira/browse/FLINK-33793). Any chance it can be fixed in the 1.18 series? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-26586) FileSystem uses unbuffered read I/O
[ https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795416#comment-17795416 ] Matthias Schwalbe commented on FLINK-26586: --- Hi [~masteryhx] I've reached an agreement with my employer to contribute to this ticket; could you please assign it to me? It'll take a while due to my other workload. I'd also appreciate some support again, especially with features not yet implemented: * general input I/O buffering for filesystems, configuration thereof * authoring tests * benchmarking Many thanks Thias > FileSystem uses unbuffered read I/O > --- > > Key: FLINK-26586 > URL: https://issues.apache.org/jira/browse/FLINK-26586 > Project: Flink > Issue Type: Improvement > Components: API / State Processor, Connectors / FileSystem, Runtime > / Checkpointing >Affects Versions: 1.13.0, 1.14.0 >Reporter: Matthias Schwalbe >Priority: Major > Attachments: BufferedFSDataInputStreamWrapper.java, > BufferedLocalFileSystem.java > > > - I found out that, at least when using LocalFileSystem on a windows system, > read I/O to load a savepoint is unbuffered, > - See example stack [1] > - i.e. 
in order to load only a long in a serializer, it needs to go into > kernel mode 8 times and load the 8 bytes one by one > - I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered > reads on any FileSystem implementation > - In our setting savepoint load is now 30 times faster > - I’ve once seen a Jira ticket about improving savepoint load time in general > (lost the link unfortunately), maybe this approach can help with it > - not sure if HDFS has got the same problem > - I can contribute my implementation of a BufferedFSDataInputStreamWrapper > which can be integrated in any > [1] unbuffered reads stack: > read:207, FileInputStream (java.io) > read:68, LocalDataInputStream (org.apache.flink.core.fs.local) > read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs) > read:42, ForwardingInputStream (org.apache.flink.runtime.util) > readInt:390, DataInputStream (java.io) > deserialize:80, BytePrimitiveArraySerializer > (org.apache.flink.api.common.typeutils.base.array) > next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator > (org.apache.flink.runtime.state.restore) > next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator > (org.apache.flink.runtime.state.restore) > restoreKVStateData:147, RocksDBFullRestoreOperation > (org.apache.flink.contrib.streaming.state.restore) -- This message was sent by Atlassian Jira (v8.20.10#820010)
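The effect described in that ticket, where each `DataInput` primitive read reaches the underlying stream one byte at a time unless a buffer sits in between, is easy to reproduce with plain JDK streams. The sketch below counts the calls that reach the wrapped stream with and without buffering; it illustrates the buffering idea only and is not the attached `BufferedFSDataInputStreamWrapper` itself.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Counts how many read calls reach the underlying stream with and without buffering. */
public class BufferedReadDemo {

    /** Wrapper that counts calls hitting the wrapped (think: kernel-level) stream. */
    static class CountingStream extends InputStream {
        private final InputStream in;
        int calls = 0;

        CountingStream(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            calls++;
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return in.read(b, off, len);
        }
    }

    /** Reads 1000 ints via DataInputStream and returns the underlying call count. */
    static int countUnderlyingReads(boolean buffered) throws IOException {
        CountingStream counting = new CountingStream(new ByteArrayInputStream(new byte[4 * 1000]));
        InputStream source = buffered ? new BufferedInputStream(counting, 64 * 1024) : counting;
        try (DataInputStream din = new DataInputStream(source)) {
            for (int i = 0; i < 1000; i++) {
                din.readInt(); // readInt() issues four single-byte reads on its source
            }
        }
        return counting.calls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unbuffered: " + countUnderlyingReads(false)); // 4000 underlying reads
        System.out.println("buffered:   " + countUnderlyingReads(true));  // 1 bulk read
    }
}
```

With a real `FileInputStream` instead of the in-memory stream, each of those 4000 unbuffered calls would be a kernel-mode transition, which is exactly the overhead the proposed wrapper avoids.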
[jira] [Updated] (FLINK-33793) java.lang.NoSuchMethodError when checkpointing in Google Cloud Storage
[ https://issues.apache.org/jira/browse/FLINK-33793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChangZhuo Chen (陳昌倬) updated FLINK-33793: - Description: We have the following exception when checkpointing in Flink 1.18 + Google Cloud Storage. The same code works well in Flink 1.17: {{2023-12-11 07:45:28,861 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager-io-thread-5' produced an uncaught exception. Stopping the process...}} {{java.lang.NoSuchMethodError: 'com.google.common.collect.ImmutableMap com.google.common.collect.ImmutableMap$Builder.buildOrThrow()'}} {{ at com.google.cloud.storage.UnifiedOpts$Opts.getRpcOptions(UnifiedOpts.java:2096) ~[?:?]}} {{ at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:624) ~[?:?]}} {{ at com.google.cloud.storage.StorageImpl.writer(StorageImpl.java:90) ~[?:?]}} {{ at org.apache.flink.fs.gs.storage.GSBlobStorageImpl.writeBlob(GSBlobStorageImpl.java:64) ~[?:?]}} {{ at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.createWriteChannel(GSRecoverableFsDataOutputStream.java:229) ~[?:?]}} {{ at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:152) ~[?:?]}} {{ at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:135) ~[?:?]}} {{ at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.write(GSRecoverableFsDataOutputStream.java:128) ~[?:?]}} {{ at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:73) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at java.io.DataOutputStream.writeInt(Unknown Source) ~[?:?]}} {{ at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:98) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at 
org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:335) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1404) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1303) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1195) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.18.0.jar:1.18.0]}} {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]}} {{ at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]}} {{ at java.lang.Thread.run(Unknown Source) [?:?]}} The issue has been reported in GitHub [https://github.com/apache/flink/pull/22281#issuecomment-1728553794.] However, it is still not fixed yet in 1.18.0.
Re: [PR] [FLINK-33500][Runtime] Run storing the JobGraph an asynchronous operation [flink]
XComp commented on code in PR #23880: URL: https://github.com/apache/flink/pull/23880#discussion_r1422663705 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java: ## @@ -37,6 +38,18 @@ public interface JobGraphWriter extends LocallyCleanableResource, GloballyCleana */ void putJobGraph(JobGraph jobGraph) throws Exception; +/** + * Adds the {@link JobGraph} instance and have write operations performed asynchronously in + * ioExecutor of Dispatcher + * + * @param jobGraph + * @param ioExecutor + * @return + * @throws Exception + */ +CompletableFuture putJobGraphAsync(JobGraph jobGraph, Optional ioExecutor) Review Comment: A few things on the interface change: 1. `Optional` is not the usual way we implement async and sync behavior with a single method. You can rely on the `DirectExecutor` to achieve the same. There is no need to deal with `Optional`. 2. For cases where you want to have the async and the sync versions of a method available, the code is usually easier to read if you put the business logic in the sync method and implement the async method in the following way: ```java public void runRandomMethod(Object obj) { // do something } public CompletableFuture runRandomMethodAsync(Object obj, Executor executor) { return FutureUtils.runAsync(() -> runRandomMethod(obj), executor); } ``` 3. I'm wondering whether we should make all `put*` methods in the `JobGraphWriter` interface asynchronous rather than maintaining a synchronous `putJobGraph` method alongside the `putJobGraphAsync` method which is then only called by `putJobResourceRequirements`. `putJobResourceRequirements` could block the `Dispatcher` for the very same reason why `putJobGraph` would block. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
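The sync-first pattern from point 2 of the review can be sketched end to end: a direct executor (`Runnable::run`) recovers the synchronous behavior, which is why no `Optional` parameter is needed. Placeholder types below: an `int` stands in for the `JobGraph`, and the JDK's `CompletableFuture.runAsync` stands in for Flink's `FutureUtils`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: business logic lives in the sync method; the async variant only wraps it. */
public class SyncAsyncPattern {

    private final AtomicInteger stored = new AtomicInteger();

    /** Synchronous variant: carries the actual (placeholder) write logic. */
    public void putJobGraph(int jobGraph) {
        stored.set(jobGraph);
    }

    /** Async variant: defers to the sync one on the given executor. */
    public CompletableFuture<Void> putJobGraphAsync(int jobGraph, Executor executor) {
        return CompletableFuture.runAsync(() -> putJobGraph(jobGraph), executor);
    }

    public int getStored() {
        return stored.get();
    }

    public static void main(String[] args) {
        SyncAsyncPattern writer = new SyncAsyncPattern();
        // Runnable::run is a direct executor: the "async" call runs inline, i.e. synchronously.
        writer.putJobGraphAsync(42, Runnable::run).join();
        System.out.println(writer.getStored()); // 42
    }
}
```

Passing a real I/O executor instead of `Runnable::run` moves the same write off the calling thread, which is the Dispatcher-unblocking behavior the PR is after.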
[jira] [Closed] (FLINK-28215) Bump Maven Surefire plugin to 3.2.2
[ https://issues.apache.org/jira/browse/FLINK-28215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-28215. -- Fix Version/s: 1.19.0 Resolution: Fixed Fixed in apache/flink:master ea4cdc28651ad91defd4fc7b371a1f520ea7a262 > Bump Maven Surefire plugin to 3.2.2 > --- > > Key: FLINK-28215 > URL: https://issues.apache.org/jira/browse/FLINK-28215 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2 [flink]
MartijnVisser merged PR #22502: URL: https://github.com/apache/flink/pull/22502 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.2.2 [flink]
MartijnVisser commented on PR #22502: URL: https://github.com/apache/flink/pull/22502#issuecomment-1850303930 > Was there a specific reason that made us wanting to upgrade the surefire plugin? Nothing more than that I always prefer to run on a stable version over milestone releases :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]
mxm commented on code in PR #23247: URL: https://github.com/apache/flink/pull/23247#discussion_r1422626734 ## flink-core/src/main/java/org/apache/flink/configuration/RestartStrategyOptions.java: ## @@ -184,7 +184,7 @@ public class RestartStrategyOptions { public static final ConfigOption RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER = ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier") .doubleType() -.defaultValue(2.0) +.defaultValue(1.2) Review Comment: Could we set this to `1.5` to make it a bit less aggressive? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
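For context on how much the multiplier matters, here is a quick sketch of the delay reached after ten restarts under each candidate value. The 1s initial backoff and 5min cap are illustrative assumptions for the demo, not necessarily Flink's configured defaults.

```java
/** Sketch: growth of an exponential restart delay for different backoff multipliers. */
public class BackoffGrowth {

    /** Delay before the given restart attempt, capped at maxMs. */
    static long delayBeforeAttempt(long initialMs, double multiplier, long maxMs, int attempt) {
        double delay = initialMs;
        for (int i = 1; i < attempt; i++) {
            delay = Math.min(delay * multiplier, maxMs);
        }
        return (long) delay;
    }

    public static void main(String[] args) {
        long initialMs = 1_000;  // illustrative initial backoff
        long maxMs = 300_000;    // illustrative cap
        for (double multiplier : new double[] {1.2, 1.5, 2.0}) {
            System.out.printf("multiplier %.1f -> delay before 10th restart: %d ms%n",
                    multiplier, delayBeforeAttempt(initialMs, multiplier, maxMs, 10));
        }
    }
}
```

Under these assumptions, a 1.2 multiplier reaches about 5s by the tenth restart, 1.5 reaches about 38s, and 2.0 is already pinned at the 5min cap, which is the "less aggressive" middle ground the review suggests.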