Re: [PR] [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects [flink]
1996fanrui merged PR #23599: URL: https://github.com/apache/flink/pull/23599

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
huwh commented on code in PR #23599: URL: https://github.com/apache/flink/pull/23599#discussion_r1384389582

## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:

@@ -253,13 +274,19 @@ public void loadBigData(
         Preconditions.checkNotNull(blobService);

-        final File dataFile = blobService.getFile(jobId, jobInfoKey);
-        // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
-        // (it is deleted automatically on the BLOB server and cache when the job
-        // enters a terminal state)
-        SerializedValue<JobInformation> serializedValue =
-                SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-        serializedJobInformation = new NonOffloaded<>(serializedValue);
+        JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+        if (jobInformation == null) {
+            final File dataFile = blobService.getFile(jobId, jobInfoKey);
+            // NOTE: Do not delete the job info BLOB since it may be needed again during
+            // recovery. (it is deleted automatically on the BLOB server and cache when the job
+            // enters a terminal state)
+            jobInformation =
+                    InstantiationUtil.deserializeObject(
+                            new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+                            getClass().getClassLoader());
+            jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+        }
+        this.jobInformation = jobInformation.deepCopy();

Review Comment: Thanks for your explanation. Let's keep the `deepCopy`.
1996fanrui commented on code in PR #23599: URL: https://github.com/apache/flink/pull/23599#discussion_r1382890456

## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:

@@ -253,13 +274,19 @@ public void loadBigData(
         Preconditions.checkNotNull(blobService);

-        final File dataFile = blobService.getFile(jobId, jobInfoKey);
-        // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
-        // (it is deleted automatically on the BLOB server and cache when the job
-        // enters a terminal state)
-        SerializedValue<JobInformation> serializedValue =
-                SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-        serializedJobInformation = new NonOffloaded<>(serializedValue);
+        JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+        if (jobInformation == null) {
+            final File dataFile = blobService.getFile(jobId, jobInfoKey);
+            // NOTE: Do not delete the job info BLOB since it may be needed again during
+            // recovery. (it is deleted automatically on the BLOB server and cache when the job
+            // enters a terminal state)
+            jobInformation =
+                    InstantiationUtil.deserializeObject(
+                            new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+                            getClass().getClassLoader());
+            jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+        }
+        this.jobInformation = jobInformation.deepCopy();

Review Comment: Hi @huwh, thanks a lot for your review!

> Can we use `this.jobInformation = jobInformation` here?

Yes, we can. Your suggestion would work, and both are fine to me. Let me explain the background of the design: the purpose of defining `deepCopy` for `JobInformation` is that when other developers add new fields to `JobInformation`, they can more easily notice whether the new field requires a `deepCopy`. If a new field is added in the future and it can be mutated by tasks, that is really dangerous, so it is necessary to make developers aware of this risk. WDYT?
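The rationale above can be illustrated with a minimal sketch. Note this is not the actual Flink `JobInformation` class: `CachedInfo` and its `classpaths` field are hypothetical stand-ins for a cached object with a task-mutable field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a cached, mutable object like JobInformation.
class CachedInfo {
    final List<String> classpaths; // mutable field that tasks may change

    CachedInfo(List<String> classpaths) {
        this.classpaths = classpaths;
    }

    // Every new field added to this class forces its author to decide here
    // whether it needs a deep copy, making shared-mutation bugs harder to miss.
    CachedInfo deepCopy() {
        return new CachedInfo(new ArrayList<>(this.classpaths));
    }
}

public class DeepCopyDemo {
    public static void main(String[] args) {
        CachedInfo cached = new CachedInfo(new ArrayList<>(List.of("a.jar")));
        CachedInfo taskCopy = cached.deepCopy();
        taskCopy.classpaths.add("b.jar"); // a task mutating its own copy
        // The cached instance is unaffected, so other tasks still see clean state.
        if (cached.classpaths.size() != 1) throw new AssertionError();
        System.out.println("cached copy untouched: " + cached.classpaths);
    }
}
```

Without the `deepCopy`, all tasks on a TaskManager would share the single cached instance, and a mutation by one task would leak into every other task of the job.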
huwh commented on code in PR #23599: URL: https://github.com/apache/flink/pull/23599#discussion_r1382863857

## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:

@@ -253,13 +274,19 @@ public void loadBigData(
         Preconditions.checkNotNull(blobService);

-        final File dataFile = blobService.getFile(jobId, jobInfoKey);
-        // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
-        // (it is deleted automatically on the BLOB server and cache when the job
-        // enters a terminal state)
-        SerializedValue<JobInformation> serializedValue =
-                SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
-        serializedJobInformation = new NonOffloaded<>(serializedValue);
+        JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+        if (jobInformation == null) {
+            final File dataFile = blobService.getFile(jobId, jobInfoKey);
+            // NOTE: Do not delete the job info BLOB since it may be needed again during
+            // recovery. (it is deleted automatically on the BLOB server and cache when the job
+            // enters a terminal state)
+            jobInformation =
+                    InstantiationUtil.deserializeObject(
+                            new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
+                            getClass().getClassLoader());
+            jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+        }
+        this.jobInformation = jobInformation.deepCopy();

Review Comment: Can we use `this.jobInformation = jobInformation` here?
1996fanrui commented on code in PR #23599: URL: https://github.com/apache/flink/pull/23599#discussion_r1376013241

## flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java:

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implementation of {@link GroupCache}. Entries will be expired after timeout. */
+@NotThreadSafe
+public class DefaultGroupCache<G, K, V> implements GroupCache<G, K, V> {
+    private final Cache<CacheKey<G, K>, V> cache;
+    private final Map<G, Set<CacheKey<G, K>>> cachedBlobKeysPerJob;
+
+    private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
+        this.cachedBlobKeysPerJob = new HashMap<>();
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .maximumSize(cacheSizeLimit)
+                        .expireAfterAccess(expireTimeout)
+                        .ticker(ticker)
+                        .removalListener(this::onCacheRemoval)
+                        .build();
+    }
+
+    @Override
+    public void clear() {
+        cachedBlobKeysPerJob.clear();
+        cache.cleanUp();
+    }
+
+    @Override
+    public V get(G group, K key) {
+        return cache.getIfPresent(new CacheKey<>(group, key));
+    }
+
+    @Override
+    public void put(G group, K key, V value) {
+        CacheKey<G, K> cacheKey = new CacheKey<>(group, key);
+        cache.put(cacheKey, value);
+        cachedBlobKeysPerJob.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey);
+    }
+
+    @Override
+    public void clearCacheForGroup(G group) {
+        Set<CacheKey<G, K>> removed = cachedBlobKeysPerJob.remove(group);
+        if (removed != null) {
+            cache.invalidateAll(removed);
+        }
+    }
+
+    /**
+     * Removal listener that removes the cache key of this group.
+     *
+     * @param removalNotification of removed element.
+     */
+    private void onCacheRemoval(RemovalNotification<CacheKey<G, K>, V> removalNotification) {
+        CacheKey<G, K> cacheKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (cacheKey != null && value != null) {
+            cachedBlobKeysPerJob.computeIfPresent(
+                    cacheKey.getGroup(),
+                    (group, keys) -> {
+                        keys.remove(cacheKey);
+                        if (keys.isEmpty()) {
+                            return null;
+                        } else {
+                            return keys;
+                        }
+                    });

Review Comment: I wrote a demo in my IDEA, and it doesn't have the memory leak: when the remapping function returns null, the map removes the key.

Screenshots:
https://github.com/apache/flink/assets/38427477/457dd9bb-9ea3-474f-aab4-b0c43e0b5020
https://github.com/apache/flink/assets/38427477/040b3831-1919-4a6e-afa0-8742e4bf200b
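For reference, the behavior verified in the demo, namely `Map.computeIfPresent` dropping the whole entry when the remapping function returns null, can be reproduced with a plain `HashMap`. This is a standalone sketch with illustrative group/key names, not the Flink cache itself:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ComputeIfPresentDemo {
    public static void main(String[] args) {
        Map<String, Set<String>> keysPerGroup = new HashMap<>();
        keysPerGroup.computeIfAbsent("job-1", ignore -> new HashSet<>()).add("blob-key-1");

        // Remove the last key of the group; returning null from the remapping
        // function removes the map entry itself, so empty sets do not
        // accumulate and cachedBlobKeysPerJob-style bookkeeping cannot leak.
        keysPerGroup.computeIfPresent("job-1", (group, keys) -> {
            keys.remove("blob-key-1");
            return keys.isEmpty() ? null : keys;
        });

        if (keysPerGroup.containsKey("job-1")) throw new AssertionError();
        System.out.println("entries left: " + keysPerGroup.size()); // prints 0
    }
}
```

This is documented behavior of `Map.computeIfPresent`: a null result removes the mapping, which is exactly why the group entry disappears once its last cache key is gone.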
RocMarshal commented on code in PR #23599: URL: https://github.com/apache/flink/pull/23599#discussion_r1375961136

## flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java:

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Default implementation of {@link GroupCache}. Entries will be expired after timeout. */
+@NotThreadSafe
+public class DefaultGroupCache<G, K, V> implements GroupCache<G, K, V> {
+    private final Cache<CacheKey<G, K>, V> cache;
+    private final Map<G, Set<CacheKey<G, K>>> cachedBlobKeysPerJob;
+
+    private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
+        this.cachedBlobKeysPerJob = new HashMap<>();
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .concurrencyLevel(1)
+                        .maximumSize(cacheSizeLimit)
+                        .expireAfterAccess(expireTimeout)
+                        .ticker(ticker)
+                        .removalListener(this::onCacheRemoval)
+                        .build();
+    }
+
+    @Override
+    public void clear() {
+        cachedBlobKeysPerJob.clear();
+        cache.cleanUp();
+    }
+
+    @Override
+    public V get(G group, K key) {
+        return cache.getIfPresent(new CacheKey<>(group, key));
+    }
+
+    @Override
+    public void put(G group, K key, V value) {
+        CacheKey<G, K> cacheKey = new CacheKey<>(group, key);
+        cache.put(cacheKey, value);
+        cachedBlobKeysPerJob.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey);
+    }
+
+    @Override
+    public void clearCacheForGroup(G group) {
+        Set<CacheKey<G, K>> removed = cachedBlobKeysPerJob.remove(group);
+        if (removed != null) {
+            cache.invalidateAll(removed);
+        }
+    }
+
+    /**
+     * Removal listener that removes the cache key of this group.
+     *
+     * @param removalNotification of removed element.
+     */
+    private void onCacheRemoval(RemovalNotification<CacheKey<G, K>, V> removalNotification) {
+        CacheKey<G, K> cacheKey = removalNotification.getKey();
+        V value = removalNotification.getValue();
+        if (cacheKey != null && value != null) {
+            cachedBlobKeysPerJob.computeIfPresent(
+                    cacheKey.getGroup(),
+                    (group, keys) -> {
+                        keys.remove(cacheKey);
+                        if (keys.isEmpty()) {
+                            return null;
+                        } else {
+                            return keys;
+                        }
+                    });

Review Comment: Would there be a risk of memory leakage here? For example, consider this situation:
- There are too many groups
- Perform the following operations on each of these groups, one by one:
  - Add a set for one group, then remove the set for that same group, but the group key has not been removed.

Would there be many entries in the form of `Entry-i` (where set-i is empty or null)? In short, would `cachedBlobKeysPerJob` degenerate into a collection with too many elements? Please correct me if needed, given my limited read.

## flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java:

@@ -151,39 +166,43 @@ public TaskDeploymentDescriptor(
     }

     /**
-     * Return the sub task's serialized job information.
+     * Return the sub task's job information.
      *
-     * @return serialized job information (may throw {@link IllegalStateException} if {@link
-     *     #loadBigData} is not called beforehand).
+     * @return job
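As context for the eviction question above, the `maximumSize` + `removalListener` combination that `DefaultGroupCache` builds with Guava's `CacheBuilder` can be mimicked with a plain-JDK sketch: a `LinkedHashMap` in access order with a bounded size and an eviction callback. This is an illustrative analogy, not the shaded-Guava implementation from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Minimal size-bounded LRU cache that fires a callback on eviction,
// loosely mimicking CacheBuilder.maximumSize(...).removalListener(...).
class BoundedCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;
    private final BiConsumer<K, V> onRemoval;

    BoundedCache(int maxSize, BiConsumer<K, V> onRemoval) {
        super(16, 0.75f, true); // access order, so least recently used is eldest
        this.maxSize = maxSize;
        this.onRemoval = onRemoval;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        if (size() > maxSize) {
            // The callback is where DefaultGroupCache cleans up its
            // per-group bookkeeping, preventing the leak discussed above.
            onRemoval.accept(eldest.getKey(), eldest.getValue());
            return true;
        }
        return false;
    }
}

public class BoundedCacheDemo {
    public static void main(String[] args) {
        StringBuilder evicted = new StringBuilder();
        BoundedCache<String, String> cache =
                new BoundedCache<>(2, (k, v) -> evicted.append(k));
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3"); // exceeds maxSize, evicts "a" (least recently used)
        if (!evicted.toString().equals("a")) throw new AssertionError();
        if (cache.containsKey("a")) throw new AssertionError();
        System.out.println("evicted: " + evicted);
    }
}
```

The key point for the leak discussion is the eviction callback: because every eviction path notifies the listener, the secondary group-to-keys map can always be pruned in step with the cache itself.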
1996fanrui commented on PR #23599: URL: https://github.com/apache/flink/pull/23599#issuecomment-1784821304

Hi @huwh, would you mind helping take a look at this PR in your free time as well? This improvement is quite similar to FLINK-32386, which you contributed, and this PR refactors the ShuffleDescriptorsCache into a generic GroupCache, so it would be great if you joined this review. Thanks a lot!
1996fanrui commented on PR #23599: URL: https://github.com/apache/flink/pull/23599#issuecomment-1784639173

Hi @pnowojski @RocMarshal, would you mind helping take a look at this PR in your free time? Thanks a lot!
flinkbot commented on PR #23599: URL: https://github.com/apache/flink/pull/23599#issuecomment-1780625586

## CI report:

* 2cd1a297d97a52d9778a430d1f9d887aff9bab63 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build