[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.
StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235435483

## flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
## @@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link HashSet} of the expected size, i.e. a hash set that will not rehash if
+     * expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> HashSet<E> newHashSetWithExpectedSize(int expectedSize) {
+        return new HashSet<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashSet} of the expected size, i.e. a hash set that will not
+     * rehash if expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> LinkedHashSet<E> newLinkedHashSetWithExpectedSize(int expectedSize) {
+        return new LinkedHashSet<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Helper method to compute the right capacity for a hash map with load factor
+     * HASH_MAP_DEFAULT_LOAD_FACTOR.
+     */
+    @VisibleForTesting
+    static int computeRequiredCapacity(int expectedSize) {

Review Comment:
Disadvantages? Not really: slightly more code and more parameters being passed around, and maybe a compiler plugin warning that the parameter always has the same value. But I'd ask back: are there any advantages to the suggested change?
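For context, since the diff cuts off the helper's body above: a minimal sketch of what such a capacity computation can look like, assuming the standard HashMap resize rule (rehash once size exceeds capacity times load factor). The constant name mirrors the PR; the body is illustrative, not necessarily the PR's actual implementation.

import java.util.HashMap;

public final class CapacitySketch {
    // 0.75f is HashMap's documented default load factor.
    static final float HASH_MAP_DEFAULT_LOAD_FACTOR = 0.75f;

    // Pick a capacity whose resize threshold (capacity * loadFactor) is at
    // least expectedSize, guarding against int overflow for huge inputs.
    static int computeRequiredCapacity(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be >= 0");
        }
        return expectedSize < (Integer.MAX_VALUE / 2) + 1
                ? (int) Math.ceil(expectedSize / (double) HASH_MAP_DEFAULT_LOAD_FACTOR)
                : Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // ceil(16 / 0.75) = 22; HashMap rounds the table up to 32 internally,
        // giving a threshold of 24, so 16 insertions never trigger a rehash.
        System.out.println(computeRequiredCapacity(16)); // prints 22
        new HashMap<>(computeRequiredCapacity(16), HASH_MAP_DEFAULT_LOAD_FACTOR);
    }
}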
[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.
StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235432295

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
## @@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
         } else {
             throw new FlinkRuntimeException("Failed to download data for state handles.", e);
         }
+    } finally {
+        // Unregister and close the internal closer. In a failure case, this should interrupt
+        // ongoing downloads.
+        closeableRegistry.unregisterCloseable(internalCloser);
+        IOUtils.closeQuietly(internalCloser);
+        if (failureCleanupRequired) {
+            // Cleanup on exception: cancel all tasks and delete the created directories
+            futures.forEach(future -> future.cancel(true));
+            downloadRequests.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .map(Path::toFile)
+                    .forEach(FileUtils::deleteDirectoryQuietly);
+        }

Review Comment:
About 1, we'd have to ask the original author, but the code calls `ExceptionUtils.stripExecutionException(e);`, so it is expecting to work on execution exceptions. Not that it couldn't be changed... And for 2, yes, that can be done, but is it actually better or more readable? I think we are debating personal taste here, unless you have a point why one way should be better than the other?
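For readers unfamiliar with the helper mentioned above: `ExceptionUtils.stripExecutionException` unwraps the ExecutionException layers that `Future.get()` adds around a task's real failure. A minimal stand-in showing the concept (an illustration, not Flink's implementation):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class StripSketch {
    // Peel off ExecutionException wrappers until the underlying cause surfaces.
    static Throwable stripExecutionException(Throwable t) {
        while (t instanceof ExecutionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> task =
                CompletableFuture.runAsync(() -> { throw new IllegalStateException("boom"); });
        try {
            task.get();
        } catch (ExecutionException e) {
            // Prints the IllegalStateException, not the ExecutionException wrapper.
            System.out.println(stripExecutionException(e));
        }
    }
}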
[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.
StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235425320

## flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
## @@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);

Review Comment:
Yes, that was the plan, because today every place that initializes a map with its final size to avoid rehashing is actually causing a guaranteed rehash. But I'd do that in a separate refactoring PR.
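To make the rehash claim concrete, a small standalone sketch (numbers chosen for illustration, not code from the PR): passing the final element count directly as the initial capacity still guarantees a resize, because HashMap rehashes once its size exceeds capacity times load factor.

import java.util.HashMap;
import java.util.Map;

public final class RehashDemo {
    public static void main(String[] args) {
        int expectedSize = 16;

        // Naive sizing: table size 16, resize threshold 16 * 0.75 = 12,
        // so the 13th of 16 insertions triggers a guaranteed rehash.
        Map<Integer, Integer> naive = new HashMap<>(expectedSize);

        // Capacity-aware sizing: ceil(16 / 0.75) = 22, rounded up internally
        // to table size 32 with threshold 24, so 16 insertions never rehash.
        int capacity = (int) Math.ceil(expectedSize / 0.75);
        Map<Integer, Integer> sized = new HashMap<>(capacity, 0.75f);

        for (int i = 0; i < expectedSize; i++) {
            naive.put(i, i);
            sized.put(i, i);
        }
    }
}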
[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.
StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235422689

## flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
## @@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link HashSet} of the expected size, i.e. a hash set that will not rehash if
+     * expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> HashSet<E> newHashSetWithExpectedSize(int expectedSize) {
+        return new HashSet<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashSet} of the expected size, i.e. a hash set that will not
+     * rehash if expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> LinkedHashSet<E> newLinkedHashSetWithExpectedSize(int expectedSize) {
+        return new LinkedHashSet<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Helper method to compute the right capacity for a hash map with load factor
+     * HASH_MAP_DEFAULT_LOAD_FACTOR.
+     */
+    @VisibleForTesting
+    static int computeRequiredCapacity(int expectedSize) {

Review Comment:
It's a private method and would always be called with the same value. And the static load factor is also passed into the map constructors, so I don't fully understand that part.
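For comparison, the suggested parameterized variant might look roughly like this (hypothetical signatures; my reading of the suggestion). Note how the same load factor has to flow into both the capacity computation and the constructor, which is exactly the coupling the private helper hard-codes today:

import java.util.HashMap;

public final class ParameterizedSketch {
    // Hypothetical variant: callers choose the load factor. Capacity and
    // constructor must receive the same value, or the threshold math breaks.
    static int computeRequiredCapacity(int expectedSize, float loadFactor) {
        return (int) Math.ceil(expectedSize / (double) loadFactor);
    }

    static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize, float loadFactor) {
        return new HashMap<>(computeRequiredCapacity(expectedSize, loadFactor), loadFactor);
    }
}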
[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.
StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1234916685

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
## @@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
         } else {
             throw new FlinkRuntimeException("Failed to download data for state handles.", e);
         }
+    } finally {
+        // Unregister and close the internal closer. In a failure case, this should interrupt
+        // ongoing downloads.
+        closeableRegistry.unregisterCloseable(internalCloser);
+        IOUtils.closeQuietly(internalCloser);
+        if (failureCleanupRequired) {
+            // Cleanup on exception: cancel all tasks and delete the created directories
+            futures.forEach(future -> future.cancel(true));
+            downloadRequests.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .map(Path::toFile)
+                    .forEach(FileUtils::deleteDirectoryQuietly);
+        }

Review Comment:
Two reasons:
- The code was written to catch only ExecutionException, but I want to make sure the cleanup also happens for any other exception.
- For cleanup, I first want to close the registry to interrupt the IO in all parallel tasks; but since I also want to close the registry in the finally block, the catch-based variant would have slightly more code duplication.
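To illustrate the duplication argument, a rough shape of both variants with hypothetical stand-in helpers (not the PR's code): the catch-based version has to close the closer twice, once to interrupt in-flight IO before cleanup and once on the normal path, while the finally-based version gets by with a single close gated by a flag.

import java.io.Closeable;
import java.io.IOException;

public final class CleanupShape {
    // Hypothetical stand-ins for the PR's download step, cleanup, and closer handling.
    static void runDownloads() throws IOException {}
    static void cancelTasksAndDeletePartialDirectories() {}
    static void closeQuietly(Closeable c) {
        try { c.close(); } catch (IOException ignored) {}
    }

    static void catchBasedVariant(Closeable internalCloser) throws IOException {
        try {
            runDownloads();
        } catch (Exception e) {
            closeQuietly(internalCloser);             // close #1: interrupt in-flight IO
            cancelTasksAndDeletePartialDirectories();
            throw e;
        } finally {
            closeQuietly(internalCloser);             // close #2: needed for the success path
        }
    }

    static void finallyBasedVariant(Closeable internalCloser) throws IOException {
        boolean failureCleanupRequired = true;
        try {
            runDownloads();
            failureCleanupRequired = false;
        } finally {
            // Single close on every path; on failure it interrupts in-flight IO
            // before the partial download directories are deleted.
            closeQuietly(internalCloser);
            if (failureCleanupRequired) {
                cancelTasksAndDeletePartialDirectories();
            }
        }
    }
}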