This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 89e4d46f4a5585ac126a03aa298020203100ad71
Author: lifepuzzlefun <wjl_is_...@163.com>
AuthorDate: Sun Apr 23 11:27:48 2023 +0800

    [fix] [broker] Make `LeastResourceUsageWithWeight` thread safe (#20159)
    
    Fix #20160
    
    ### Motivation
    LeastResourceUsageWithWeight is a stateful object, and in the current code it can be 
accessed by multiple threads concurrently.
    For example, thread 1 is executing 
https://github.com/apache/pulsar/blob/2b41e4eafb1cba0e548dd90df60e8cdbb24cd490/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java#L89-L91
    while thread 2 is executing 
https://github.com/apache/pulsar/blob/2b41e4eafb1cba0e548dd90df60e8cdbb24cd490/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java#L147-L150
    so an IndexOutOfBoundsException can occur.
    
    ### Modifications
    
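    Change the shared mutable state (`bestBrokers` and `noLoadDataBrokers`) to `ThreadLocal` so that each thread works on its own collections.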
    
    (cherry picked from commit 963260abfa142b8cf9ffe372d85d470a78c17235)
---
 .../strategy/LeastResourceUsageWithWeight.java         | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index 902cfdaf73f..98986d84b98 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.concurrent.ThreadSafe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
@@ -35,14 +36,15 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
  * cause cluster fluctuations due to short-term load jitter.
  */
 @Slf4j
+@ThreadSafe
 public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
     // Maintain this list to reduce object creation.
-    private final ArrayList<String> bestBrokers;
-    private final Set<String> noLoadDataBrokers;
+    private final ThreadLocal<ArrayList<String>> bestBrokers;
+    private final ThreadLocal<HashSet<String>> noLoadDataBrokers;
 
     public LeastResourceUsageWithWeight() {
-        this.bestBrokers = new ArrayList<>();
-        this.noLoadDataBrokers = new HashSet<>();
+        this.bestBrokers = ThreadLocal.withInitial(ArrayList::new);
+        this.noLoadDataBrokers = ThreadLocal.withInitial(HashSet::new);
     }
 
     // A broker's max resource usage with weight using its historical load and 
short-term load data with weight.
@@ -70,7 +72,6 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
 
     /**
      * Find a suitable broker to assign the given bundle to.
-     * This method is not thread safety.
      *
      * @param candidates     The candidates for which the bundle may be 
assigned.
      * @param bundleToAssign The data for the bundle to assign.
@@ -86,6 +87,9 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
             return Optional.empty();
         }
 
+        ArrayList<String> bestBrokers = this.bestBrokers.get();
+        HashSet<String> noLoadDataBrokers = this.noLoadDataBrokers.get();
+
         bestBrokers.clear();
         noLoadDataBrokers.clear();
         // Maintain of list of all the best scoring brokers and then randomly
@@ -135,9 +139,7 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
                 log.info("Assign randomly as none of the brokers are 
underloaded. candidatesSize:{}, "
                         + "noLoadDataBrokersSize:{}", candidates.size(), 
noLoadDataBrokers.size());
             }
-            for (String broker : candidates) {
-                bestBrokers.add(broker);
-            }
+            bestBrokers.addAll(candidates);
         }
 
         if (debugMode) {

Reply via email to