arjunashok commented on code in PR #58: URL: https://github.com/apache/cassandra-sidecar/pull/58#discussion_r1300514341
########## adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicas.java: ########## @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.jetbrains.annotations.NotNull; + + +/** + * Representation of a token range (exclusive start and inclusive end - (start, end]) and the + * corresponding mapping to replica-set hosts. Static factory ensures that ranges are always unwrapped. + * Note: Range comparisons are used for ordering of ranges. eg. A.compareTo(B) <= 0 implies that + * range A occurs before range B, not their sizes. + */ +public class TokenRangeReplicas implements Comparable<TokenRangeReplicas> +{ + private final BigInteger start; + private final BigInteger end; + + private final Partitioner partitioner; + + private final Set<String> replicaSet; + + private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeReplicas.class); + + private TokenRangeReplicas(BigInteger start, BigInteger end, Partitioner partitioner, Set<String> replicaSet) + { + this.start = start; + this.end = end; + this.partitioner = partitioner; + this.replicaSet = replicaSet; + } + + public static List<TokenRangeReplicas> generateTokenRangeReplicas(BigInteger start, + BigInteger end, + Partitioner partitioner, + Set<String> replicaSet) + { + if (start.compareTo(end) > 0) + { + return unwrapRange(start, end, partitioner, replicaSet); + } + + return Collections.singletonList(new TokenRangeReplicas(start, end, partitioner, replicaSet)); + } + + + public BigInteger start() + { + return start; + } + + public BigInteger end() + { + return end; + } + + public Set<String> replicaSet() + { + return replicaSet; + } + + /** + * {@inheritDoc} + */ + @Override + public int compareTo(@NotNull TokenRangeReplicas other) + { + validateRangesForComparison(other); + int compareStart = this.start.compareTo(other.start); + return (compareStart != 0) ? compareStart : this.end.compareTo(other.end); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + TokenRangeReplicas that = (TokenRangeReplicas) o; + + return Objects.equals(start, that.start) + && Objects.equals(end, that.end) + && partitioner == that.partitioner; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() + { + return Objects.hash(start, end, partitioner); + } + + private void validateRangesForComparison(@NotNull TokenRangeReplicas other) + { + if (this.partitioner != other.partitioner) + throw new IllegalStateException("Token ranges being compared do not have the same partitioner"); + } + + protected boolean contains(TokenRangeReplicas other) + { + validateRangesForComparison(other); + return (other.start.compareTo(this.start) >= 0 && other.end.compareTo(this.end) <= 0); + } + + /** + * For subset ranges, this is used to determine if a range is larger than the other by comparing start-end lengths + * If both ranges end at the min, we compare starting points to determine the result. + * When the left range is the only one ending at min, it is always the larger one since all subsequent ranges + * in the sorted range list have to be smaller. + * <p> + * This method assumes that the ranges are normalized and unwrapped, i.e. + * 'this' comes before 'other' AND there's no wrapping around the min token + * + * @param other the next range in the range list to compare + * @return true if "this" range is larger than the other + */ + protected boolean isLarger(TokenRangeReplicas other) + { + validateRangesForComparison(other); + return this.end.subtract(this.start).compareTo(other.end.subtract(other.start)) > 0; + } + + /** + * Determines intersection if the next range starts before the current range ends. This method assumes that + * the provided ranges are sorted and unwrapped. + * When the current range goes all the way to the end, we determine intersection if the next range starts + * after the current since all subsequent ranges have to be subsets. + * + * @param other the range we are currently processing to check if "this" intersects it + * @return true if "this" range intersects the other + */ + protected boolean intersects(TokenRangeReplicas other) + { + if (this.compareTo(other) > 0) + throw new IllegalStateException( + String.format("Token ranges - (this:%s other:%s) are not ordered", this, other)); + + return this.end.compareTo(other.start) > 0 && this.start.compareTo(other.end) < 0; // Start exclusive (DONE) + } + + /** + * Unwraps the token range if it wraps-around to end either on or after the least token by overriding such + * ranges to end at the partitioner max-token value in the former case and splitting into 2 ranges in the latter + * case. + * + * @return list of split ranges + */ + private static List<TokenRangeReplicas> unwrapRange(BigInteger start, + BigInteger end, + Partitioner partitioner, + Set<String> replicaSet) + { + + // Range ending at minToken is "unwrapped" to end at the maxToken. + // Note: These being open-closed ranges, this will result in exclusion of partitioner's minToken from + // allocation. This is by-design as it is never assigned to a node in Cassandra: + // https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java#L77 + if (end.compareTo(partitioner.minToken) == 0) + { + return Collections.singletonList( + new TokenRangeReplicas(start, partitioner.maxToken, partitioner, replicaSet)); + } + else if (start.compareTo(partitioner.maxToken) == 0) + { + return Collections.singletonList( + new TokenRangeReplicas(partitioner.minToken, end, partitioner, replicaSet)); + } + + // Wrap-around range goes beyond at the "min-token" and is therefore split into two. + List<TokenRangeReplicas> unwrapped = new ArrayList<>(2); + unwrapped.add(new TokenRangeReplicas(start, partitioner.maxToken, partitioner, replicaSet)); + unwrapped.add(new TokenRangeReplicas(partitioner.minToken, end, partitioner, replicaSet)); + return unwrapped; + } + + + /** + * Given a list of token ranges with replica-sets, normalizes them by unwrapping around the beginning/min + * of the range and removing overlaps to return a sorted list of non-overlapping ranges. + * <p> + * For an overlapping range that is included in both natural and pending ranges, say R_natural and R_pending + * (where R_natural == R_pending), the replicas of both R_natural and R_pending should receive writes. + * Therefore, the write-replicas of such range is the union of both replica sets. + * This method implements the consolidation process. + * + * @param ranges + * @return sorted list of non-overlapping ranges and replica-sets + */ + public static List<TokenRangeReplicas> normalize(List<TokenRangeReplicas> ranges) + { + + if (ranges.stream().noneMatch(r -> r.partitioner.minToken.compareTo(r.start()) == 0)) + { + LOGGER.warn("{} based minToken does not exist in the token ranges", Partitioner.class.getName()); + } + + return deoverlap(ranges); + } + + /** + * Given a list of unwrapped (around the starting/min value) token ranges and their replica-sets, return list of + * ranges with no overlaps. Any impacted range absorbs the replica-sets from the overlapping range. + * This is to ensure that we have most coverage while using the replica-sets as write-replicas. + * Overlaps are removed by splitting the original range around the overlap boundaries, resulting in sub-ranges + * with replicas from all the overlapping replicas. + * + * + * <pre> + * Illustration: + * Input with C overlapping with A and B + * |----------A-----------||----------B-------------| + * |--------C----------| + * + * Split result: C is split first which further splits A and B to create + * |-----------A----------||----------B-------------| + * |---C---|----C'----| + * + * Subsets C & C' are merged into supersets A and B by splitting them. Replica-sets for A,C and B,C are merged + * for the resulting ranges. + * |-----A------|----AC---||---BC-----|-----B------| + * + * </pre> + */ + private static List<TokenRangeReplicas> deoverlap(List<TokenRangeReplicas> allRanges) + { + if (allRanges.isEmpty()) + return allRanges; + + LOGGER.debug("Token ranges to be normalized: {}", allRanges); + List<TokenRangeReplicas> ranges = mergeIdenticalRanges(allRanges); + + List<TokenRangeReplicas> output = new ArrayList<>(); + Iterator<TokenRangeReplicas> iter = ranges.iterator(); + TokenRangeReplicas current = iter.next(); + + while (iter.hasNext()) + { + TokenRangeReplicas next = iter.next(); + if (!current.intersects(next)) + { + output.add(current); + current = next; + } + else + { + current = processIntersectingRanges(output, iter, current, next); + } + } + if (current != null) + output.add(current); + return output; + } + + private static List<TokenRangeReplicas> mergeIdenticalRanges(List<TokenRangeReplicas> ranges) + { + Map<TokenRangeReplicas, Set<String>> rangeMapping = new HashMap<>(); + for (TokenRangeReplicas r: ranges) + { + if (!rangeMapping.containsKey(r)) + { + rangeMapping.put(r, r.replicaSet); + } + else + { + rangeMapping.get(r).addAll(r.replicaSet); + } + } + + List<TokenRangeReplicas> merged = new ArrayList<>(); + for (Map.Entry<TokenRangeReplicas, Set<String>> entry : rangeMapping.entrySet()) + { + TokenRangeReplicas r = entry.getKey(); + if (!r.replicaSet().equals(entry.getValue())) + { + r.replicaSet().addAll(entry.getValue()); + } + merged.add(r); + } + Collections.sort(merged); + return merged; + } + + /** + * Splits intersecting token ranges starting from the provided cursors and the iterator, while accumulating + * overlapping replicas into each sub-range. + * <p> + * The algorithm 1) extracts all intersecting ranges at the provided cursor, and 2) Maintains a min-heap of all + * intersecting ranges ordered by the end of the range, so that the least common sub-range relative to the current + * range can be extracted. + * + * @param output ongoing list of resulting non-overlapping ranges + * @param iter iterator over the list of ranges + * @param current cursor to the current, intersecting range + * @param next cursor to the intersecting range after the current range + * @return cursor to the subsequent non-intersecting range + */ + protected static TokenRangeReplicas processIntersectingRanges(List<TokenRangeReplicas> output, + Iterator<TokenRangeReplicas> iter, + TokenRangeReplicas current, + TokenRangeReplicas next) + { + PriorityQueue<TokenRangeReplicas> rangeHeap = + new PriorityQueue<>((n1, n2) -> (!n1.end.equals(n2.end())) ? + n1.end().compareTo(n2.end()) : n1.compareTo(n2)); + + List<TokenRangeReplicas> intersectingRanges = new ArrayList<>(); + next = extractIntersectingRanges(intersectingRanges::add, iter, current, next); + rangeHeap.add(intersectingRanges.get(0)); + intersectingRanges.stream().skip(1).forEach(r -> { + if (!rangeHeap.isEmpty()) + { + TokenRangeReplicas range = rangeHeap.peek(); + // Use the last processed range's end as the new range's start + // Except when its the first range, in which case, we use the queue-head's start + BigInteger newStart = output.isEmpty() ? range.start() : output.get(output.size() - 1).end(); + + if (r.start().compareTo(rangeHeap.peek().end()) == 0) + { + output.add(new TokenRangeReplicas(newStart, + r.start(), + range.partitioner, + getBatchReplicas(rangeHeap))); + rangeHeap.poll(); + } + else if (r.start().compareTo(rangeHeap.peek().end()) > 0) + { + output.add(new TokenRangeReplicas(newStart, + range.end(), + range.partitioner, + getBatchReplicas(rangeHeap))); + rangeHeap.poll(); + } + // Start-token is before the first intersecting range end. We have not encountered end of the range, so + // it is not removed from the heap yet. + else + { + if (newStart.compareTo(r.start()) != 0) + { + output.add(new TokenRangeReplicas(newStart, + r.start(), + range.partitioner, + getBatchReplicas(rangeHeap))); + } + } + rangeHeap.add(r); + } + }); + + // Remaining intersecting ranges from heap are processed + while (!rangeHeap.isEmpty()) + { + LOGGER.info("Non-empty queue:" + rangeHeap.size()); + TokenRangeReplicas nextVal = rangeHeap.peek(); + BigInteger newStart = output.isEmpty() ? nextVal.start() : output.get(output.size() - 1).end(); + // Corner case w/ common end ranges - we do not add redundant single token range + if (newStart.compareTo(nextVal.end()) != 0) + { + output.add(new TokenRangeReplicas(newStart, + nextVal.end(), + nextVal.partitioner, + getBatchReplicas(rangeHeap))); + } + rangeHeap.poll(); + } + return next; + } + + /** + * Extract all the intersecting ranges starting from the current cursor, which we know is intersecting with the + * next range. Note that the cursor is moved forward until a non-intersecting range is found. + * + * @param rangeConsumer functional interface to collect candidate intersecting ranges + * @param iter ongoing iterator over the entire range-set + * @param current cursor to the current, intersecting range + * @param next cursor to the next intersecting range + * @return list of intersecting ranges starting at the specified cursor + */ + private static TokenRangeReplicas extractIntersectingRanges(Consumer<TokenRangeReplicas> rangeConsumer, + Iterator<TokenRangeReplicas> iter, + TokenRangeReplicas current, + TokenRangeReplicas next) + { + // we know that current and next intersect + rangeConsumer.accept(current); + rangeConsumer.accept(next); + current = (current.contains(next)) ? current : next; + next = null; + while (iter.hasNext()) + { + next = iter.next(); + if (!current.intersects(next)) + { + break; + } + rangeConsumer.accept(next); + // when next is subset of current, we keep tracking current + current = (current.contains(next)) ? current : next; + next = null; + } + return next; + } + + // TODO: Verify why we need all replicas from queue Review Comment: Resolved. Will remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

