[ 
https://issues.apache.org/jira/browse/KAFKA-18852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929530#comment-17929530
 ] 

Raju Gupta commented on KAFKA-18852:
------------------------------------

[~ewencp] Would love to know your thoughts on this.

> ApiVersions should use Concurrent Collections instead of sychronised
> --------------------------------------------------------------------
>
>                 Key: KAFKA-18852
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18852
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Raju Gupta
>            Priority: Minor
>         Attachments: KAFKA-18552.patch
>
>
> h2. Analysis of Changes in ApiVersions Class
> The *ApiVersions* class has been updated to enhance *thread safety* and 
> *performance* using concurrent data structures.  
> h3. Key Improvements  
> * *ConcurrentHashMap for nodeApiVersions*  
>   * *Before*: Used `HashMap` with synchronized access.  
>   * *After*: Replaced with `ConcurrentHashMap`, eliminating explicit 
> synchronization.  
>   * *Benefits*: Improved performance (concurrent reads/writes), reduced 
> complexity.  
> * *AtomicLong for maxFinalizedFeaturesEpoch*  
>   * *Before*: `long` with synchronized updates.  
>   * *After*: `AtomicLong` ensures atomic updates without locks.  
>   * *Benefits*: Faster updates using CPU-level atomic operations.  
> * *Removed synchronized Blocks*  
>   * *Impact*: Reduces lock contention, improving scalability.  
>   * *Consideration*: Future modifications must ensure correct usage of 
> concurrent structures.  
> * *Handling finalizedFeatures*  
>   * *Issue*: Still uses `Map<String, Short>`, leading to potential race 
> conditions.  
>   * *Fix*: Replaced with `AtomicReference<Map<String, Short>>` for atomic 
> updates.  
> h3. Updated Code  
> {code:java}
> private final Map<String, NodeApiVersions> nodeApiVersions = new 
> ConcurrentHashMap<>();
> private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
> private final AtomicReference<Map<String, Short>> finalizedFeatures = new 
> AtomicReference<>(new ConcurrentHashMap<>());
> public void update(String nodeId, NodeApiVersions nodeApiVersions) {
>     this.nodeApiVersions.put(nodeId, nodeApiVersions);
>     maxFinalizedFeaturesEpoch.updateAndGet(prev -> 
>         Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
>     this.finalizedFeatures.set(new 
> ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
> }
> public NodeApiVersions get(String nodeId) {
>     return this.nodeApiVersions.get();
> }
> public long getMaxFinalizedFeaturesEpoch() {
>     return maxFinalizedFeaturesEpoch.get();
> }
> public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
>     return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(), 
> finalizedFeatures.get());
> }
> {code}
> h3. Summary  
> * *Thread Safety*: Ensured with `ConcurrentHashMap` and `AtomicLong`.  
> * *Performance*: Reduced locks improve multi-threaded efficiency.  
> * *Scalability*: Supports high concurrency better.  
> * *Next Steps*: Test for edge cases and update documentation.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to