cmccabe commented on a change in pull request #10494:
URL: https://github.com/apache/kafka/pull/10494#discussion_r633926611



##########
File path: metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.metadata.OptionalStringComparator;
+import org.apache.kafka.metadata.UsableBroker;
+
+
+/**
+ * The striped replica placer.
+ *
+ *
+ * GOALS
+ * The design of this placer attempts to satisfy a few competing goals.  Firstly, we want
+ * to spread the replicas as evenly as we can across racks.  In the simple case where
+ * broker racks have not been configured, this goal is a no-op, of course.  But it is the
+ * highest priority goal in multi-rack clusters.
+ *
+ * Our second goal is to spread the replicas evenly across brokers.  Since we are placing
+ * multiple partitions, we try to avoid putting each partition on the same set of
+ * replicas, even if it does satisfy the rack placement goal.  However, we treat the rack
+ * placement goal as higher priority than this goal -- if you configure 10 brokers each in
+ * racks A and B, and 1 broker in rack C, you will end up with a lot of partitions on that
+ * one broker in rack C.  If you were to place a lot of partitions with replication factor 3,
+ * each partition would try to get a replica there.  In general racks are supposed to be
+ * about the same size -- if they aren't, this is a user error.
+ *
+ * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
+ * brokers.
+ *
+ *
+ * CONSTRAINTS
+ * In addition to these goals, we have two constraints.  Unlike the goals, these are not
+ * optional -- they are mandatory.  Placement will fail if a constraint cannot be
+ * satisfied.  The first constraint is that we can't place more than one replica on the
+ * same broker.  This imposes an upper limit on replication factor -- for example, a 3-node
+ * cluster can't have any topics with replication factor 4.  This constraint comes from
+ * Kafka's internal design.
+ *
+ * The second constraint is that the leader of each partition must be an unfenced broker.
+ * This constraint is a bit arbitrary.  In theory, we could allow people to create
+ * new topics even if every broker were fenced.  However, this would be confusing for
+ * users.
+ *
+ *
+ * ALGORITHM
+ * The StripedReplicaPlacer constructor loads the broker data into rack objects.  Each
+ * rack object contains a sorted list of fenced brokers, and a separate sorted list of
+ * unfenced brokers.  The racks themselves are organized into a sorted list, stored inside
+ * the top-level RackList object.
+ *
+ * The general idea is that we place replicas onto racks in a round-robin fashion.  So if
+ * we had racks A, B, C, and D, and we were creating a new partition with replication
+ * factor 3, our first replica might come from A, our second from B, and our third from C.
+ * Of course our placement would not be very fair if we always started with rack A.
+ * Therefore, we generate a random starting offset when the RackList is created.  So one
+ * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ *
+ * Note that each partition we generate advances the starting offset by one.
+ * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
+ *
+ * partition 1: A, B, C
+ * partition 2: B, C, D
+ * partition 3: C, D, A
+ *
+ * This is what generates the characteristic "striped" pattern of this placer.
+ *
+ * So far I haven't said anything about how we choose a replica from within a rack.  In
+ * fact, this is also done in a round-robin fashion.  So if rack A had brokers A0, A1, A2,
+ * and A3, we might return A0 the first time, A1 the second, A2 the third, and so on.
+ * Just like with the racks, we add a random starting offset to mix things up a bit.
+ *
+ * So let's say you had a cluster with racks A, B, and C, and each rack had 3 brokers,
+ * for 9 nodes in total.
+ * If all the offsets were 0, you'd get placements like this:
+ *
+ * partition 1: A0, B0, C0
+ * partition 2: B1, C1, A1
+ * partition 3: C2, A2, B2
+ *
+ * One additional complication with choosing a replica within a rack is that we want to
+ * choose the unfenced brokers first.  In a big cluster with lots of nodes available,
+ * we'd prefer not to place a new partition on a node that is fenced.  Therefore, we
+ * actually maintain two lists, rather than the single list I described above.
+ * We only start using the fenced node list when the unfenced node list is totally
+ * exhausted.
+ *
+ * Furthermore, we cannot place the first replica (the leader) of a new partition on a
+ * fenced broker.  Therefore, we have some special logic to ensure that this doesn't
+ * happen.
+ */
+public class StripedReplicaPlacer implements ReplicaPlacer {
+    /**
+     * A list of brokers that we can iterate through.
+     */
+    static class BrokerList {
+        final static BrokerList EMPTY = new BrokerList();
+        private final List<Integer> brokers = new ArrayList<>(0);
+        private int epoch = 0;
+        private int index = 0;
+        private int offset = 0;
+
+        BrokerList add(int broker) {
+            this.brokers.add(broker);
+            return this;
+        }
+
+        /**
+         * Initialize this broker list by sorting it and randomizing the start offset.
+         *
+         * @param random    The random number generator.
+         */
+        void initialize(Random random) {
+            if (!brokers.isEmpty()) {
+                brokers.sort(Integer::compareTo);
+                this.offset = random.nextInt(brokers.size());
+            }
+        }
+
+        /**
+         * Randomly shuffle the brokers in this list.
+         */
+        void shuffle(Random random) {
+            Collections.shuffle(brokers, random);

Review comment:
       Hmm, I don't think resetting the position is necessary... when 
`RackList#place` shuffles, it also sets the epoch to 0, so the offset will be 
reset anyway.
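
For readers without the rest of the file at hand, the mechanism referred to is roughly the following; the names and the exact reset condition are assumptions for illustration, not the PR's code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch of why resetting the position inside shuffle() would be redundant:
// iteration is keyed off an epoch, so when the caller (RackList#place in the
// PR) shuffles and starts a new epoch, the position restarts on the next call.
class BrokerListSketch {
    private final List<Integer> brokers = new ArrayList<>(List.of(10, 11, 12, 13));
    private int epoch = 0;
    private int index = 0;

    void shuffle(Random random) {
        Collections.shuffle(brokers, random);   // reorder only; no position reset here
    }

    int next(int callerEpoch) {
        if (callerEpoch != epoch) {
            // A new epoch restarts iteration from the beginning of the list.
            epoch = callerEpoch;
            index = 0;
        }
        int broker = brokers.get(index);
        index = (index + 1) % brokers.size();
        return broker;
    }
}
```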




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

