[ https://issues.apache.org/jira/browse/KAFKA-18852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929559#comment-17929559 ]
Ewen Cheslack-Postava commented on KAFKA-18852:
-----------------------------------------------

[~rajugupta] I haven't been active on Kafka for at least 3 years, and I see there's at least one relatively recent commit on this code (about 5 months ago). You probably need someone more up to date on this code (and the broader codebase) to really review this. That's especially true because that commit is for KIP-890 around transactions, which I never worked on. It looks like jolshan was the committer for that.

That said, at first glance the updated version doesn't look equivalent, and it probably has race conditions. The accesses to the individual data fields ({{nodeApiVersions}}, {{maxFinalizedFeaturesEpoch}}, and {{finalizedFeatures}}) all need to be kept consistent with each other. The writes in {{update()}} and the separate reads ({{get()}} calls) in {{getFinalizedFeaturesInfo()}} look like they could be interleaved and produce inconsistent results.

This approach also seems more complex. Using {{synchronized}} (or making the lock explicit, if you want to avoid synchronizing on the default {{Object}} monitor) is basically the simplest option and the easiest to reason about with respect to concurrency, including consistency and correctness.

Was there anything in particular that motivated this? This doesn't seem like a particularly hot set of code paths, so I wouldn't have expected lock contention to be an issue. But as I said, I'm out of date. Regardless, it would help to include the motivation, since making synchronization more granular is always risky with respect to correctness.

> ApiVersions should use Concurrent Collections instead of sychronised
> --------------------------------------------------------------------
>
>                 Key: KAFKA-18852
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18852
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Raju Gupta
>            Priority: Minor
>         Attachments: KAFKA-18552.patch
>
>
> h2. Analysis of Changes in ApiVersions Class
> The *ApiVersions* class has been updated to enhance *thread safety* and *performance* using concurrent data structures.
> h3. Key Improvements
> * *ConcurrentHashMap for nodeApiVersions*
> ** *Before*: Used `HashMap` with synchronized access.
> ** *After*: Replaced with `ConcurrentHashMap`, eliminating explicit synchronization.
> ** *Benefits*: Improved performance (concurrent reads/writes), reduced complexity.
> * *AtomicLong for maxFinalizedFeaturesEpoch*
> ** *Before*: `long` with synchronized updates.
> ** *After*: `AtomicLong` ensures atomic updates without locks.
> ** *Benefits*: Faster updates using CPU-level atomic operations.
> * *Removed synchronized Blocks*
> ** *Impact*: Reduces lock contention, improving scalability.
> ** *Consideration*: Future modifications must ensure correct usage of concurrent structures.
> * *Handling finalizedFeatures*
> ** *Issue*: Still uses `Map<String, Short>`, leading to potential race conditions.
> ** *Fix*: Replaced with `AtomicReference<Map<String, Short>>` for atomic updates.
> h3. Updated Code
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.atomic.AtomicLong;
> import java.util.concurrent.atomic.AtomicReference;
>
> // Excerpt of the proposed ApiVersions members; NodeApiVersions and
> // FinalizedFeaturesInfo are defined elsewhere (not shown).
> private final Map<String, NodeApiVersions> nodeApiVersions = new ConcurrentHashMap<>();
> private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
> private final AtomicReference<Map<String, Short>> finalizedFeatures =
>     new AtomicReference<>(new ConcurrentHashMap<>());
>
> public void update(String nodeId, NodeApiVersions nodeApiVersions) {
>     // Each of these three writes is atomic on its own, but they are not applied
>     // as one unit, and finalizedFeatures is replaced even when this node's epoch
>     // is not the new maximum.
>     this.nodeApiVersions.put(nodeId, nodeApiVersions);
>     maxFinalizedFeaturesEpoch.updateAndGet(prev ->
>         Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
>     this.finalizedFeatures.set(new ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
> }
>
> public NodeApiVersions get(String nodeId) {
>     return this.nodeApiVersions.get(nodeId);
> }
>
> public long getMaxFinalizedFeaturesEpoch() {
>     return maxFinalizedFeaturesEpoch.get();
> }
>
> public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
>     // Two separate reads: an update() can run between them.
>     return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(), finalizedFeatures.get());
> }
> {code}
> h3. Summary
> * *Thread Safety*: Ensured with `ConcurrentHashMap` and `AtomicLong`.
> * *Performance*: Fewer locks improve multi-threaded efficiency.
> * *Scalability*: Better support for high concurrency.
> * *Next Steps*: Test for edge cases and update documentation.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
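For comparison with the quoted proposal, here is a minimal sketch of the coarse-grained locking that the comment recommends: every read and write of the related state happens under one private lock, so {{update()}} and {{getFinalizedFeaturesInfo()}} cannot interleave. This is not Kafka's actual {{ApiVersions}} code. {{LockedApiVersionsSketch}}, the record {{NodeInfo}} (a hypothetical stand-in for {{NodeApiVersions}}), and the feature name {{example.feature}} are made up for illustration. The sketch also assumes that finalized features should only be replaced when a node reports a higher epoch, so that the stored features always belong to the stored maximum epoch.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of coarse-grained locking: all related fields are read and written
// while holding a single private lock object.
public class LockedApiVersionsSketch {

    // Hypothetical stand-in for NodeApiVersions with only the fields used here.
    public record NodeInfo(long finalizedFeaturesEpoch, Map<String, Short> finalizedFeatures) { }

    // Snapshot handed to callers; epoch and features are captured together.
    public record FinalizedFeaturesInfo(long finalizedFeaturesEpoch, Map<String, Short> finalizedFeatures) { }

    // Explicit private lock instead of synchronizing on "this".
    private final Object lock = new Object();
    private final Map<String, NodeInfo> nodeApiVersions = new HashMap<>();
    private long maxFinalizedFeaturesEpoch = -1;
    private Map<String, Short> finalizedFeatures = Map.of();

    public void update(String nodeId, NodeInfo info) {
        synchronized (lock) {
            nodeApiVersions.put(nodeId, info);
            // Assumption: only a strictly newer epoch replaces the stored features.
            if (info.finalizedFeaturesEpoch() > maxFinalizedFeaturesEpoch) {
                maxFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch();
                finalizedFeatures = Map.copyOf(info.finalizedFeatures());
            }
        }
    }

    public NodeInfo get(String nodeId) {
        synchronized (lock) {
            return nodeApiVersions.get(nodeId);
        }
    }

    public long getMaxFinalizedFeaturesEpoch() {
        synchronized (lock) {
            return maxFinalizedFeaturesEpoch;
        }
    }

    public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
        synchronized (lock) {
            // Both fields are read under the same lock, so no update() can slip in between.
            return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch, finalizedFeatures);
        }
    }

    public static void main(String[] args) {
        LockedApiVersionsSketch versions = new LockedApiVersionsSketch();
        versions.update("1", new NodeInfo(5, Map.of("example.feature", (short) 2)));
        // An older epoch from another node does not overwrite the newer features.
        versions.update("2", new NodeInfo(3, Map.of("example.feature", (short) 1)));
        FinalizedFeaturesInfo info = versions.getFinalizedFeaturesInfo();
        System.out.println(info.finalizedFeaturesEpoch() + " " + info.finalizedFeatures()); // prints: 5 {example.feature=2}
    }
}
```

Because node 2 reports the older epoch 3, the snapshot keeps epoch 5 together with node 1's features. A lock-free version would need all of this state to change in a single atomic step, which is exactly what the three independent atomics in the quoted patch do not provide.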