aweisberg commented on code in PR #4118: URL: https://github.com/apache/cassandra/pull/4118#discussion_r2060744362
########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. 
Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. Iterating across keys should not be expensive, but this would benefit from a more compact Review Comment: Is this comment accurate? Isn't there a coordinator log per replica? I thought we could choose any coordinator replica, which is important for reliability/performance. L0 and non-LCS sstables are going to have quite a few entries, since they can contain mutations from any coordinator in the cluster. ########## src/java/org/apache/cassandra/db/compaction/CompactionManager.java: ########## @@ -1805,6 +1802,7 @@ public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs, if (minLevel == Integer.MAX_VALUE) minLevel = sstable.getSSTableLevel(); + mutationIdRanges = mutationIdRanges.merge(sstable.getMutationIdRanges()); Review Comment: Anti-compaction might produce multiple output sstables? Does it make sense for all of them to share the same mutation id ranges, or is this just a requirement because we don't actually know the exact ranges each coordinator id covers? ########## test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java: ########## @@ -1412,10 +1412,10 @@ private void testNotifySSTableFinished(boolean sorted, boolean closeWriterOnFirs } CQLSSTableWriter writer = builder.build(); - int rowCount = 30_000; + final long MiB_1_17 = 1_226_833; // Max SSTable size is 1 MiB - // 30_000 rows should take 30_000 * (4 + 37) = 1.17 MiB > 1 MiB, i.e.
producing 2 sstables - for (int i = 0; i < rowCount; i++) + // write 1.17 MiB of data, producing 2 sstables + for (int i = 0; writer.bytesWritten() < MiB_1_17; i++) Review Comment: Is this related to the current patch? ########## test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java: ########## @@ -30,17 +30,12 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.cassandra.db.*; Review Comment: Import style ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. 
+ @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + that.ids.forEachLong((logId, right) -> { + MutationId left = newIds.get(logId); + if (left == null) + newIds.put(logId, right); + else if (ShortMutationId.comparator.compare(left, right) < 0) + newIds.put(logId, right); + }); + + return new MutationIdRanges(newIds); + } + + public MutationIdRanges add(MutationId mutationId) Review Comment: This should at a minimum be annotated `@VisibleForTesting`, but it would be better to expose only a read-only interface for the immutable version.
########## src/java/org/apache/cassandra/db/compaction/CompactionManager.java: ########## @@ -1602,7 +1596,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs, CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, nextTimeUUID(), active, null)) { StatsMetadata metadata = sstable.getSSTableMetadata(); - writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, txn)); + writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, metadata.mutationIdRanges, sstable, txn)); Review Comment: Is it problematic that a node loses ranges, but we aren't able to clean up the mutation id ranges list at this step? Will it maybe not participate in the last bits of reconciliation if we aren't careful? We need some kind of bound to ensure that data we don't own anymore is marked reconciled safely. ########## src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java: ########## @@ -26,15 +26,11 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.db.*; Review Comment: Import style ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. 
Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. + @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); Review Comment: It's tough to say you will know the size of the final map here, but you could size it to the size of the largest one and then use `putAll`. You can also then iterate fewer entries in the loop below by always iterating the smaller map. ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. 
Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. + @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + that.ids.forEachLong((logId, right) -> { + MutationId left = newIds.get(logId); + if (left == null) + newIds.put(logId, right); + else if (ShortMutationId.comparator.compare(left, right) < 0) + newIds.put(logId, right); + }); + + return new MutationIdRanges(newIds); + } + + public MutationIdRanges add(MutationId mutationId) + { + // Will this allocation will be elided on the path where no update happens? 
+ Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + long logId = mutationId.logId(); + MutationId existing = ids.get(logId); + if (existing == null) + newIds.put(logId, mutationId); + else if (ShortMutationId.comparator.compare(existing, mutationId) < 0) + newIds.put(logId, mutationId); + else + return this; + return new MutationIdRanges(newIds); + } + + public int maxOffset(long logId) + { + MutationId id = ids.get(logId); + if (id == null) + return MutationId.none().offset(); Review Comment: Referring to NONE via an accessor method is discouraged by our code style https://cassandra.apache.org/_/development/code_style.html#boilerplate Not that we don't do it all the time. ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. 
+ @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + that.ids.forEachLong((logId, right) -> { + MutationId left = newIds.get(logId); + if (left == null) + newIds.put(logId, right); + else if (ShortMutationId.comparator.compare(left, right) < 0) + newIds.put(logId, right); + }); + + return new MutationIdRanges(newIds); + } + + public MutationIdRanges add(MutationId mutationId) + { + // Will this allocation will be elided on the path where no update happens? 
+ Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + long logId = mutationId.logId(); + MutationId existing = ids.get(logId); + if (existing == null) + newIds.put(logId, mutationId); + else if (ShortMutationId.comparator.compare(existing, mutationId) < 0) + newIds.put(logId, mutationId); + else + return this; + return new MutationIdRanges(newIds); + } + + public int maxOffset(long logId) + { + MutationId id = ids.get(logId); + if (id == null) + return MutationId.none().offset(); + return id.offset(); + } + + public static final IVersionedSerializer<MutationIdRanges> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(MutationIdRanges metadata, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_52) + return; + out.writeInt(metadata.ids.size()); + for (Map.Entry<Long, MutationId> entry : metadata.ids.entrySet()) + { + out.writeLong(entry.getKey()); + MutationId.serializer.serialize(entry.getValue(), out, version); + } + } + + @Override + public MutationIdRanges deserialize(DataInputPlus in, int version) throws IOException + { + if (version < MessagingService.VERSION_52) Review Comment: We don't usually put the version check inside the serializer, but I actually really like this better. ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. 
Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. + @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + that.ids.forEachLong((logId, right) -> { + MutationId left = newIds.get(logId); + if (left == null) + newIds.put(logId, right); + else if (ShortMutationId.comparator.compare(left, right) < 0) + newIds.put(logId, right); + }); + + return new MutationIdRanges(newIds); + } + + public MutationIdRanges add(MutationId mutationId) + { + // Will this allocation will be elided on the path where no update happens? 
+ Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + long logId = mutationId.logId(); + MutationId existing = ids.get(logId); + if (existing == null) + newIds.put(logId, mutationId); + else if (ShortMutationId.comparator.compare(existing, mutationId) < 0) + newIds.put(logId, mutationId); + else + return this; + return new MutationIdRanges(newIds); + } + + public int maxOffset(long logId) + { + MutationId id = ids.get(logId); + if (id == null) + return MutationId.none().offset(); + return id.offset(); + } + + public static final IVersionedSerializer<MutationIdRanges> serializer = new IVersionedSerializer<>() + { + @Override + public void serialize(MutationIdRanges metadata, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_52) + return; + out.writeInt(metadata.ids.size()); Review Comment: Instead of serializing the map manually add support for `Long2ObjectHashMap` to `CollectionSerializers` and use that. We also seem to be very aggressive about using variable length integers everywhere these days. I am skeptical it's always worth the cycles, but it's the trend. ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges Review Comment: You can make `MutationIdRanges` an interface, and then have the implementation extend `Long2ObjectHashMap` directly so there is no pointer chasing to access the map. The instances could then be mutable and constructed directly, and then cast to a different, immutable interface to mark the point at which they should no longer be mutated. ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. 
+ */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. + @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + that.ids.forEachLong((logId, right) -> { + MutationId left = newIds.get(logId); + if (left == null) + newIds.put(logId, right); + else if (ShortMutationId.comparator.compare(left, right) < 0) + newIds.put(logId, right); + }); + + return new MutationIdRanges(newIds); + } + + public MutationIdRanges add(MutationId mutationId) + { + // Will this allocation will be elided on the path where no update happens? Review Comment: Wouldn't matter if you left it mutable and let the test code use the regular add method. Not that test code performance matters much. 
########## src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java: ########## @@ -221,6 +224,35 @@ public EncodingStats get() } } + public static class MutationIdCollector + { + private final AtomicReference<MutationIdRanges> ranges = new AtomicReference<>(MutationIdRanges.NONE); + + /** + * This is called on every memtable write, so would be a good optimization target. In particular, {@link #get} + * is called on expensive infrequent operations (mainly flush), so we would benefit from moving some effort out + * of this method. + */ + public void add(MutationId mutationId) Review Comment: This can be pretty easily improved from a performance perspective. I know we have a goal for the number of coordinators to be small, but I think it's important not to build that in as a limitation. It's also more allocations and instructions than are necessary added to each memtable write. `NonBlockingHashMapLong.replace` can do this mutably with very reasonable performance. Then you can convert to something that is immutable. 
########## src/java/org/apache/cassandra/db/memtable/Memtable.java: ########## @@ -202,6 +202,10 @@ interface Owner /** Size of the data not accounting for any metadata / mapping overheads */ long getLiveDataSize(); + /** Snapshot of the mutation id ranges applied to this memtable */ + @VisibleForTesting Review Comment: This is used outside of testing in `ColumnFamilyStore` ########## src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java: ########## @@ -265,15 +271,23 @@ public FlushablePartitionSet<AtomicBTreePartition> getFlushSet(PartitionPosition long keySize = 0; int keyCount = 0; - for (Iterator<AtomicBTreePartition> it = getPartitionIterator(from, true, to,false); it.hasNext();) + for (Iterator<AtomicBTreePartition> it = getPartitionIterator(from, true, to, false); it.hasNext(); ) { AtomicBTreePartition en = it.next(); keySize += en.partitionKey().getKey().remaining(); keyCount++; } long partitionKeySize = keySize; int partitionCount = keyCount; - Iterator<AtomicBTreePartition> toFlush = getPartitionIterator(from, true, to,false); + Iterator<AtomicBTreePartition> toFlush = getPartitionIterator(from, true, to, false); + + MutationIdRanges mutationIdRanges; + { + MutationIdRanges tempRanges = MutationIdRanges.NONE; + for (MemtableShard shard : shards) + tempRanges = tempRanges.merge(shard.mutationIdCollector.get()); Review Comment: Static method ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -363,6 +365,13 @@ public FlushablePartitionSet<MemtablePartition> getFlushSet(PartitionPosition fr } long partitionKeySize = keySize; int partitionCount = keyCount; + MutationIdRanges mutationIdRanges; + { + MutationIdRanges tempRanges = MutationIdRanges.NONE; + for (MemtableShard shard : shards) + tempRanges = tempRanges.merge(shard.mutationIdCollector.get()); Review Comment: static method ########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -235,6 +228,15 @@ public long 
partitionCount() return total; } + @Override + public MutationIdRanges getMutationIdRanges() + { + MutationIdRanges ranges = MutationIdRanges.NONE; + for (MemtableShard shard : shards) + ranges = ranges.merge(shard.mutationIdCollector.get()); Review Comment: static method ########## src/java/org/apache/cassandra/db/compaction/CompactionTask.java: ########## @@ -407,6 +408,18 @@ public static boolean getIsTransient(Set<SSTableReader> sstables) return isTransient; } + public static MutationIdRanges getMutationIdRanges(Set<SSTableReader> sstables) + { + MutationIdRanges mutationIdRanges = MutationIdRanges.NONE; + if (sstables.isEmpty()) + return mutationIdRanges; + + for (SSTableReader sstable: sstables) + mutationIdRanges = mutationIdRanges.merge(sstable.getMutationIdRanges()); Review Comment: This would be better implemented as a static method in `MutationIdRanges` that takes in an `Iterable` of things that can supply mutation ID ranges. Then it only needs to allocate once and can then merge them all in at once. Not that performance is critical here. 
########## src/java/org/apache/cassandra/db/memtable/TrieMemtable.java: ########## @@ -29,19 +29,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; +import org.apache.cassandra.db.*; Review Comment: Import style ########## src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java: ########## @@ -27,14 +27,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; +import org.apache.cassandra.db.*; Review Comment: Wrong import style ########## src/java/org/apache/cassandra/db/compaction/CompactionManager.java: ########## @@ -57,6 +57,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.db.*; Review Comment: Your IDE is misconfigured and is converting single imports into * imports. The generated code style for IntelliJ does the correct thing and soonish this will be our official code style and enforced by checkstyle. ########## src/java/org/apache/cassandra/db/MutationIdRanges.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.MutationId; +import org.apache.cassandra.replication.ShortMutationId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; + +import static org.apache.cassandra.db.memtable.AbstractMemtable.MutationIdCollector; + +/** + * Max mutation ID present in this SSTable for each coordinator log, to determine whether an SSTable is reconciled or + * not. Once max mutation IDs are reconciled, next compaction can safely mark this SSTabled as repaired. Note that peers + * may have reconciled all mutations included in an SSTable, but {@link StatsMetadata#repairedAt} is dependent on + * compaction timing, so "nodetool repair --validate" may report temporary disagreements on the repaired set. + * <p> + * This is immutable, so update-heavy paths are expected to use {@link MutationIdCollector}. + */ +public class MutationIdRanges +{ + public static final MutationIdRanges NONE = new MutationIdRanges(); + + // Keyed by CoordinatorLogId, this should only contain a handful of elements, because there's only one coordinator + // log per range. Iterating across keys should not be expensive, but this would benefit from a more compact + // representation since it's updated on every write. 
+ @VisibleForTesting + final Long2ObjectHashMap<MutationId> ids; + + private MutationIdRanges() + { + this.ids = new Long2ObjectHashMap<>(); + } + + private MutationIdRanges(Long2ObjectHashMap<MutationId> ids) + { + this.ids = ids; + } + + @Override + public String toString() + { + return "MutationIdRanges{" + + "ids=" + ids + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + MutationIdRanges that = (MutationIdRanges) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hashCode(ids); + } + + public MutationIdRanges merge(MutationIdRanges that) + { + if (this == NONE) + return that; + if (that == NONE) + return this; + Long2ObjectHashMap<MutationId> newIds = new Long2ObjectHashMap<>(ids); + that.ids.forEachLong((logId, right) -> { + MutationId left = newIds.get(logId); + if (left == null) Review Comment: `Long2ObjectHashMap` has a `merge` method from `Map`. It saves doing the lookup in the map multiple times to accomplish the merge. 
########## src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java: ########## @@ -26,15 +26,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.UnmodifiableIterator; +import org.apache.cassandra.db.*; Review Comment: Import style ########## src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java: ########## @@ -112,14 +117,15 @@ protected SSTableTxnWriter createWriter(SSTable.Owner owner) throws IOException SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS); if (makeRangeAware) - return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, format, header); + return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, MutationIdRanges.NONE, format, header); Review Comment: In what context is this used and why can we not provide `MutationIdRanges`? This is implemented like test code, but not stored in the test package. It should probably be moved, since it looks like it is only used by tests. If it stays here, then the source of `MutationIdRanges.NONE` should be the test code constructing this so we know we don't get down here and haven't provided mutation id ranges when we should have.
########## src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java: ########## @@ -156,6 +153,15 @@ public long partitionCount() return total; } + @Override + public MutationIdRanges getMutationIdRanges() + { + MutationIdRanges ranges = MutationIdRanges.NONE; + for (MemtableShard shard : shards) + ranges = ranges.merge(shard.mutationIdCollector.get()); Review Comment: Same suggestion re making this a static method in `MutationIdRanges` ########## test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java: ########## @@ -31,25 +31,13 @@ import java.util.stream.StreamSupport; import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.db.*; Review Comment: Import style ########## src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java: ########## @@ -437,6 +441,7 @@ public abstract static class Builder<W extends SSTableWriter, B extends Builder< private boolean transientSSTable; private SerializationHeader serializationHeader; private List<Index.Group> indexGroups; + private MutationIdRanges mutationIdRanges = MutationIdRanges.NONE; Review Comment: Can you require that `mutationId` ranges be provided? I don't think this is something that should have a default value. If it's going to be `NONE` make the caller provide `NONE`. ########## src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java: ########## @@ -100,6 +92,7 @@ public static StatsMetadata defaultStatsMetadata() null, false, true, + MutationIdRanges.NONE, Review Comment: Yikes, so this is used by `MetadataSerializer`, which silently allows you to fail to read the stats metadata file if it doesn't exist. The default values here are generally conservative to ensure things behave correctly, like setting it to not repaired and using min and max values for timestamps and keys. Mutation ID ranges should probably be similarly conservative in some way that ensures that we don't mark this as "repaired" until it must have been repaired by definition.
########## src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java: ########## @@ -22,12 +22,7 @@ import java.util.Collection; import java.util.List; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.DiskBoundaries; -import org.apache.cassandra.db.PartitionPosition; -import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.*; Review Comment: Import style ########## test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java: ########## @@ -28,6 +28,7 @@ import com.google.common.primitives.Ints; import org.apache.cassandra.replication.*; +import org.apache.cassandra.db.MutationIdRanges; Review Comment: Unused import ########## src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java: ########## @@ -44,6 +44,7 @@ import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.db.*; Review Comment: Import style ########## test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java: ########## @@ -38,14 +38,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.cassandra.db.*; Review Comment: Import style ########## test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java: ########## @@ -113,7 +113,7 @@ public class LegacySSTableTest // Get all versions up to the current one. Useful for testing in compatibility mode C18301 private static String[] getValidLegacyVersions() { - String[] versions = {"da", "oa", "nb", "na", "me", "md", "mc", "mb", "ma"}; + String[] versions = {"ob", "oa", "da", "nb", "na", "me", "md", "mc", "mb", "ma"}; Review Comment: No `db`? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org