Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-12 Thread via GitHub


pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1450048228


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -41,8 +43,14 @@
 
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
-public RocksDBStateDownloader(int restoringThreadNum) {
+static final String DOWNLOAD_DURATION_STATE_METRIC = "DownloadStateDurationMs";

Review Comment:
   I've moved it to `MetricNames`, since this download-time metric could be used by other state backends as well.
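One way to picture the outcome of this thread: the metric name is defined once in a shared holder, and any backend times its download step against it. This is a hedged illustration only; `SharedMetricNames`, `InitMetricsRecorder`, `MapInitMetricsRecorder` and `TimedDownload` are hypothetical names and do not reproduce Flink's actual `MetricNames` or `StateBackend.CustomInitializationMetrics` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a shared holder of metric names, such as MetricNames. */
final class SharedMetricNames {
    static final String DOWNLOAD_STATE_DURATION = "DownloadStateDurationMs";

    private SharedMetricNames() {}
}

/** Hypothetical callback through which a backend reports initialization metrics. */
interface InitMetricsRecorder {
    void addMetric(String name, long value);
}

/** Collects reported metrics in a map, e.g. for assertions in a unit test. */
final class MapInitMetricsRecorder implements InitMetricsRecorder {
    final Map<String, Long> metrics = new HashMap<>();

    @Override
    public void addMetric(String name, long value) {
        metrics.put(name, value);
    }
}

final class TimedDownload {
    private TimedDownload() {}

    /**
     * Runs the download step and reports its wall-clock duration in milliseconds. The metric is
     * reported from the finally block, so it is recorded even if the download throws.
     */
    static void run(Runnable download, InitMetricsRecorder recorder) {
        long start = System.nanoTime();
        try {
            download.run();
        } finally {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            recorder.addMetric(SharedMetricNames.DOWNLOAD_STATE_DURATION, elapsedMs);
        }
    }
}
```

A map-backed recorder like this is also a convenient way to check in a unit test that the metric was reported at all.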



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-12 Thread via GitHub


pnowojski merged PR #24023:
URL: https://github.com/apache/flink/pull/24023





Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-11 Thread via GitHub


StefanRRichter commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1448868567


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -41,8 +43,14 @@
 
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
-public RocksDBStateDownloader(int restoringThreadNum) {
+static final String DOWNLOAD_DURATION_STATE_METRIC = "DownloadStateDurationMs";

Review Comment:
   As a compromise, we could have a `RocksDBMetricNames` class that lives in the RocksDB package.






Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-11 Thread via GitHub


pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1448861339


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -41,8 +43,14 @@
 
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
-public RocksDBStateDownloader(int restoringThreadNum) {
+static final String DOWNLOAD_DURATION_STATE_METRIC = "DownloadStateDurationMs";

Review Comment:
   My thinking was that this is a custom RocksDB metric, so its name should be defined in the RocksDB module. But on second thought, moving the name to `MetricNames` doesn't hurt.






Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-11 Thread via GitHub


pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1448857937


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java:
##
@@ -65,8 +54,8 @@
  *
  * {@code
 * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * env.setStateBackend(new HashMapStateBackend());
- * env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+ * parameters.getEnv().setStateBackend(new HashMapStateBackend());

Review Comment:
   Hahaha, a find & replace that went wrong.






Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-11 Thread via GitHub


StefanRRichter commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1448564562


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java:
##
@@ -65,8 +54,8 @@
  *
  * {@code
 * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * env.setStateBackend(new HashMapStateBackend());
- * env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
+ * parameters.getEnv().setStateBackend(new HashMapStateBackend());

Review Comment:
   Change to the comment looks wrong.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java:
##
@@ -94,90 +95,27 @@ default String getName() {
  *
  * Keyed State is state where each value is bound to a key.
  *
- * @param env The environment of the task.
- * @param jobID The ID of the job that the task belongs to.
- * @param operatorIdentifier The identifier text of the operator.
- * @param keySerializer The key-serializer for the operator.
- * @param numberOfKeyGroups The number of key-groups aka max parallelism.
- * @param keyGroupRange Range of key-groups for which the to-be-created backend is responsible.
- * @param kvStateRegistry KvStateRegistry helper for this task.
- * @param ttlTimeProvider Provider for TTL logic to judge about state expiration.
- * @param metricGroup The parent metric group for all state backend metrics.
- * @param stateHandles The state handles for restore.
- * @param cancelStreamRegistry The registry to which created closeable objects will be
- * registered during restore.
+ * @param parameters accessor.

Review Comment:
   The doc for this argument is "accessor" everywhere. What does that even mean? Why not "argument bundle for creating " or something like that?



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -41,8 +43,14 @@
 
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
-public RocksDBStateDownloader(int restoringThreadNum) {
+static final String DOWNLOAD_DURATION_STATE_METRIC = "DownloadStateDurationMs";

Review Comment:
   Should we rather keep all (present and future) initialization metric keys in a dedicated class instead of scattering them around?






Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-11 Thread via GitHub


dawidwys commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1886664226

   I think that's a fair approach. I agree that very few users implement state backends, and those who do are knowledgeable enough to migrate their code easily. Moreover, we have changed this interface rather freely in the past, adding and removing arguments to its methods. Therefore, +1 for the approach.
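For the few custom backends affected, the migration can be mechanical. A minimal sketch, assuming a heavily simplified bundle: `SimpleKeyedParams`, `SimpleStateBackend` and `MyCustomBackend` are hypothetical and expose only three of the real parameters. The idea is to keep the old positional logic as a private helper and call it from the new single-argument override.

```java
/** Hypothetical, heavily simplified parameter bundle (not Flink's real interface). */
interface SimpleKeyedParams {
    String getOperatorIdentifier();

    int getNumberOfKeyGroups();

    double getManagedMemoryFraction();
}

/** Hypothetical new-style contract: a single method taking the bundle. */
interface SimpleStateBackend {
    String createKeyedStateBackend(SimpleKeyedParams parameters);
}

/** A custom backend migrated from a positional signature to the parameter bundle. */
final class MyCustomBackend implements SimpleStateBackend {
    @Override
    public String createKeyedStateBackend(SimpleKeyedParams parameters) {
        // Unpack only the values this backend uses and reuse the pre-migration logic as-is.
        return createLegacy(
                parameters.getOperatorIdentifier(),
                parameters.getNumberOfKeyGroups(),
                parameters.getManagedMemoryFraction());
    }

    /** Former body of the positional overload; returns a description instead of a real backend. */
    private String createLegacy(String operatorId, int numberOfKeyGroups, double memoryFraction) {
        return operatorId + "/" + numberOfKeyGroups + "/" + memoryFraction;
    }
}
```

Because the bundle is the only argument, adding a parameter later changes the bundle rather than this method's signature.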





Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-10 Thread via GitHub


pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1447579407


##
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java:
##
@@ -58,42 +46,16 @@ final class StubStateBackend implements StateBackend {
 
 @Override
 public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-Environment env,
-JobID jobID,
-String operatorIdentifier,
-TypeSerializer keySerializer,
-int numberOfKeyGroups,
-KeyGroupRange keyGroupRange,
-TaskKvStateRegistry kvStateRegistry,
-TtlTimeProvider ttlTimeProvider,
-MetricGroup metricGroup,
-@Nonnull Collection stateHandles,
-CloseableRegistry cancelStreamRegistry)
-throws Exception {
-
+KeyedStateBackendParameters parameters) throws Exception {
 return backend.createKeyedStateBackend(
-env,
-jobID,
-operatorIdentifier,
-keySerializer,
-numberOfKeyGroups,
-keyGroupRange,
-kvStateRegistry,
-this.ttlTimeProvider,
-metricGroup,
-stateHandles,
-cancelStreamRegistry);
+new KeyedStateBackendParametersImpl<>(parameters)
+.setTtlTimeProvider(ttlTimeProvider));

Review Comment:
   bug was fixed here
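The new call site copies the incoming parameters and overrides exactly one field before delegating. A minimal sketch of that copy-and-override idiom, assuming simplified types: `Params`, `ParamsImpl` and `StubBackend` are hypothetical stand-ins for `KeyedStateBackendParameters`, `KeyedStateBackendParametersImpl` and `StubStateBackend`, and the TTL time provider is modeled as a `LongSupplier`.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical, simplified read-only view of the keyed backend parameters. */
interface Params {
    String getOperatorIdentifier();

    /** Stand-in for TtlTimeProvider: supplies the current time used for TTL checks. */
    LongSupplier getTtlTimeProvider();
}

/** Mutable implementation with a copy constructor and a fluent setter. */
final class ParamsImpl implements Params {
    private final String operatorIdentifier;
    private LongSupplier ttlTimeProvider;

    ParamsImpl(String operatorIdentifier, LongSupplier ttlTimeProvider) {
        this.operatorIdentifier = operatorIdentifier;
        this.ttlTimeProvider = ttlTimeProvider;
    }

    /** Copies every field of another instance, leaving the original untouched. */
    ParamsImpl(Params other) {
        this(other.getOperatorIdentifier(), other.getTtlTimeProvider());
    }

    ParamsImpl setTtlTimeProvider(LongSupplier ttlTimeProvider) {
        this.ttlTimeProvider = ttlTimeProvider;
        return this;
    }

    @Override
    public String getOperatorIdentifier() {
        return operatorIdentifier;
    }

    @Override
    public LongSupplier getTtlTimeProvider() {
        return ttlTimeProvider;
    }
}

/** Wrapper that forwards all parameters to a delegate but swaps in its own TTL time provider. */
final class StubBackend {
    private final Function<Params, String> delegate;
    private final LongSupplier stubTtlTimeProvider;

    StubBackend(Function<Params, String> delegate, LongSupplier stubTtlTimeProvider) {
        this.delegate = delegate;
        this.stubTtlTimeProvider = stubTtlTimeProvider;
    }

    String createKeyedStateBackend(Params parameters) {
        return delegate.apply(new ParamsImpl(parameters).setTtlTimeProvider(stubTtlTimeProvider));
    }
}
```

Copying before overriding means the caller's parameter object is never mutated, which matters if the same instance is reused elsewhere.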



##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogStateBackend.java:
##
@@ -83,89 +77,25 @@ public abstract class AbstractChangelogStateBackend
 
 @Override
 public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-Environment env,
-JobID jobID,
-String operatorIdentifier,
-TypeSerializer keySerializer,
-int numberOfKeyGroups,
-KeyGroupRange keyGroupRange,
-TaskKvStateRegistry kvStateRegistry,
-TtlTimeProvider ttlTimeProvider,
-MetricGroup metricGroup,
-@Nonnull Collection stateHandles,
-CloseableRegistry cancelStreamRegistry)
-throws Exception {
+KeyedStateBackendParameters parameters) throws Exception {
 return restore(
-env,
-operatorIdentifier,
-keyGroupRange,
-ttlTimeProvider,
-metricGroup,
-castHandles(stateHandles),
+parameters.getEnv(),
+parameters.getOperatorIdentifier(),
+parameters.getKeyGroupRange(),
+parameters.getTtlTimeProvider(),
+parameters.getMetricGroup(),
+castHandles(parameters.getStateHandles()),
 baseHandles ->
 (AbstractKeyedStateBackend)
 delegatedStateBackend.createKeyedStateBackend(
-env,
-jobID,
-operatorIdentifier,
-keySerializer,
-numberOfKeyGroups,
-keyGroupRange,
-kvStateRegistry,
-ttlTimeProvider,
-metricGroup,
-baseHandles,
-cancelStreamRegistry));
-}
-
-@Override
-public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
-Environment env,
-JobID jobID,
-String operatorIdentifier,
-TypeSerializer keySerializer,
-int numberOfKeyGroups,
-KeyGroupRange keyGroupRange,
-TaskKvStateRegistry kvStateRegistry,
-TtlTimeProvider ttlTimeProvider,
-MetricGroup metricGroup,
-@Nonnull Collection stateHandles,
-CloseableRegistry cancelStreamRegistry,
-double managedMemoryFraction)
-throws Exception {
-return restore(
-env,
-operatorIdentifier,
-keyGroupRange,
-ttlTimeProvider,
-metricGroup,
-castHandles(stateHandles),
-baseHandles ->
-(AbstractKeyedStateBackend)
-delegatedStateBackend.createKeyedStateBackend(
-env,
-jobID,
-operatorIdentifier,
-keySerializer,
-

Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-09 Thread via GitHub


pnowojski commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1883070789

   @flinkbot run azure





Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-09 Thread via GitHub


pnowojski commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1883070691

   @flinkbot run azure





Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-04 Thread via GitHub


pnowojski commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1877155133

   Thanks for the review!
   
   > Why only add the download metric? While you are at this, you could have 
also simply added metrics for, e.g., merge/clip.
   
   That was the scope of the FLIP, partly because I wanted to keep it as small as possible to minimise the required effort. We can always add more metrics in the future.





Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-04 Thread via GitHub


pnowojski commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1441789636


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackendParametersImpl.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
+import org.apache.flink.runtime.state.StateBackend.KeyedStateBackendParameters;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+
+/**
+ * Internal POJO implementing {@link KeyedStateBackendParameters}
+ *
+ * @param <K>
+ */
+@Internal
+public class KeyedStateBackendParametersImpl<K> implements KeyedStateBackendParameters<K> {

Review Comment:
   As discussed offline, the interface is part of Flink's public API, and exposing an interface instead of a concrete POJO class gives us more flexibility in the future.
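One concrete form the flexibility argument can take (a sketch, not a description of Flink's actual plans): with an interface in the public API, a getter added in a later release can ship with a default, so implementations written against the older interface still compile, while the internal class and its constructors and setters stay out of the API. `OperatorParams`, `LegacyOperatorParams` and the default value `0.0` are hypothetical.

```java
/** Public, evolving contract: read-only accessors only (hypothetical, simplified). */
interface OperatorParams {
    String getOperatorIdentifier();

    /**
     * Accessor added in a later release. The default keeps implementations written against the
     * older interface (for example test doubles in user code) compiling unchanged.
     */
    default double getManagedMemoryFraction() {
        return 0.0;
    }
}

/** Implementation written before getManagedMemoryFraction() existed. */
final class LegacyOperatorParams implements OperatorParams {
    private final String operatorIdentifier;

    LegacyOperatorParams(String operatorIdentifier) {
        this.operatorIdentifier = operatorIdentifier;
    }

    @Override
    public String getOperatorIdentifier() {
        return operatorIdentifier;
    }
}
```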






Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-03 Thread via GitHub


StefanRRichter commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1440607428


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackendParametersImpl.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
+import org.apache.flink.runtime.state.StateBackend.KeyedStateBackendParameters;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+
+/**
+ * Internal POJO implementing {@link KeyedStateBackendParameters}
+ *
+ * @param <K>
+ */
+@Internal
+public class KeyedStateBackendParametersImpl<K> implements KeyedStateBackendParameters<K> {

Review Comment:
   What's the reason to have an interface for this class? It looks like a 
simple POJO that gives more meaning to a bunch of parameters. And there is only 
one impl.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java:
##
@@ -201,4 +278,75 @@ default boolean supportsNoClaimRestoreMode() {
 default boolean supportsSavepointFormat(SavepointFormatType formatType) {
 return formatType == SavepointFormatType.CANONICAL;
 }
+
+/**
+ * Parameters passed to {@link
+ * StateBackend#createKeyedStateBackend(KeyedStateBackendParameters)}.
+ *
+ * @param <K> The type of the keys by which the state is organized.
+ */
+@PublicEvolving
+interface KeyedStateBackendParameters<K> {
+/** @return The runtime environment of the executing task. */
+Environment getEnv();
+
+JobID getJobID();
+
+String getOperatorIdentifier();
+
+TypeSerializer getKeySerializer();
+
+int getNumberOfKeyGroups();
+
+/** @return Range of key-groups for which the to-be-created backend is responsible. */
+KeyGroupRange getKeyGroupRange();
+
+TaskKvStateRegistry getKvStateRegistry();
+
+/** @return Provider for TTL logic to judge about state expiration. */
+TtlTimeProvider getTtlTimeProvider();
+
+MetricGroup getMetricGroup();
+
+@Nonnull
+Collection getStateHandles();
+
+/**
+ * @return The registry to which created closeable objects will be registered during
+ * restore.
+ */
+CloseableRegistry getCancelStreamRegistry();
+
+double getManagedMemoryFraction();
+
+CustomInitializationMetrics getCustomInitializationMetrics();
+}
+
+/**
+ * Parameters passed to {@link
+ * StateBackend#createOperatorStateBackend(OperatorStateBackendParameters)}.
+ */
+@PublicEvolving
+interface OperatorStateBackendParameters {
+/** @return The runtime environment of the executing task. */
+Environment getEnv();

Review Comment:
   As said in a previous comment, I'm not a fan of the extra interfaces; I'd simply turn them into classes. They could then share a common superclass holding env, operator ID, etc.
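The alternative suggested here could look roughly like the following: concrete parameter classes sharing an abstract base for the common fields. A sketch under simplifying assumptions: `BackendParamsBase`, `KeyedBackendParams`, `OperatorBackendParams` and `ParamsUtil` are hypothetical, and a `String` task name stands in for `Environment`.

```java
/** Fields common to keyed and operator backend parameters (hypothetical, simplified). */
abstract class BackendParamsBase {
    private final String taskName; // stand-in for Environment
    private final String operatorIdentifier;

    BackendParamsBase(String taskName, String operatorIdentifier) {
        this.taskName = taskName;
        this.operatorIdentifier = operatorIdentifier;
    }

    String getTaskName() {
        return taskName;
    }

    String getOperatorIdentifier() {
        return operatorIdentifier;
    }
}

/** Keyed-specific parameters on top of the shared ones. */
final class KeyedBackendParams extends BackendParamsBase {
    private final int numberOfKeyGroups;

    KeyedBackendParams(String taskName, String operatorIdentifier, int numberOfKeyGroups) {
        super(taskName, operatorIdentifier);
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    int getNumberOfKeyGroups() {
        return numberOfKeyGroups;
    }
}

/** Operator state needs nothing beyond the shared fields in this sketch. */
final class OperatorBackendParams extends BackendParamsBase {
    OperatorBackendParams(String taskName, String operatorIdentifier) {
        super(taskName, operatorIdentifier);
    }
}

final class ParamsUtil {
    private ParamsUtil() {}

    /** Code that needs only the shared fields can accept either kind. */
    static String describe(BackendParamsBase params) {
        return params.getTaskName() + "/" + params.getOperatorIdentifier();
    }
}
```

The author's reply in this thread argued for keeping the interfaces instead, because they are part of the public API and leave more room to evolve.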






Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-03 Thread via GitHub


flinkbot commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1875528953

   
   ## CI report:
   
   * e7b15627f0fe1823eb893fff103e8c0f715bc223 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-03 Thread via GitHub


pnowojski opened a new pull request, #24023:
URL: https://github.com/apache/flink/pull/24023

   This PR builds on top of https://github.com/apache/flink/pull/23908
   
   ## Brief change log
   
   Please check the individual commit messages.
   
   ## Verifying this change
   
   This change has been tested manually, and a unit test was added that verifies the metric is created.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented)
   

