iamaleksey commented on code in PR #4119: URL: https://github.com/apache/cassandra/pull/4119#discussion_r2081354062
########## src/java/org/apache/cassandra/replication/Offsets.java: ########## Review Comment: To be completely honest, I find the addition of `Immutable` and `Builder` here a little superfluous. I see most added value from immutable variants of collections (or wrapping immutable views) in being able to avoid defensive copying, but we cannot really avoid copying in `CoordinatorLog`. Elsewhere, returning/making mutable copies is absolutely fine in my opinion - and often actually desired. And methods like `intersection()` or `difference()` *should* be returning mutable versions anyway. Guava's `Sets` don't because they return a view, but we here materialise the actual sets, so why take away the ability to mutate from the caller? ########## src/java/org/apache/cassandra/replication/MultiOffsets.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Consumer; + +import com.google.common.base.Preconditions; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public abstract class MultiOffsets<T extends Offsets> Review Comment: Maybe `MultiLogOffsets`? Or `Log2OffsetsMap`? ########## src/java/org/apache/cassandra/replication/MultiOffsets.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Consumer; + +import com.google.common.base.Preconditions; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public abstract class MultiOffsets<T extends Offsets> +{ + abstract Long2ObjectHashMap<T> offsetMap(); + + public int idCount() + { + int count = 0; + for (T offsets : offsetMap().values()) + count += offsets.offsetCount(); + return count; + } + + public void forEachId(Consumer<ShortMutationId> consumer) + { + offsetMap().values().forEach(offsets -> offsets.forEachOffset(((logId, offset) -> { + consumer.accept(new ShortMutationId(logId, offset)); + }))); + } + + public boolean isEmpty() Review Comment: Probably just iterate over the `Offsets` until one returns `false` for `isEmpty()`? Fully counting every time can be excessive here. ########## src/java/org/apache/cassandra/db/ReadCommand.java: ########## Review Comment: I think we no longer need `augmentResultWithMutations()` and `queryJournal()` here, or their implementations. ########## src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java: ########## @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.tracked; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.AbstractPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.net.*; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.CollectionSerializer; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import java.io.IOException; +import java.util.Collections; 
+import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; +import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; + +public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> implements RequestCallback<TrackedDataResponse> +{ + private static final Logger logger = LoggerFactory.getLogger(TrackedRead.class); + + public static class Id + { + private static final int nodeId = ClusterMetadata.current().myNodeId().id(); + private static final AtomicLong lastHlc = new AtomicLong(); + + private final int node; + private final long hlc; + + public Id(int node, long hlc) + { + this.node = node; + this.hlc = hlc; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Id id = (Id) o; + return node == id.node && hlc == id.hlc; + } + + @Override + public int hashCode() + { + return Integer.hashCode(node) * 31 + Long.hashCode(hlc); + } + + @Override + public String toString() + { + return "Id{" + node + ':' + hlc + '}'; + } + + public static final IVersionedSerializer<Id> serializer = new IVersionedSerializer<Id>() + { + @Override + public void serialize(Id id, DataOutputPlus out, int version) throws IOException + { + out.writeInt(id.node); + out.writeLong(id.hlc); + } + + @Override + public Id deserialize(DataInputPlus in, int version) throws IOException + { + return new Id(in.readInt(), in.readLong()); + } + + @Override + public long serializedSize(Id id, int version) + { + return TypeSizes.INT_SIZE + TypeSizes.LONG_SIZE; Review Comment: Just a nit, but I find that using the static `TypeSizes.sizeOf()` on primitive 
fields in `serializedSize()` impls makes it a little smoother to maintain / harder to miss a code change if an int changes to a long for example. ########## src/java/org/apache/cassandra/replication/MutationJournal.java: ########## @@ -87,6 +87,15 @@ public boolean read(ShortMutationId id, RecordConsumer<ShortMutationId> consumer return journal.readLast(id, consumer); } + public void readAll(MultiOffsets<?> ids, Collection<Mutation> into) Review Comment: Would be a little nicer for `Offsets` and `MultiOffsets` to implement `Iterable<ShortMutationId>` and just use the existing `void readAll(Iterable<ShortMutationId> ids, Collection<Mutation> into)` I think. ########## src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.tracked; + +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class TrackedDataResponse +{ + private final int serializationVersion; + private final ByteBuffer data; + + public TrackedDataResponse(int serializationVersion, ByteBuffer data) + { + this.serializationVersion = serializationVersion; + this.data = data; + } + + public static TrackedDataResponse create(PartitionIterator iter, ColumnFilter selection) + { + try (DataOutputBuffer buffer = new DataOutputBuffer()) + { + PartitionIterators.Serializer.serialize(iter, selection, buffer, MessagingService.current_version); + return new TrackedDataResponse(MessagingService.current_version, buffer.buffer()); Review Comment: We don't need to duplicate the buffer here I reckon? ########## src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java: ########## @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.tracked; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.AbstractPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.net.*; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.CollectionSerializer; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import java.io.IOException; +import java.util.Collections; 
+import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; +import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; + +public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> implements RequestCallback<TrackedDataResponse> +{ + private static final Logger logger = LoggerFactory.getLogger(TrackedRead.class); + + public static class Id + { + private static final int nodeId = ClusterMetadata.current().myNodeId().id(); + private static final AtomicLong lastHlc = new AtomicLong(); + + private final int node; + private final long hlc; + + public Id(int node, long hlc) + { + this.node = node; + this.hlc = hlc; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Id id = (Id) o; + return node == id.node && hlc == id.hlc; + } + + @Override + public int hashCode() + { + return Integer.hashCode(node) * 31 + Long.hashCode(hlc); + } + + @Override + public String toString() + { + return "Id{" + node + ':' + hlc + '}'; + } + + public static final IVersionedSerializer<Id> serializer = new IVersionedSerializer<Id>() + { + @Override + public void serialize(Id id, DataOutputPlus out, int version) throws IOException + { + out.writeInt(id.node); + out.writeLong(id.hlc); + } + + @Override + public Id deserialize(DataInputPlus in, int version) throws IOException + { + return new Id(in.readInt(), in.readLong()); Review Comment: A few other `deserialize()` implementations with this pattern around, haven't flagged them individually. 
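For illustration, a minimal sketch of a `deserialize()` that reads each field into a local before calling the constructor. `IdSketch` is a hypothetical stand-in for `TrackedRead.Id`, and plain `java.io` streams stand in for `DataInputPlus`/`DataOutputPlus`; these are assumptions for the example, not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for TrackedRead.Id, used only to show the read pattern.
final class IdSketch
{
    final int node;
    final long hlc;

    IdSketch(int node, long hlc)
    {
        this.node = node;
        this.hlc = hlc;
    }

    static byte[] serialize(IdSketch id)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(id.node);
            out.writeLong(id.hlc);
        }
        catch (IOException e)
        {
            // In-memory streams should not fail; rethrow unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static IdSketch deserialize(byte[] data)
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)))
        {
            // The read order is fixed by statement order, mirroring serialize(),
            // and no longer depends on where the arguments sit in the constructor call.
            int node = in.readInt();
            long hlc = in.readLong();
            return new IdSketch(node, hlc);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the locals in place, a signature change that swaps the constructor parameters would only reorder `node` and `hlc` in the final call; the two reads stay in wire order.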
########## src/java/org/apache/cassandra/db/partitions/SimpleBTreePartition.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.UpdateFunction; +import org.apache.cassandra.utils.memory.HeapCloner; + +/** + * Single threaded btree partition, no waste or allocation tracking + */ +public class SimpleBTreePartition extends AbstractBTreePartition implements UpdateFunction<Row, Row> +{ + private final TableMetadata metadata; + private final UpdateTransaction indexer; + private BTreePartitionData data = BTreePartitionData.EMPTY; + + public SimpleBTreePartition(DecoratedKey partitionKey, TableMetadata metadata, UpdateTransaction indexer) + { + super(partitionKey); + this.metadata = metadata; + this.indexer = indexer; + } + + @Override + protected 
BTreePartitionData holder() + { + return data; + } + + @Override + public TableMetadata metadata() + { + return metadata; + } + + @Override + protected boolean canHaveShadowedData() + { + return true; + } + + @Override + public void onAllocatedOnHeap(long heapSize) + { + + } + + protected BTreePartitionData makeMergedPartition(BTreePartitionData current, PartitionUpdate update) + { + DeletionInfo newDeletionInfo = merge(current.deletionInfo, update.deletionInfo()); + + RegularAndStaticColumns columns = current.columns; + RegularAndStaticColumns newColumns = update.columns().mergeTo(columns); + Row newStatic = mergeStatic(current.staticRow, update.staticRow()); + + Object[] tree = BTree.update(current.tree, update.holder().tree, update.metadata().comparator, this); + EncodingStats newStats = current.stats.mergeWith(update.stats()); + + return new BTreePartitionData(newColumns, tree, newDeletionInfo, newStatic, newStats); + } + + private Row mergeStatic(Row current, Row update) + { + if (update.isEmpty()) + return current; + if (current.isEmpty()) + return insert(update); + + return merge(current, update); + } + + private DeletionInfo merge(DeletionInfo existing, DeletionInfo update) + { + if (update.isLive() || !update.mayModify(existing)) + return existing; + + if (!update.getPartitionDeletion().isLive()) + indexer.onPartitionDeletion(update.getPartitionDeletion()); + + if (update.hasRanges()) + update.rangeIterator(false).forEachRemaining(indexer::onRangeTombstone); + + // Like for rows, we have to clone the update in case internal buffers (when it has range tombstones) reference + // memory we shouldn't hold into. But we don't ever store this off-heap currently so we just default to the + // HeapAllocator (rather than using 'allocator'). 
+ DeletionInfo newInfo = existing.mutableCopy().add(update.clone(HeapCloner.instance)); + return newInfo; + } + + public Row insert(Row insert) + { + indexer.onInserted(insert); + return insert; + } + + public Row merge(Row existing, Row update) + { + Row reconciled = Rows.merge(existing, update, ColumnData.noOp); + indexer.onUpdated(existing, reconciled); + + return reconciled; + } + + public Cell<?> merge(Cell<?> previous, Cell<?> insert) Review Comment: Dead code? ########## src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java: ########## @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.tracked; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.AbstractPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.net.*; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.CollectionSerializer; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; +import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; + +public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> implements RequestCallback<TrackedDataResponse> +{ + private static final Logger logger = LoggerFactory.getLogger(TrackedRead.class); + + public static class Id + { + private static final int nodeId = ClusterMetadata.current().myNodeId().id(); + private static final AtomicLong lastHlc = new AtomicLong(); + + private final int node; + private final long hlc; + + public Id(int node, long hlc) + { + this.node = node; + this.hlc = hlc; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Id id = (Id) o; + return node == id.node && hlc == id.hlc; + } + + @Override + public int hashCode() + { + return Integer.hashCode(node) * 31 + Long.hashCode(hlc); + } + + @Override + public String toString() + { + return "Id{" + node + ':' + hlc + '}'; + } + + public static final IVersionedSerializer<Id> serializer = new IVersionedSerializer<Id>() + { + @Override + public void serialize(Id id, DataOutputPlus out, int version) throws IOException + { + out.writeInt(id.node); + out.writeLong(id.hlc); + } + + @Override + public Id deserialize(DataInputPlus in, int version) throws IOException + { + return new Id(in.readInt(), in.readLong()); Review Comment: I know there's plenty of precedent for this in our codebase, but for serializers in particular I feel it's safer to read into local variables rather than depend on constructor/method argument evaluation order. Otherwise an automated refactor can break deser logic without being detected: Refactor->Change Signature on `Id(int node, long hlc)` to swap the argument order, for example, will break this method. ########## src/java/org/apache/cassandra/replication/MultiOffsets.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Consumer; + +import com.google.common.base.Preconditions; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public abstract class MultiOffsets<T extends Offsets> +{ + abstract Long2ObjectHashMap<T> offsetMap(); + + public int idCount() + { + int count = 0; + for (T offsets : offsetMap().values()) + count += offsets.offsetCount(); + return count; + } + + public void forEachId(Consumer<ShortMutationId> consumer) + { + offsetMap().values().forEach(offsets -> offsets.forEachOffset(((logId, offset) -> { + consumer.accept(new ShortMutationId(logId, offset)); + }))); + } + + public boolean isEmpty() + { + return idCount() == 0; + } + + private static abstract class AbstractMutable<T extends Offsets.AbstractMutable<T>> extends MultiOffsets<T> + { + protected final Long2ObjectHashMap<T> offsetMap = new Long2ObjectHashMap<>(); + + @Override + Long2ObjectHashMap<T> offsetMap() + { + return offsetMap; + } + + protected abstract T 
createOrNull(Offsets.RangeIterator iterator); + protected abstract T create(CoordinatorLogId logId); + protected abstract T copy(Offsets offsets); + + protected T create(long logId) + { + return create(new CoordinatorLogId(logId)); + } + + public void add(ShortMutationId id) + { + T offsets = offsetMap.computeIfAbsent(id.logId(), this::create); + offsets.add(id.offset()); + } + + public void add(Offsets offsets) + { + if (offsets.isEmpty()) + return; + + T existing = offsetMap.get(offsets.logId().asLong()); + if (existing == null) + { + offsetMap.put(offsets.logId().asLong(), copy(offsets)); + return; + } + + existing.addAll(offsets); + } + + public void addAll(MultiOffsets<?> that) + { + for (Offsets offsets : that.offsetMap().values()) + add(offsets); + } + + public void remove(Offsets offsets) + { + T existing = offsetMap.get(offsets.logId().asLong()); + if (existing == null) + return; + + if (existing.isEmpty()) + { + offsetMap.remove(offsets.logId().asLong()); + return; + } + + T next = createOrNull(Offsets.difference(existing.rangeIterator(), offsets.rangeIterator())); + if (next == null) + offsetMap.remove(offsets.logId().asLong()); + else + offsetMap.put(offsets.logId().asLong(), next); + } + + public void removeAll(MultiOffsets<?> that) + { + for (Offsets offsets : that.offsetMap().values()) + remove(offsets); + } + } + + public static class Mutable extends AbstractMutable<Offsets.Mutable> + { + @Override + protected Offsets.Mutable createOrNull(Offsets.RangeIterator iterator) + { + return Offsets.Mutable.createOrNull(iterator); + } + + @Override + protected Offsets.Mutable create(CoordinatorLogId logId) + { + return new Offsets.Mutable(logId); + } + + @Override + protected Offsets.Mutable copy(Offsets offsets) + { + return Offsets.Mutable.copy(offsets); + } + } + + public static class Immutable extends MultiOffsets<Offsets.Immutable> + { + private final Long2ObjectHashMap<Offsets.Immutable> offsetMap; + + private 
Immutable(Long2ObjectHashMap<Offsets.Immutable> offsetMap) + { + this.offsetMap = offsetMap; + } + + @Override + Long2ObjectHashMap<Offsets.Immutable> offsetMap() + { + return offsetMap; + } + + public static class Builder extends AbstractMutable<Offsets.Immutable.Builder> + { + @Override + protected Offsets.Immutable.Builder createOrNull(Offsets.RangeIterator iterator) + { + return Offsets.Immutable.Builder.createOrNull(iterator); + } + + @Override + protected Offsets.Immutable.Builder create(CoordinatorLogId logId) + { + return new Offsets.Immutable.Builder(logId); + } + + @Override + protected Offsets.Immutable.Builder copy(Offsets offsets) + { + return Offsets.Immutable.Builder.copy(offsets); + } + + public MultiOffsets.Immutable build() + { + Long2ObjectHashMap<Offsets.Immutable> result = new Long2ObjectHashMap<>(); + offsetMap.forEachLong((key, builder) -> result.put(key, builder.build())); + return new Immutable(result); + } + } + + private static class KeySink implements Consumer<Long> + { + int idx = 0; + final long[] keys; + + public KeySink(int size) + { + this.keys = new long[size]; + } + + @Override + public void accept(Long v) + { + keys[idx++] = v; + } + + public void sort() + { + Arrays.sort(keys); + } + } + + public static final IVersionedSerializer<MultiOffsets.Immutable> serializer = new IVersionedSerializer<MultiOffsets.Immutable>() + { + @Override + public void serialize(MultiOffsets.Immutable mo, DataOutputPlus out, int version) throws IOException + { + int size = mo.offsetMap.size(); + KeySink keys = new KeySink(size); + mo.offsetMap.keySet().forEach(keys); + keys.sort(); Review Comment: We don't really care about or check the order anywhere (nor should we?). So I think you can get rid of `KeySink` and the sorting, and just use `Long2ObjectHashMap.iterator()` here - it also doesn't require any boxing of primitives if you use the `getLongKey()` + `getValue()` iteration pattern it supports. 
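To illustrate why the sort looks unnecessary: the reader rebuilds a hash map, so any write order round-trips to an equal map. A minimal sketch under stated assumptions: `UnorderedMapSketch` is a hypothetical class, a plain `java.util.Map<Long, Integer>` stands in for `Long2ObjectHashMap<Offsets.Immutable>`, and `java.io` streams stand in for `DataOutputPlus`/`DataInputPlus`. Because it uses the JDK map, the keys are boxed here; the primitive iteration mentioned above is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: serialize map entries in iteration order, with no key array and no sort.
final class UnorderedMapSketch
{
    static byte[] serialize(Map<Long, Integer> map)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(map.size());
            // Entries go out in whatever order the map yields them.
            for (Map.Entry<Long, Integer> entry : map.entrySet())
            {
                out.writeLong(entry.getKey());
                out.writeInt(entry.getValue());
            }
        }
        catch (IOException e)
        {
            // In-memory streams should not fail; rethrow unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<Long, Integer> deserialize(byte[] data)
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)))
        {
            int size = in.readInt();
            Map<Long, Integer> map = new HashMap<>();
            for (int i = 0; i < size; i++)
            {
                long key = in.readLong();
                int value = in.readInt();
                // The reader only needs key/value pairs; their arrival order is irrelevant.
                map.put(key, value);
            }
            return map;
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off is that two equal maps may serialize to different byte sequences, which only matters if something compares or hashes the serialized form.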
########## src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java: ########## @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.tracked; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.AbstractPartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.net.*; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.CollectionSerializer; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; +import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; + +public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> implements RequestCallback<TrackedDataResponse> +{ + private static final Logger logger = LoggerFactory.getLogger(TrackedRead.class); + + public static class Id + { + private static final int nodeId = ClusterMetadata.current().myNodeId().id(); + private static final AtomicLong lastHlc = new AtomicLong(); + + private final int node; + private final long hlc; + + public Id(int node, long hlc) + { + this.node = node; + this.hlc = hlc; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Id id = (Id) o; + return node == id.node && hlc == id.hlc; + } + + @Override + public int hashCode() + { + return Integer.hashCode(node) * 31 + Long.hashCode(hlc); + } + + @Override + public String toString() + { + return "Id{" + node + ':' + hlc + '}'; + } + + public static final IVersionedSerializer<Id> serializer = new IVersionedSerializer<Id>() + { + @Override + 
public void serialize(Id id, DataOutputPlus out, int version) throws IOException + { + out.writeInt(id.node); + out.writeLong(id.hlc); + } + + @Override + public Id deserialize(DataInputPlus in, int version) throws IOException + { + return new Id(in.readInt(), in.readLong()); + } + + @Override + public long serializedSize(Id id, int version) + { + return TypeSizes.INT_SIZE + TypeSizes.LONG_SIZE; + } + }; + + public static Id nextId() + { + while (true) + { + long lastMicros = lastHlc.get(); + long nextMicros = Math.max(lastMicros + 1, TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis())); + if (lastHlc.compareAndSet(lastMicros, nextMicros)) + return new Id(nodeId, nextMicros); + } + } + } + + private final AsyncPromise<PartitionIterator> future = new AsyncPromise<>(); + + private final Id readId = Id.nextId(); + private final ReplicaPlan.AbstractForRead<E, P> replicaPlan; + private final ConsistencyLevel consistencyLevel; + + private static class RequestFailure extends Throwable + { + private final InetAddressAndPort from; + private final RequestFailureReason reason; + + public RequestFailure(InetAddressAndPort from, RequestFailureReason reason) + { + this.from = from; + this.reason = reason; + } + + public Map<InetAddressAndPort, RequestFailureReason> reasonByEndpoint() + { + return Map.of(from, reason); + } + } + + public TrackedRead(ReplicaPlan.AbstractForRead<E, P> replicaPlan, ConsistencyLevel consistencyLevel) + { + this.replicaPlan = replicaPlan; + this.consistencyLevel = consistencyLevel; + } + + @Override + public String toString() + { + return "TrackedRead." 
+ getClass().getSimpleName() + '{' + readId + '}'; + } + + protected abstract ReadCommand command(); + protected abstract Verb verb(); + + public static Partition create(ClusterMetadata metadata, + SinglePartitionReadCommand command, + ConsistencyLevel consistencyLevel) + { + Preconditions.checkArgument(command.metadata().replicationType().isTracked()); + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); + SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry; + + ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(metadata, + keyspace, + command.partitionKey().getToken(), + command.indexQueryPlan(), + consistencyLevel, + retry); + + return new Partition(command, replicaPlan, consistencyLevel); + } + + public static TrackedRead.Range create(PartitionRangeReadCommand command, + ReplicaPlan.ForRangeRead replicaPlan) + { + Preconditions.checkArgument(command.metadata().replicationType().isTracked()); + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); Review Comment: Is this no longer needed, or is it intentional, for the `open()` side effects? Either way, we don't need the return value.
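To illustrate the two options raised in the comment on the unused `keyspace` local, here is a minimal, self-contained sketch. `Store` is a hypothetical stand-in and is not Cassandra's `Keyspace`; the method names `createKeepingSideEffect` and `createWithoutOpen` are likewise assumptions made for illustration. It only shows the general pattern: either call `open()` purely for its side effect without binding the result, or drop the call if nothing from it is needed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a class whose static open() both registers
// and returns an instance. This is NOT Cassandra's Keyspace.
final class Store
{
    private static final Set<String> OPENED = ConcurrentHashMap.newKeySet();

    final String name;

    private Store(String name)
    {
        this.name = name;
    }

    // Side effect: records the name as opened. Also returns a handle.
    static Store open(String name)
    {
        OPENED.add(name);
        return new Store(name);
    }

    static boolean isOpened(String name)
    {
        return OPENED.contains(name);
    }
}

final class OpenSideEffectSketch
{
    // Option 1: the side effect is wanted, so call open() but do not bind the result.
    static void createKeepingSideEffect(String name)
    {
        Store.open(name); // called only for its side effect
    }

    // Option 2: nothing from open() is needed, so the call is dropped entirely.
    static void createWithoutOpen(String name)
    {
        // intentionally no Store.open(name) here
    }

    public static void main(String[] args)
    {
        createKeepingSideEffect("ks1");
        createWithoutOpen("ks2");
        System.out.println(Store.isOpened("ks1")); // true
        System.out.println(Store.isOpened("ks2")); // false
    }
}
```

If the call is kept only for its side effect, a short comment at the call site saying so would likely prevent the same question from coming up in a later review.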