[
https://issues.apache.org/jira/browse/KAFKA-7019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511899#comment-16511899
]
ASF GitHub Bot commented on KAFKA-7019:
---------------------------------------
radai-rosenblatt opened a new pull request #5221: KAFKA-7019 - make reading
metadata lock-free by maintaining an atomically-updated read snapshot
URL: https://github.com/apache/kafka/pull/5221
We've seen cases where, under a constant metadata update load (automated
partition rebalance), request handler threads can block for a significant
amount of time on the metadata lock.
The reason is that for large enough clusters (~200,000 topic partitions) a
single read operation can take a long time to compose its response. Under a
constant stream of reads and writes we see situations where one reader is
currently inside the read path, a writer is pending (blocked) behind it, and a
big pile-up of further readers then forms behind the pending writer.
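To make the pile-up concrete, here is a minimal, self-contained sketch (not code from the patch or from Kafka) that reproduces the effect with a plain java.util.concurrent.locks.ReentrantReadWriteLock, used here as a stand-in for the cache's read-write lock: once a writer is queued, new readers park behind it even though only a read lock is currently held.
```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReaderPileUpDemo {
    public static void main(String[] args) throws Exception {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        // A reader holds the read lock for a while, simulating a large
        // metadata response being composed under the lock.
        Thread slowReader = new Thread(() -> {
            lock.readLock().lock();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ignored) {
            } finally {
                lock.readLock().unlock();
            }
        });
        slowReader.start();
        Thread.sleep(100);

        // A writer queues up behind the in-flight reader.
        Thread writer = new Thread(() -> {
            lock.writeLock().lock();   // blocks until the slow reader is done
            lock.writeLock().unlock();
        });
        writer.start();
        Thread.sleep(100);

        // A new reader now parks behind the queued writer, even though only a
        // read lock is currently held; this is the pile-up described above.
        long start = System.nanoTime();
        lock.readLock().lock();
        lock.readLock().unlock();
        System.out.println("late reader waited " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```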
This patch makes the read path lock-free. The metadata is now stored in a
logically immutable snapshot: all read operations grab the current snapshot
and serve data out of it, while write paths build an entirely new snapshot and
atomically publish it. Writers still take a lock, but only for mutual
exclusion among themselves.
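In outline, this is the classic copy-on-write pattern: an immutable snapshot published through an atomic reference. The sketch below is illustrative only; the class and field names are invented, and the real snapshot carries the full broker/partition state rather than a single map.
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotCache<K, V> {

    /** Immutable view of the cache; never mutated after construction. */
    private static final class Snapshot<K, V> {
        final Map<K, V> data;
        Snapshot(Map<K, V> data) {
            this.data = Collections.unmodifiableMap(data);
        }
    }

    // Readers only ever dereference this; no lock is taken on the read path.
    private final AtomicReference<Snapshot<K, V>> current =
            new AtomicReference<>(new Snapshot<K, V>(new HashMap<>()));

    // Writers are serialized against each other, but never against readers.
    private final Object writeLock = new Object();

    /** Lock-free read: grab the snapshot once and serve everything from it. */
    public V get(K key) {
        return current.get().data.get(key);
    }

    /** Writers copy the old snapshot, apply the update, and publish atomically. */
    public void update(K key, V value) {
        synchronized (writeLock) {
            Map<K, V> copy = new HashMap<>(current.get().data);
            copy.put(key, value);
            current.set(new Snapshot<>(copy)); // readers see old or new, never a mix
        }
    }
}
```
A reader that grabbed the old snapshot simply finishes against it; a concurrent update never blocks it, which is exactly the contention removed by the patch.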
Here's the benchmark code I used to measure the effects of this patch:
```java
// Imports were omitted from the original paste; the ones below assume the
// Kafka 2.0-era package layout for the classes used in this benchmark.
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;

import kafka.server.MetadataCache;
import scala.collection.JavaConverters;

public class MetadataCacheBenchmark {
    private volatile boolean running = true;

    // benchmark parameters
    int numBrokers = 150;
    int numTopics = 3500;
    int maxPartitionsPerTopic = 100;
    int replicationFactor = 2;
    int numUpdaters = 1;
    double updateRateLimit = 10.0; // qps; <= 0 means unthrottled
    int numReaders = 5;
    boolean partialUpdate = true;  // updaters send a 1-topic request instead of a full one

    private final ListenerName listener = new ListenerName("listener");
    private final AtomicLong updateCounter = new AtomicLong();
    private final AtomicLong readCounter = new AtomicLong();

    @Test
    public void benchmarkAllTheThings() throws Exception {
        //long seed = System.currentTimeMillis();
        long seed = 666;
        System.err.println("seed is " + seed);
        Random r = new Random(seed);

        MetadataCache cache = new MetadataCache(666); // constructor argument is the broker id
        UpdateMetadataRequest fullRequest = buildRequest(r, -1);
        UpdateMetadataRequest partialRequest = buildRequest(r, 1);
        cache.updateCache(0, fullRequest); // initial data (useful in case there are no writers)

        Set<String> allTopics = new HashSet<>();
        for (int i = 0; i < numTopics; i++) {
            allTopics.add("topic-" + i);
        }
        scala.collection.mutable.Set<String> topicsScalaSet =
                JavaConverters.asScalaSetConverter(allTopics).asScala();

        Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                running = false;
                System.err.println("thread " + t + " died");
                e.printStackTrace(System.err);
                System.exit(1);
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numUpdaters; i++) {
            UpdateMetadataRequest req = partialUpdate ? partialRequest : fullRequest;
            Runnable updaterRunnable;
            if (updateRateLimit > 0) {
                updaterRunnable = new RateLimitedUpdateRunnable(updateRateLimit, cache, req);
            } else {
                updaterRunnable = new UpdateRunnable(cache, req);
            }
            Thread updaterThread = new Thread(updaterRunnable, "updater-" + i);
            updaterThread.setDaemon(true);
            updaterThread.setUncaughtExceptionHandler(exceptionHandler);
            threads.add(updaterThread);
        }
        for (int i = 0; i < numReaders; i++) {
            ReadRunnable readRunnable = new ReadRunnable(cache, topicsScalaSet);
            Thread readerThread = new Thread(readRunnable, "reader-" + i);
            readerThread.setDaemon(true);
            readerThread.setUncaughtExceptionHandler(exceptionHandler);
            threads.add(readerThread);
        }
        for (Thread t : threads) {
            t.start();
        }

        // report update/read throughput every 30 seconds
        long prevTime = System.currentTimeMillis();
        long prevUpdates = 0;
        long prevReads = 0;
        while (running) {
            Thread.sleep(TimeUnit.SECONDS.toMillis(30));
            long now = System.currentTimeMillis();
            long updates = updateCounter.longValue();
            long reads = readCounter.longValue();
            long timeDiff = now - prevTime;
            long updateDiff = updates - prevUpdates;
            long readDiff = reads - prevReads;
            double updateQps = ((double) updateDiff * 1000) / timeDiff;
            double readQps = ((double) readDiff * 1000) / timeDiff;
            prevTime = now;
            prevUpdates = updates;
            prevReads = reads;
            System.err.println("updates: " + updateQps + " / sec");
            System.err.println("reads: " + readQps + " / sec");
        }
    }

    private UpdateMetadataRequest buildRequest(Random random, int numTopicsOverride) {
        int controllerEpoch = 0;
        int totalPartitions = 0;

        Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>();
        for (int i = 0; i < numBrokers; i++) {
            UpdateMetadataRequest.EndPoint endPoint =
                    new UpdateMetadataRequest.EndPoint("host-" + i, 6666, SecurityProtocol.PLAINTEXT, listener);
            UpdateMetadataRequest.Broker broker =
                    new UpdateMetadataRequest.Broker(i, Collections.singletonList(endPoint), "rack-" + i);
            liveBrokers.add(broker);
        }

        Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitions = new HashMap<>();
        int topicCount = numTopicsOverride > 0 ? numTopicsOverride : numTopics;
        for (int i = 0; i < topicCount; i++) {
            String topicName = "topic-" + i;
            int numPartitions = 1 + random.nextInt(maxPartitionsPerTopic);
            for (int j = 0; j < numPartitions; j++) {
                TopicPartition tp = new TopicPartition(topicName, j);
                List<Integer> replicas = pick(replicationFactor, numBrokers, random);
                UpdateMetadataRequest.PartitionState state =
                        new UpdateMetadataRequest.PartitionState(controllerEpoch, replicas.get(0), 0,
                                replicas, 0, replicas, Collections.<Integer>emptyList());
                partitions.put(tp, state);
            }
            totalPartitions += numPartitions;
        }

        UpdateMetadataRequest.Builder builder =
                new UpdateMetadataRequest.Builder((short) 4, 0, controllerEpoch, partitions, liveBrokers);
        UpdateMetadataRequest request = builder.build((short) 4);
        System.err.println("request has " + totalPartitions + " TPs total");
        return request;
    }

    /** picks {@code howMany} distinct ints in [0, from) */
    private List<Integer> pick(int howMany, int from, Random random) {
        List<Integer> result = new ArrayList<>(howMany);
        while (result.size() < howMany) {
            int chosen = random.nextInt(from); // bound is exclusive
            if (!result.contains(chosen)) {
                result.add(chosen);
            }
        }
        return result;
    }

    /** applies metadata updates as fast as possible */
    private class UpdateRunnable implements Runnable {
        private final MetadataCache cache;
        private final UpdateMetadataRequest request;
        private int counter = 0;

        public UpdateRunnable(MetadataCache cache, UpdateMetadataRequest request) {
            this.cache = cache;
            this.request = request;
        }

        @Override
        public void run() {
            while (running) {
                cache.updateCache(counter++, request);
                updateCounter.incrementAndGet();
            }
        }
    }

    /** applies metadata updates at (roughly) a fixed target rate */
    private class RateLimitedUpdateRunnable implements Runnable {
        private final MetadataCache cache;
        private final UpdateMetadataRequest request;
        private final double targetQps;
        private final int intervalMillis;
        private int counter = 0;

        public RateLimitedUpdateRunnable(double targetQps, MetadataCache cache, UpdateMetadataRequest request) {
            this.cache = cache;
            this.request = request;
            this.targetQps = targetQps;
            this.intervalMillis = (int) (1000.0 / targetQps);
        }

        @Override
        public void run() {
            while (running) {
                long start = System.currentTimeMillis();
                cache.updateCache(counter++, request);
                long end = System.currentTimeMillis();
                updateCounter.incrementAndGet();
                long took = end - start;
                long remaining = intervalMillis - took;
                if (remaining > 0) {
                    try {
                        Thread.sleep(remaining);
                    } catch (Exception e) {
                        e.printStackTrace(System.err);
                    }
                }
            }
        }
    }

    /** repeatedly fetches metadata for all topics */
    private class ReadRunnable implements Runnable {
        private final MetadataCache cache;
        private final scala.collection.mutable.Set<String> topics;

        public ReadRunnable(MetadataCache cache, scala.collection.mutable.Set<String> topics) {
            this.cache = cache;
            this.topics = topics;
        }

        @Override
        public void run() {
            while (running) {
                cache.getTopicMetadata(topics, listener, false);
                readCounter.incrementAndGet();
            }
        }
    }
}
```
The interesting/problematic scenario is a combination of writers and readers,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Reduce the contention between metadata update and metadata read operation
> --------------------------------------------------------------------------
>
> Key: KAFKA-7019
> URL: https://issues.apache.org/jira/browse/KAFKA-7019
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Dong Lin
> Assignee: Radai Rosenblatt
> Priority: Major
> Fix For: 2.1.0
>
>
> Currently MetadataCache.updateCache() grabs a write lock in order to process
> the UpdateMetadataRequest from the controller, and a read lock is needed in
> order to handle the MetadataRequest from clients. Thus the handling of
> MetadataRequest and UpdateMetadataRequest block each other, and the broker
> can only process one such request at a time even if there are multiple request
> handler threads. Note that the broker can not process MetadataRequests in
> parallel if there is an UpdateMetadataRequest waiting for the write lock, even
> though a MetadataRequest only requires the read lock to be processed.
> For a large cluster with tens of thousands of partitions, it can take e.g.
> 200 ms to process an UpdateMetadataRequest or a MetadataRequest from a large
> client (e.g. MirrorMaker). While a user is rebalancing the cluster, the
> leadership changes will cause both UpdateMetadataRequests from the controller
> and MetadataRequests from clients. If a broker receives 10 MetadataRequests per
> second and 2 UpdateMetadataRequests per second on average, then, since these
> requests are processed one at a time, the request handler thread idle ratio can
> drop to 0, which makes this broker unavailable to users.
> We can address this problem by removing the read lock in MetadataCache. The
> idea is that MetadataCache.updateCache() can instantiate a new copy of the
> cache as a method-local variable while it is processing the
> UpdateMetadataRequest, and replace the class-private variable with the newly
> instantiated method-local variable at the end of MetadataCache.updateCache().
> The handling of MetadataRequest then only requires access to the read-only
> class-private variable.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)