RaphaelFakhri opened a new pull request, #19531:
URL: https://github.com/apache/kafka/pull/19531

   ## Problem
   
   The `kafka.utils.Pool` class is a thin Scala wrapper around 
`java.util.concurrent.ConcurrentHashMap`. Its main convenience is the 
`getAndMaybePut` method, but that behavior is already available through 
`ConcurrentHashMap`'s standard methods, particularly `computeIfAbsent`.
   
   ## Analysis / Motivation
   
   As discussed in 
[KAFKA-19183](https://issues.apache.org/jira/browse/KAFKA-19183), the `Pool` 
class adds an unnecessary layer of abstraction. The primary addition over raw 
`ConcurrentHashMap` is the `getAndMaybePut` method, which can be effectively 
replaced with `ConcurrentHashMap.computeIfAbsent`.
   
   While `Pool` allows defining a default value factory at construction, 
analysis showed this feature is only used in a limited number of locations. 
These usages can be easily refactored to pass the value creation logic 
explicitly to `computeIfAbsent` at the call site.
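   
   As a rough illustration of why `computeIfAbsent` can stand in for a 
constructor-supplied factory, the sketch below counts how often a factory runs 
when the same key is requested twice. The names `ComputeIfAbsentSketch`, 
`valueFactory`, `factoryCalls`, `cache`, and `getOrCreate` are hypothetical and 
do not appear in the Kafka codebase; this is a minimal sketch, not the code in 
this PR.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicInteger

   object ComputeIfAbsentSketch {
     // Hypothetical counter, used only to observe how often the factory runs.
     val factoryCalls = new AtomicInteger(0)

     // Hypothetical factory standing in for one formerly passed to Pool's constructor.
     def valueFactory(key: String): StringBuilder = {
       factoryCalls.incrementAndGet()
       new StringBuilder(key)
     }

     val cache = new ConcurrentHashMap[String, StringBuilder]()

     // The value creation logic is passed explicitly at the call site.
     // ConcurrentHashMap runs the function only when the key is absent.
     def getOrCreate(key: String): StringBuilder =
       cache.computeIfAbsent(key, k => valueFactory(k))
   }
   ```

   Calling `getOrCreate("topic-0")` twice should return the same instance and 
invoke the factory once. One point to keep in mind: if the mapping function 
returns `null`, `computeIfAbsent` records no mapping.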
   
   Removing the `Pool` class:
   - Simplifies the codebase
   - Reduces maintenance overhead
   - Promotes the use of standard, well-understood Java concurrent collections
   
   ## Changes
   
   ### Removal of Files
   
   - `kafka/utils/Pool.scala` — **Removed**
   - `kafka/utils/PoolTest.scala` — **Removed** (tested the now-removed `Pool` 
class)
   
   ### Refactoring of Usages
   
   - All instances of `kafka.utils.Pool` have been replaced with 
`java.util.concurrent.ConcurrentHashMap`.
   - Calls to `Pool.getAndMaybePut(key, createValue)` have been replaced with:
     ```scala
     concurrentHashMap.computeIfAbsent(key, _ => createValue)
     ```
   - Calls to `Pool.getAndMaybePut(key)` (which used the constructor factory) 
have been replaced with:
     ```scala
     concurrentHashMap.computeIfAbsent(key, k => valueFactory(k))
     ```
     where `valueFactory` is now locally defined or accessible in scope.
   
   - Calls accessing collection views (`.keys`, `.values`, `.iterator`) have 
been updated to use Java API methods (`.keySet()`, `.values()`, 
`.entrySet().iterator()`) followed by `.asScala` from 
`scala.jdk.CollectionConverters` to maintain the Scala collection interface 
where necessary.
   
   - Other minor method name updates:
     - `.size` → `.size()`
     - `.putIfNotExists` → `.putIfAbsent`
   
   - The `addLoadedTransactionsToCache` method signature in 
`TransactionStateManager` has been updated to accept `ConcurrentHashMap` 
instead of `Pool`.
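   
   The view and method-name changes listed above can be sketched as follows. 
This is an illustrative example only: `MapViewsSketch`, `owners`, and the 
sample keys and values are hypothetical, and it assumes Scala 2.13 or later, 
where `scala.jdk.CollectionConverters` is available.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.jdk.CollectionConverters._

   object MapViewsSketch {
     // Hypothetical map; the key and value types are chosen only for illustration.
     val owners = new ConcurrentHashMap[String, String]()

     // putIfAbsent (formerly Pool.putIfNotExists) returns the previous value,
     // or null when the key had no mapping.
     val first: String = owners.putIfAbsent("partition-0", "broker-1")  // null
     val second: String = owners.putIfAbsent("partition-0", "broker-2") // "broker-1"
     owners.putIfAbsent("partition-1", "broker-2")

     // Java views converted to Scala collections with .asScala.
     val keys: Set[String] = owners.keySet().asScala.toSet
     val values: List[String] = owners.values().asScala.toList.sorted
     val entries: Map[String, String] =
       owners.entrySet().iterator().asScala.map(e => e.getKey -> e.getValue).toMap

     // size is a Java method here, so it is called with parentheses.
     val count: Int = owners.size()
   }
   ```

   The `.toSet`, `.toList`, and `.toMap` calls take immutable snapshots. 
Without them, `.asScala` yields a live wrapper over the underlying Java view, 
which may be what a caller wants when the map keeps changing.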
   
   ## Testing
   
   - `PoolTest.scala` was removed.
   - Existing unit and integration tests for the components that previously 
used `Pool` (e.g., `FetcherLagStats`, `TransactionStateManager`, 
`ReplicaManager`) provide sufficient coverage for the refactored code paths.
   - No new tests were added, as the core concurrent map functionality is 
provided by the JDK.
   
   ## Compatibility
   
   - This change is an **internal refactoring** and does not affect any public 
APIs or external compatibility.
   
   ## Checklist
   
   - [x] Code follows the coding style.
   - [ ] Documentation is updated. (Not applicable for this internal 
refactoring)
   - [x] Unit tests are updated/added (Removed `PoolTest`, relying on existing 
component tests)
   - [ ] Integration tests are updated/added. (Not applicable, existing tests 
cover functionality)
   - [ ] KIP has been created or exists for this change. (KAFKA-19183 serves as 
the discussion/motivation)
   - [ ] Release notes are updated. (Not applicable for this internal 
refactoring)
   

