[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.

2023-06-20 Thread via GitHub


StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235435483


##
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
##
@@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link HashSet} of the expected size, i.e. a hash set that will not rehash if
+     * expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> HashSet<E> newHashSetWithExpectedSize(int expectedSize) {
+        return new HashSet<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashSet} of the expected size, i.e. a hash set that will not
+     * rehash if expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> LinkedHashSet<E> newLinkedHashSetWithExpectedSize(int expectedSize) {
+        return new LinkedHashSet<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Helper method to compute the right capacity for a hash map with load factor
+     * HASH_MAP_DEFAULT_LOAD_FACTOR.
+     */
+    @VisibleForTesting
+    static int computeRequiredCapacity(int expectedSize) {

Review Comment:
   Disadvantages? Not really. Slightly more code and more parameters being
   passed around, and maybe a compiler plugin telling you that the parameter
   always has the same value.
   
   But I'd ask back: are there any advantages to the suggested change?
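
The quoted hunk cuts off at the helper's signature. For readers who want the gist, here is one plausible implementation of the constant-based version next to the hypothetical parameterized variant this thread debates (a sketch with made-up class name, not the actual PR code):

```java
import java.util.HashMap;
import java.util.Map;

public class CapacityHelperSketch {
    // Same constant the PR uses; java.util.HashMap's documented default.
    static final float HASH_MAP_DEFAULT_LOAD_FACTOR = 0.75f;

    // Constant-based version (what the PR does): pick a capacity large enough
    // that threshold = capacity * loadFactor >= expectedSize, so no rehash occurs.
    static int computeRequiredCapacity(int expectedSize) {
        return (int) Math.ceil(expectedSize / HASH_MAP_DEFAULT_LOAD_FACTOR);
    }

    // Hypothetical parameterized variant from the review discussion: every
    // call site would thread through the same constant anyway.
    static int computeRequiredCapacity(int expectedSize, float loadFactor) {
        return (int) Math.ceil(expectedSize / loadFactor);
    }

    public static void main(String[] args) {
        // A map sized this way holds 100 entries without resizing.
        Map<Integer, String> map =
                new HashMap<>(computeRequiredCapacity(100), HASH_MAP_DEFAULT_LOAD_FACTOR);
        for (int i = 0; i < 100; i++) {
            map.put(i, "v" + i);
        }
    }
}
```

Either way the math is identical; the question is only whether the load factor is read from the field or threaded through as an argument.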






[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.

2023-06-20 Thread via GitHub


StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235432295


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
             } else {
                 throw new FlinkRuntimeException("Failed to download data for state handles.", e);
             }
+        } finally {
+            // Unregister and close the internal closer. In a failure case, this should interrupt
+            // ongoing downloads.
+            closeableRegistry.unregisterCloseable(internalCloser);
+            IOUtils.closeQuietly(internalCloser);
+            if (failureCleanupRequired) {
+                // Cleanup on exception: cancel all tasks and delete the created directories
+                futures.forEach(future -> future.cancel(true));
+                downloadRequests.stream()
+                        .map(StateHandleDownloadSpec::getDownloadDestination)
+                        .map(Path::toFile)
+                        .forEach(FileUtils::deleteDirectoryQuietly);
+            }

Review Comment:
   About 1, we'd have to ask the original author, but the code calls
   `ExceptionUtils.stripExecutionException(e);`, so it's expecting to work on
   execution exceptions. Not that it couldn't be changed...
   
   And for 2, yes, that can be done, but is it actually better or more
   readable? I think we are debating personal taste here, unless you have an
   argument why one way should be better than the other?
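
For readers following along: `ExceptionUtils.stripExecutionException` in flink-core unwraps an `ExecutionException` to its underlying cause. The existing handling being discussed looks roughly like this (a simplified sketch, not the exact PR code):

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;

// Simplified sketch: wait for the download futures, then unwrap an
// ExecutionException and rethrow the underlying IOException.
class DownloadWaitSketch {
    static void waitForDownloads(List<CompletableFuture<Void>> futures) throws IOException {
        try {
            for (CompletableFuture<Void> future : futures) {
                future.get();
            }
        } catch (ExecutionException | InterruptedException e) {
            // For an ExecutionException this returns the cause; anything else
            // is passed through unchanged.
            Throwable cause = ExceptionUtils.stripExecutionException(e);
            if (cause instanceof IOException) {
                throw (IOException) cause;
            } else {
                throw new FlinkRuntimeException("Failed to download data for state handles.", e);
            }
        }
    }
}
```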






[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.

2023-06-20 Thread via GitHub


StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235425320


##
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
##
@@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
Review Comment:
   Yes, that was the plan, because today every place that initializes a map
   with its final size to avoid rehashing is actually causing a guaranteed
   rehash. But I'd do that in a separate refactoring PR.
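
To make the "guaranteed rehash" point concrete: `HashMap`'s int constructor argument is a capacity, not an expected size, and a resize triggers once the entry count exceeds capacity × load factor (0.75 by default). A standalone illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class RehashDemo {
    public static void main(String[] args) {
        int n = 16;

        // Capacity 16, default load factor 0.75 => resize threshold 12.
        Map<Integer, Integer> naive = new HashMap<>(n);
        for (int i = 0; i < n; i++) {
            naive.put(i, i); // the 13th put triggers a rehash to capacity 32
        }

        // Sizing by expected entries avoids the rehash: ceil(16 / 0.75) = 22,
        // which HashMap rounds up to capacity 32 => threshold 24 >= 16.
        Map<Integer, Integer> sized = new HashMap<>((int) Math.ceil(n / 0.75f), 0.75f);
        for (int i = 0; i < n; i++) {
            sized.put(i, i); // no resize
        }
    }
}
```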






[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.

2023-06-20 Thread via GitHub


StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235422689


##
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
##
@@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link HashSet} of the expected size, i.e. a hash set that will not rehash if
+     * expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> HashSet<E> newHashSetWithExpectedSize(int expectedSize) {
+        return new HashSet<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashSet} of the expected size, i.e. a hash set that will not
+     * rehash if expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash set.
+     * @return a new hash set instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> LinkedHashSet<E> newLinkedHashSetWithExpectedSize(int expectedSize) {
+        return new LinkedHashSet<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Helper method to compute the right capacity for a hash map with load factor
+     * HASH_MAP_DEFAULT_LOAD_FACTOR.
+     */
+    @VisibleForTesting
+    static int computeRequiredCapacity(int expectedSize) {

Review Comment:
   It's a private method and would always be called with the same value. And
   the static load factor is also passed into the map constructors, so I don't
   fully understand that part.






[GitHub] [flink] StefanRRichter commented on a diff in pull request #22788: [FLINK-32345][state] Improve parallel download of RocksDB incremental state.

2023-06-20 Thread via GitHub


StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1234916685


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##
@@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
             } else {
                 throw new FlinkRuntimeException("Failed to download data for state handles.", e);
             }
+        } finally {
+            // Unregister and close the internal closer. In a failure case, this should interrupt
+            // ongoing downloads.
+            closeableRegistry.unregisterCloseable(internalCloser);
+            IOUtils.closeQuietly(internalCloser);
+            if (failureCleanupRequired) {
+                // Cleanup on exception: cancel all tasks and delete the created directories
+                futures.forEach(future -> future.cancel(true));
+                downloadRequests.stream()
+                        .map(StateHandleDownloadSpec::getDownloadDestination)
+                        .map(Path::toFile)
+                        .forEach(FileUtils::deleteDirectoryQuietly);
+            }

Review Comment:
   Two reasons:
   - The code was written to catch only `ExecutionException`, but I want to
   make sure the cleanup also happens for any other exception.
   - For cleanup, I first want to close the registry to interrupt the IO in
   all parallel tasks; but since the registry must also be closed in the
   finally block, doing the cleanup in a catch would mean slightly more code
   duplication.
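
A condensed, hypothetical illustration of the structure defended here (names are invented; this is not the PR's code):

```java
import java.io.Closeable;
import java.io.IOException;

// Success is recorded as the last statement of the try block, so the finally
// block runs failure cleanup for *any* exception type (not only
// ExecutionException), and the closer is closed in exactly one place.
public class CleanupPatternSketch {
    static void runDownloads(Closeable internalCloser) throws IOException {
        boolean failureCleanupRequired = true;
        try {
            doWork();                       // may throw anything
            failureCleanupRequired = false; // only reached on success
        } finally {
            internalCloser.close();         // single close site; interrupts ongoing work
            if (failureCleanupRequired) {
                cleanUpPartialResults();    // cancel tasks, delete created directories
            }
        }
    }

    private static void doWork() {}

    private static void cleanUpPartialResults() {}
}
```

Flipping the flag at the end of the try block is what lets a single finally block serve both the happy path and every failure type without duplicating the close call.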


