Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-11-06 Thread via GitHub


1996fanrui merged PR #23599:
URL: https://github.com/apache/flink/pull/23599


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-11-06 Thread via GitHub


huwh commented on code in PR #23599:
URL: https://github.com/apache/flink/pull/23599#discussion_r1384389582


##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:
##
@@ -253,13 +274,19 @@ public void loadBigData(
 
         Preconditions.checkNotNull(blobService);
 
-        final File dataFile = blobService.getFile(jobId, jobInfoKey);
-        // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
-        //       (it is deleted automatically on the BLOB server and cache when the job
-        //       enters a terminal state)
-        SerializedValue<JobInformation> serializedValue =
-                SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-        serializedJobInformation = new NonOffloaded<>(serializedValue);
+        JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+        if (jobInformation == null) {
+            final File dataFile = blobService.getFile(jobId, jobInfoKey);
+            // NOTE: Do not delete the job info BLOB since it may be needed again during
+            // recovery. (it is deleted automatically on the BLOB server and cache when the job
+            // enters a terminal state)
+            jobInformation =
+                    InstantiationUtil.deserializeObject(
+                            new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+                            getClass().getClassLoader());
+            jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+        }
+        this.jobInformation = jobInformation.deepCopy();

Review Comment:
   Thanks for your explanation.  Let's keep the `deepCopy`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-11-05 Thread via GitHub


1996fanrui commented on code in PR #23599:
URL: https://github.com/apache/flink/pull/23599#discussion_r1382890456


##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:
##
@@ -253,13 +274,19 @@ public void loadBigData(
 
         Preconditions.checkNotNull(blobService);
 
-        final File dataFile = blobService.getFile(jobId, jobInfoKey);
-        // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
-        //       (it is deleted automatically on the BLOB server and cache when the job
-        //       enters a terminal state)
-        SerializedValue<JobInformation> serializedValue =
-                SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-        serializedJobInformation = new NonOffloaded<>(serializedValue);
+        JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+        if (jobInformation == null) {
+            final File dataFile = blobService.getFile(jobId, jobInfoKey);
+            // NOTE: Do not delete the job info BLOB since it may be needed again during
+            // recovery. (it is deleted automatically on the BLOB server and cache when the job
+            // enters a terminal state)
+            jobInformation =
+                    InstantiationUtil.deserializeObject(
+                            new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+                            getClass().getClassLoader());
+            jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+        }
+        this.jobInformation = jobInformation.deepCopy();

Review Comment:
   Hi @huwh , thanks a lot for your review!
   
   > Can we use this.jobInformation = jobInformation here ?
   
   Yes, we can. Your suggestion would work, and both options are fine to me.
   
   Let me explain the background of the design: the purpose of defining `deepCopy` for `JobInformation` is that when other developers add new fields to `JobInformation`, they can more easily notice whether the new field requires a deep copy. If a new field is added in the future and tasks can mutate it, sharing the cached instance would be really dangerous, so making developers aware of this risk is necessary (a minimal sketch of the idea follows below).
   
   WDYT?
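   
   To illustrate the intent, here is a minimal sketch, assuming a hypothetical two-field class (this is not the actual `JobInformation` layout): `deepCopy` is the single place where the mutability of every field has to be considered.
   
   ```java
   // Hypothetical, simplified sketch (not the real JobInformation).
   import org.apache.flink.configuration.Configuration;

   import java.io.Serializable;

   public class JobInformationSketch implements Serializable {
       private final String jobName;                 // immutable -> safe to share
       private final Configuration jobConfiguration; // mutable -> tasks could modify it

       public JobInformationSketch(String jobName, Configuration jobConfiguration) {
           this.jobName = jobName;
           this.jobConfiguration = jobConfiguration;
       }

       public JobInformationSketch deepCopy() {
           // Copy mutable fields so a task cannot corrupt the cached instance;
           // whoever adds a new field has to decide here whether it needs copying too.
           return new JobInformationSketch(jobName, new Configuration(jobConfiguration));
       }
   }
   ```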



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-11-05 Thread via GitHub


huwh commented on code in PR #23599:
URL: https://github.com/apache/flink/pull/23599#discussion_r1382863857


##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:
##
@@ -253,13 +274,19 @@ public void loadBigData(
 
         Preconditions.checkNotNull(blobService);
 
-        final File dataFile = blobService.getFile(jobId, jobInfoKey);
-        // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
-        //       (it is deleted automatically on the BLOB server and cache when the job
-        //       enters a terminal state)
-        SerializedValue<JobInformation> serializedValue =
-                SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-        serializedJobInformation = new NonOffloaded<>(serializedValue);
+        JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+        if (jobInformation == null) {
+            final File dataFile = blobService.getFile(jobId, jobInfoKey);
+            // NOTE: Do not delete the job info BLOB since it may be needed again during
+            // recovery. (it is deleted automatically on the BLOB server and cache when the job
+            // enters a terminal state)
+            jobInformation =
+                    InstantiationUtil.deserializeObject(
+                            new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+                            getClass().getClassLoader());
+            jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+        }
+        this.jobInformation = jobInformation.deepCopy();

Review Comment:
   Can we use `this.jobInformation = jobInformation` here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-10-30 Thread via GitHub


1996fanrui commented on code in PR #23599:
URL: https://github.com/apache/flink/pull/23599#discussion_r1376013241


##
flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implement of {@link GroupCache}. Entries will be expired after timeout. */
+@NotThreadSafe
+public class DefaultGroupCache<G, K, V> implements GroupCache<G, K, V> {
+    private final Cache<CacheKey<G, K>, V> cache;
+    private final Map<G, Set<CacheKey<G, K>>> cachedBlobKeysPerJob;
+
+    private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
+        this.cachedBlobKeysPerJob = new HashMap<>();
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .maximumSize(cacheSizeLimit)
+                        .expireAfterAccess(expireTimeout)
+                        .ticker(ticker)
+                        .removalListener(this::onCacheRemoval)
+                        .build();
+    }
+
+    @Override
+    public void clear() {
+        cachedBlobKeysPerJob.clear();
+        cache.cleanUp();
+    }
+
+    @Override
+    public V get(G group, K key) {
+        return cache.getIfPresent(new CacheKey<>(group, key));
+    }
+
+    @Override
+    public void put(G group, K key, V value) {
+        CacheKey<G, K> cacheKey = new CacheKey<>(group, key);
+        cache.put(cacheKey, value);
+        cachedBlobKeysPerJob.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey);
+    }
+
+    @Override
+    public void clearCacheForGroup(G group) {
+        Set<CacheKey<G, K>> removed = cachedBlobKeysPerJob.remove(group);
+        if (removed != null) {
+            cache.invalidateAll(removed);
+        }
+    }
+
+    /**
+     * Removal listener that remove the cache key of this group.
+     *
+     * @param removalNotification of removed element.
+     */
+    private void onCacheRemoval(RemovalNotification<CacheKey<G, K>, V> removalNotification) {
+        CacheKey<G, K> cacheKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (cacheKey != null && value != null) {
+            cachedBlobKeysPerJob.computeIfPresent(
+                    cacheKey.getGroup(),
+                    (group, keys) -> {
+                        keys.remove(cacheKey);
+                        if (keys.isEmpty()) {
+                            return null;
+                        } else {
+                            return keys;
+                        }
+                    });
Review Comment:
   I wrote a demo in my IDE, and it doesn't have the memory leak: when the remapping function returns null, the map removes the key.
   
   Screenshot: https://github.com/apache/flink/assets/38427477/457dd9bb-9ea3-474f-aab4-b0c43e0b5020
   
   Screenshot: https://github.com/apache/flink/assets/38427477/040b3831-1919-4a6e-afa0-8742e4bf200b
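   
   For reference, a standalone snippet (not part of the PR) that shows the same behavior as the screenshots: when the remapping function passed to `computeIfPresent` returns `null`, the whole entry is removed from the map, so no empty set is left behind.
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   public class ComputeIfPresentDemo {
       public static void main(String[] args) {
           Map<String, Set<String>> cachedBlobKeysPerJob = new HashMap<>();
           cachedBlobKeysPerJob.computeIfAbsent("job-1", ignore -> new HashSet<>()).add("key-1");

           // Remove the last key of the group; returning null drops the whole map entry.
           cachedBlobKeysPerJob.computeIfPresent(
                   "job-1",
                   (group, keys) -> {
                       keys.remove("key-1");
                       return keys.isEmpty() ? null : keys;
                   });

           // Prints "{}": the map no longer contains "job-1", so nothing leaks.
           System.out.println(cachedBlobKeysPerJob);
       }
   }
   ```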



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-10-30 Thread via GitHub


RocMarshal commented on code in PR #23599:
URL: https://github.com/apache/flink/pull/23599#discussion_r1375961136


##
flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implement of {@link GroupCache}. Entries will be expired after timeout. */
+@NotThreadSafe
+public class DefaultGroupCache<G, K, V> implements GroupCache<G, K, V> {
+    private final Cache<CacheKey<G, K>, V> cache;
+    private final Map<G, Set<CacheKey<G, K>>> cachedBlobKeysPerJob;
+
+    private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
+        this.cachedBlobKeysPerJob = new HashMap<>();
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .maximumSize(cacheSizeLimit)
+                        .expireAfterAccess(expireTimeout)
+                        .ticker(ticker)
+                        .removalListener(this::onCacheRemoval)
+                        .build();
+    }
+
+    @Override
+    public void clear() {
+        cachedBlobKeysPerJob.clear();
+        cache.cleanUp();
+    }
+
+    @Override
+    public V get(G group, K key) {
+        return cache.getIfPresent(new CacheKey<>(group, key));
+    }
+
+    @Override
+    public void put(G group, K key, V value) {
+        CacheKey<G, K> cacheKey = new CacheKey<>(group, key);
+        cache.put(cacheKey, value);
+        cachedBlobKeysPerJob.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey);
+    }
+
+    @Override
+    public void clearCacheForGroup(G group) {
+        Set<CacheKey<G, K>> removed = cachedBlobKeysPerJob.remove(group);
+        if (removed != null) {
+            cache.invalidateAll(removed);
+        }
+    }
+
+    /**
+     * Removal listener that remove the cache key of this group.
+     *
+     * @param removalNotification of removed element.
+     */
+    private void onCacheRemoval(RemovalNotification<CacheKey<G, K>, V> removalNotification) {
+        CacheKey<G, K> cacheKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (cacheKey != null && value != null) {
+            cachedBlobKeysPerJob.computeIfPresent(
+                    cacheKey.getGroup(),
+                    (group, keys) -> {
+                        keys.remove(cacheKey);
+                        if (keys.isEmpty()) {
+                            return null;
+                        } else {
+                            return keys;
+                        }
+                    });

Review Comment:
   Would there be a risk of memory leakage here?
   For example, consider the following situation:
   
   - There are too many groups.
   - The following operations are performed on each of these groups, one by one:
 - A set is added for one group and then removed for that same group, but the group key itself is never removed. Would there be many entries of the form `Entry-i` where `set-i` is empty or null?
 
   In short, would `cachedBlobKeysPerJob` degenerate into a collection with too many elements?
   
   Please correct me if I misread something.



##
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:
##
@@ -151,39 +166,43 @@ public TaskDeploymentDescriptor(
     }
 
     /**
-     * Return the sub task's serialized job information.
+     * Return the sub task's job information.
      *
-     * @return serialized job information (may throw {@link IllegalStateException} if {@link
-     *     #loadBigData} is not called beforehand).
+     * @return job 

Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-10-30 Thread via GitHub


1996fanrui commented on PR #23599:
URL: https://github.com/apache/flink/pull/23599#issuecomment-1784821304

   Hi @huwh , would you mind helping take a look at this PR in your free time as well?
   
   This improvement is very similar to FLINK-32386, which was contributed by you, and this PR refactors the ShuffleDescriptorsCache into a generic GroupCache, so it would be great if you could join this review. Thanks a lot!
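   
   For context, a rough sketch of the generic group-scoped cache idea is shown below. It is a simplification derived from the method names visible in `DefaultGroupCache` elsewhere in this thread, not necessarily the exact `GroupCache` API in this PR.
   
   ```java
   /**
    * Simplified sketch of a group-scoped cache: entries belong to a group (e.g. a JobID)
    * and can be evicted individually, per group, or all at once.
    */
   public interface GroupCacheSketch<G, K, V> {

       /** Returns the cached value for the key in this group, or null if absent. */
       V get(G group, K key);

       /** Caches the value under (group, key). */
       void put(G group, K key, V value);

       /** Removes every entry that belongs to the given group, e.g. when a job finishes. */
       void clearCacheForGroup(G group);

       /** Clears the whole cache. */
       void clear();
   }
   ```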


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-10-30 Thread via GitHub


1996fanrui commented on PR #23599:
URL: https://github.com/apache/flink/pull/23599#issuecomment-1784639173

   Hi @pnowojski @RocMarshal , would you mind helping take a look at this PR in your free time? Thanks a lot!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]

2023-10-26 Thread via GitHub


flinkbot commented on PR #23599:
URL: https://github.com/apache/flink/pull/23599#issuecomment-1780625586

   
   ## CI report:
   
   * 2cd1a297d97a52d9778a430d1f9d887aff9bab63 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org