[ https://issues.apache.org/jira/browse/KAFKA-7019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511899#comment-16511899 ]

ASF GitHub Bot commented on KAFKA-7019:
---------------------------------------

radai-rosenblatt opened a new pull request #5221: KAFKA-7019 - make reading metadata lock-free by maintaining an atomically-updated read snapshot
URL: https://github.com/apache/kafka/pull/5221
 
 
   we've seen cases where, under a constant metadata update load (automated partition rebalance), request handling threads can block for a significant amount of time on the metadata lock.
   
   the reason is that for large enough clusters (~200,000 topic partitions) a read operation can take a long while to compose a response. under a constant stream of reads + writes we see situations where a reader currently holds the lock, a writer is pending (blocked) behind it, and then a big pile-up of more readers forms, blocked behind the pending writer.
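   
   to illustrate (a minimal standalone sketch, not part of the patch): the cache's state is guarded by a read-write lock (ReentrantReadWriteLock), and with that kind of lock, once a writer is queued behind an in-flight reader, new readers block as well (so the writer isn't starved) - hence the pile-up:
   
    ```java
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class ReaderPileUpDemo {
        private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
        public static void main(String[] args) throws Exception {
            // reader 1: holds the read lock for a while (a slow metadata read)
            Thread slowReader = new Thread(() -> withReadLock(5000), "slow-reader");
            slowReader.start();
            sleep(100);
    
            // writer: blocks behind the in-flight reader and sits queued
            Thread writer = new Thread(() -> {
                lock.writeLock().lock();
                try {
                    sleep(10);
                } finally {
                    lock.writeLock().unlock();
                }
            }, "writer");
            writer.start();
            sleep(100);
    
            // reader 2: even though only a reader holds the lock right now, this
            // read also blocks, behind the queued writer (~5 seconds here)
            long start = System.currentTimeMillis();
            withReadLock(0);
            System.err.println("second read waited " + (System.currentTimeMillis() - start) + " ms");
        }
    
        private static void withReadLock(long holdMillis) {
            lock.readLock().lock();
            try {
                sleep(holdMillis);
            } finally {
                lock.readLock().unlock();
            }
        }
    
        private static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ignored) {
            }
        }
    }
    ```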
   
   this patch makes the read path lock-free. the metadata is now stored in a logically-immutable snapshot. all read operations grab the current snapshot and serve data out of it. write paths create an entirely new snapshot and atomically assign it. writers are still under a lock, but only for mutual exclusion with each other.
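   
   in sketch form (simplified and with made-up names, not the actual patch code), the new shape of the cache is:
   
    ```java
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    
    // simplified copy-on-write cache, illustrating the approach only
    // (names/types here are hypothetical; the real code is kafka.server.MetadataCache)
    public class SnapshotCache {
        // a logically-immutable snapshot of the metadata
        private static final class Snapshot {
            final Map<String, String> metadata;
            Snapshot(Map<String, String> metadata) {
                this.metadata = Collections.unmodifiableMap(metadata);
            }
        }
    
        // volatile: assignment is the atomic "publish" of a new snapshot
        private volatile Snapshot snapshot = new Snapshot(new HashMap<>());
    
        // read path: lock-free - grab the current snapshot and serve out of it
        public String get(String key) {
            return snapshot.metadata.get(key);
        }
    
        // write path: copy the old snapshot into an entirely new one and
        // atomically swap it in; synchronized only so writers exclude each other
        public synchronized void update(String key, String value) {
            Map<String, String> copy = new HashMap<>(snapshot.metadata);
            copy.put(key, value);
            snapshot = new Snapshot(copy);
        }
    }
    ```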
   
   here's the benchmark code i used to measure the effects of this patch:
   ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.requests.UpdateMetadataRequest;
    import org.apache.kafka.common.security.auth.SecurityProtocol;
    import org.junit.Test;
    
    import kafka.server.MetadataCache;
    import scala.collection.JavaConverters;
    
    public class MetadataCacheBenchmark {
       private volatile boolean running = true;
   
       int numBrokers = 150;
       int numTopics = 3500;
       int maxPartitionsPerTopic = 100;
       int replicationFactor = 2;
       int numUpdaters = 1;
       double updateRateLimit = 10.0; //qps
       int numReaders = 5;
       boolean partialUpdate = true;
   
       private final ListenerName listener = new ListenerName("listener");
   
       private final AtomicLong updateCounter = new AtomicLong();
       private final AtomicLong readCounter = new AtomicLong();
   
       @Test
       public void benchmarkAllTheThings() throws Exception {
           //long seed = System.currentTimeMillis();
           long seed = 666;
           System.err.println("seed is " + seed);
           Random r = new Random(seed);
   
            MetadataCache cache = new MetadataCache(666); // constructor arg is the broker id
           UpdateMetadataRequest fullRequest = buildRequest(r, -1);
           UpdateMetadataRequest partialRequest = buildRequest(r, 1);
   
            cache.updateCache(0, fullRequest); // initial data (useful in case there are no writers)
   
           Set<String> allTopics = new HashSet<>();
           for (int i = 0; i < numTopics; i++) {
               allTopics.add("topic-" + i);
           }
            scala.collection.mutable.Set<String> topicsScalaSet =
                JavaConverters.asScalaSetConverter(allTopics).asScala();
   
            Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
               @Override
               public void uncaughtException(Thread t, Throwable e) {
                   running = false;
                   System.err.println("thread " + t + " died");
                   e.printStackTrace(System.err);
                   System.exit(1);
               }
           };
           List<Thread> threads = new ArrayList<>();
   
           for (int i = 0; i < numUpdaters; i++) {
                UpdateMetadataRequest req = partialUpdate ? partialRequest : fullRequest;
   
               Runnable updaterRunnable;
               if (updateRateLimit > 0) {
                    updaterRunnable = new RateLimitedUpdateRunnable(updateRateLimit, cache, req);
               } else {
                   updaterRunnable = new UpdateRunnable(cache, req);
               }
                Thread updaterThread = new Thread(updaterRunnable, "updater-" + i);
               updaterThread.setDaemon(true);
               updaterThread.setUncaughtExceptionHandler(exceptionHandler);
               threads.add(updaterThread);
           }
   
           for (int i = 0; i < numReaders; i++) {
                ReadRunnable readRunnable = new ReadRunnable(cache, topicsScalaSet);
               Thread readerThread = new Thread(readRunnable, "reader-" + i);
               readerThread.setDaemon(true);
               readerThread.setUncaughtExceptionHandler(exceptionHandler);
               threads.add(readerThread);
           }
   
           for (Thread t : threads) {
               t.start();
           }
   
           long prevTime = System.currentTimeMillis();
           long prevUpdates = 0;
           long prevReads = 0;
   
           long now;
           long updates;
           long reads;
   
           long timeDiff;
           long updateDiff;
           long readDiff;
   
           double updateQps;
           double readQps;
   
           while (running) {
               Thread.sleep(TimeUnit.SECONDS.toMillis(30));
               now = System.currentTimeMillis();
               updates = updateCounter.longValue();
               reads = readCounter.longValue();
   
               timeDiff = now - prevTime;
               updateDiff = updates - prevUpdates;
               readDiff = reads - prevReads;
   
               updateQps = ((double) updateDiff * 1000) / timeDiff;
               readQps = ((double) readDiff * 1000) / timeDiff;
   
               prevTime = now;
               prevUpdates = updates;
               prevReads = reads;
   
               System.err.println("updates: " + updateQps + " / sec");
               System.err.println("reads: " + readQps + " / sec");
           }
       }
   
        private UpdateMetadataRequest buildRequest(Random random, int numTopicsOverride) {
           int controllerEpoch = 0;
           int totalPartitions = 0;
           Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>();
   
           for (int i = 0; i < numBrokers; i++) {
                UpdateMetadataRequest.EndPoint endPoint =
                    new UpdateMetadataRequest.EndPoint("host-" + i, 6666, SecurityProtocol.PLAINTEXT, listener);
                UpdateMetadataRequest.Broker broker =
                    new UpdateMetadataRequest.Broker(i, Collections.singletonList(endPoint), "rack-" + i);
               liveBrokers.add(broker);
           }
   
            Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitions = new HashMap<>();
   
            int topicCount = numTopicsOverride > 0 ? numTopicsOverride : numTopics;
   
           for (int i = 0; i < topicCount; i++) {
               String topicName = "topic-" + i;
               int numPartitions = 1 + random.nextInt(maxPartitionsPerTopic);
               for (int j = 0; j < numPartitions; j++) {
                   TopicPartition tp = new TopicPartition(topicName, j);
                    List<Integer> replicas = pick(replicationFactor, numBrokers, random);
                    UpdateMetadataRequest.PartitionState state =
                        new UpdateMetadataRequest.PartitionState(controllerEpoch, replicas.get(0), 0, replicas, 0, replicas,
                            Collections.<Integer>emptyList());
                   partitions.put(tp, state);
               }
               totalPartitions += numPartitions;
           }
   
            UpdateMetadataRequest.Builder builder =
                new UpdateMetadataRequest.Builder((short) 4, 0, controllerEpoch, partitions, liveBrokers);
   
           UpdateMetadataRequest request = builder.build((short) 4);
           System.err.println("request has " + totalPartitions + " TPs total");
           return request;
       }
   
       private List<Integer> pick(int howMany, int from, Random random) {
           List<Integer> result = new ArrayList<>(howMany);
           while (result.size() < howMany) {
               int chosen = random.nextInt(from); //exclusive
               if (!result.contains(chosen)) {
                   result.add(chosen);
               }
           }
           return result;
       }
   
       private class UpdateRunnable implements Runnable {
           private final MetadataCache cache;
           private final UpdateMetadataRequest request;
           private int counter = 0;
   
            public UpdateRunnable(MetadataCache cache, UpdateMetadataRequest request) {
               this.cache = cache;
               this.request = request;
           }
   
           @Override
           public void run() {
               while (running) {
                   cache.updateCache(counter++, request);
                   updateCounter.incrementAndGet();
               }
           }
       }
   
       private class RateLimitedUpdateRunnable implements Runnable {
           private final MetadataCache cache;
           private final UpdateMetadataRequest request;
           private int counter = 0;
   
           private final double targetQps;
           private final int intervalMillis;
   
            public RateLimitedUpdateRunnable(double targetQps, MetadataCache cache, UpdateMetadataRequest request) {
               this.cache = cache;
               this.request = request;
               this.targetQps = targetQps;
               this.intervalMillis = (int) (1000.0 / targetQps);
           }
   
           @Override
           public void run() {
               while (running) {
                   long start = System.currentTimeMillis();
                   cache.updateCache(counter++, request);
                   long end = System.currentTimeMillis();
                   updateCounter.incrementAndGet();
                   long took = end - start;
                   long remaining = intervalMillis - took;
                   if (remaining > 0) {
                       try {
                           Thread.sleep(remaining);
                       } catch (Exception e) {
                           e.printStackTrace(System.err);
                       }
                   }
               }
           }
       }
   
       private class ReadRunnable implements Runnable {
           private final MetadataCache cache;
           private final scala.collection.mutable.Set<String> topics;
   
            public ReadRunnable(MetadataCache cache, scala.collection.mutable.Set<String> topics) {
               this.cache = cache;
               this.topics = topics;
           }
   
           @Override
           public void run() {
               while (running) {
                   cache.getTopicMetadata(topics, listener, false);
                   readCounter.incrementAndGet();
               }
           }
       }
   }
   ```
   
   the interesting/problematic scenario is a combination of writers and readers.


> Reduce the contention between metadata update and metadata read operations
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7019
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7019
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Dong Lin
>            Assignee: Radai Rosenblatt
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently MetadataCache.updateCache() grabs a write lock in order to process
> the UpdateMetadataRequest from the controller, and a read lock is needed in
> order to handle the MetadataRequest from clients. Thus the handling of
> MetadataRequest and UpdateMetadataRequest block each other, and the broker can
> only process one such request at a time even if there are multiple request
> handler threads. Note that the broker cannot process MetadataRequest in
> parallel if there is an UpdateMetadataRequest waiting for the write lock, even
> though a MetadataRequest only requires the read lock to be processed.
> For a large cluster with tens of thousands of partitions, it can take e.g.
> 200 ms to process an UpdateMetadataRequest or a MetadataRequest from large
> clients (e.g. MM). While a user is rebalancing the cluster, the leadership
> changes will cause both UpdateMetadataRequest from the controller and
> MetadataRequest from clients. If a broker receives 10 MetadataRequest per
> second and 2 UpdateMetadataRequest per second on average, then, since these
> requests are processed one at a time, the request handler thread idle ratio
> can drop to 0, which makes the broker unavailable to users.
> We can address this problem by removing the read lock in MetadataCache. The
> idea is that MetadataCache.updateCache() can instantiate a new copy of the
> cache as a method-local variable while it is processing the
> UpdateMetadataRequest, and replace the class-private variable with the newly
> instantiated method-local variable at the end of MetadataCache.updateCache().
> The handling of MetadataRequest then only requires access to the read-only
> class-private variable.


