RaphaelFakhri opened a new pull request, #19531: URL: https://github.com/apache/kafka/pull/19531
## Problem

The `kafka.utils.Pool` class is a thin Scala wrapper around `java.util.concurrent.ConcurrentHashMap`. While it provides a convenient `getAndMaybePut` method, the underlying functionality is largely replicated by `ConcurrentHashMap`'s standard methods, particularly `computeIfAbsent`.

## Analysis / Motivation

As discussed in [KAFKA-19183](https://issues.apache.org/jira/browse/KAFKA-19183), the `Pool` class adds an unnecessary layer of abstraction. The primary addition over raw `ConcurrentHashMap` is the `getAndMaybePut` method, which can be effectively replaced with `ConcurrentHashMap.computeIfAbsent`.

While `Pool` allows defining a default value factory at construction, analysis showed this feature is only used in a limited number of locations. These usages can be easily refactored to pass the value creation logic explicitly to `computeIfAbsent` at the call site.

Removing the `Pool` class:

- Simplifies the codebase
- Reduces maintenance overhead
- Promotes the use of standard, well-understood Java concurrent collections

## Changes

### Removal of Files

- `kafka/utils/Pool.scala`: **Removed**
- `kafka/utils/PoolTest.scala`: **Removed** (tested the now-removed `Pool` class)

### Refactoring of Usages

- All instances of `kafka.utils.Pool` have been replaced with `java.util.concurrent.ConcurrentHashMap`.
- Calls to `Pool.getAndMaybePut(key, createValue)` have been replaced with:
  ```scala
  concurrentHashMap.computeIfAbsent(key, _ => createValue)
  ```
- Calls to `Pool.getAndMaybePut(key)` (which used the constructor factory) have been replaced with:
  ```scala
  concurrentHashMap.computeIfAbsent(key, k => valueFactory(k))
  ```
  where `valueFactory` is now locally defined or accessible in scope.
- Calls accessing collection views (`.keys`, `.values`, `.iterator`) have been updated to use Java API methods (`.keySet()`, `.values()`, `.entrySet().iterator()`) followed by `.asScala` from `scala.jdk.CollectionConverters` to maintain the Scala collection interface where necessary.
- Other minor method name updates:
  - `.size` → `.size()`
  - `.putIfNotExists` → `.putIfAbsent`
- The `addLoadedTransactionsToCache` method signature in `TransactionStateManager` has been updated to accept `ConcurrentHashMap` instead of `Pool`.

## Testing

- `PoolTest.scala` was removed.
- Existing unit and integration tests for the components that previously used `Pool` (e.g., `FetcherLagStats`, `TransactionStateManager`, `ReplicaManager`) provide sufficient coverage for the refactored code paths.
- No new tests were added, as the core concurrent map functionality is provided by the JDK.

## Compatibility

- This change is **internal refactoring** and does not affect any public APIs or external compatibility.

## Checklist

- [x] Code follows the coding style.
- [ ] Documentation is updated. (Not applicable for this internal refactoring)
- [x] Unit tests are updated/added (Removed `PoolTest`, relying on existing component tests)
- [ ] Integration tests are updated/added. (Not applicable, existing tests cover functionality)
- [ ] KIP has been created or exists for this change. (KAFKA-19183 serves as the discussion/motivation)
- [ ] Release notes are updated. (Not applicable for this internal refactoring)
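
For reviewers who want to see the replacement pattern from "Refactoring of Usages" in one place, here is a minimal, self-contained sketch. It is not code from this PR: `PoolMigrationSketch`, `Stats`, `valueFactory`, `factoryCalls`, and `statsFor` are hypothetical names chosen for illustration, and the sketch assumes Scala 2.13 (for `scala.jdk.CollectionConverters`).

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

object PoolMigrationSketch {
  // Hypothetical value type standing in for whatever the map stores.
  final class Stats(val key: String)

  // Counts factory invocations so "create once per key" is observable.
  val factoryCalls = new AtomicInteger(0)

  // Hypothetical factory, playing the role of the default-value factory
  // that used to be passed to the Pool constructor.
  def valueFactory(key: String): Stats = {
    factoryCalls.incrementAndGet()
    new Stats(key)
  }

  val stats = new ConcurrentHashMap[String, Stats]()

  // Stands in for pool.getAndMaybePut(key): the factory is now passed
  // explicitly at the call site.
  def statsFor(key: String): Stats =
    stats.computeIfAbsent(key, k => valueFactory(k))

  def main(args: Array[String]): Unit = {
    val before = factoryCalls.get()
    val first = statsFor("topic-0")
    val second = statsFor("topic-0")
    assert(first eq second)                   // the same instance is returned
    assert(factoryCalls.get() == before + 1)  // the factory ran once for this key

    // Stands in for putIfNotExists: putIfAbsent returns null when the key was absent.
    val previous = stats.putIfAbsent("topic-1", new Stats("topic-1"))
    assert(previous == null)

    // Java views wrapped with .asScala so Scala collection operations still apply.
    val keys = stats.keySet().asScala.toSet
    val valueKeys = stats.values().asScala.map(_.key).toSet
    val entryKeys = stats.entrySet().iterator().asScala.map(_.getKey).toSet
    assert(keys == valueKeys && keys == entryKeys)

    println(s"size=${stats.size()} keys=${keys.toSeq.sorted.mkString(",")}")
  }
}
```

The assertions in `main` only check properties that `ConcurrentHashMap` documents: `computeIfAbsent` returns the existing value when the key is already mapped, and `putIfAbsent` returns `null` when it was not.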