dcapwell commented on code in PR #2256: URL: https://github.com/apache/cassandra/pull/2256#discussion_r1160155296
########## src/java/org/apache/cassandra/journal/Index.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import javax.annotation.Nullable; + +import org.apache.cassandra.utils.Closeable; + +/** + * Mapping of client supplied ids to in-segment offsets + */ +interface Index<K> extends Closeable +{ + /** + * Update the index with a new entry with id and offset + */ + void update(K id, int offset); + + /** + * Look up offsets by id. It's possible, due to retries, for a segment + * to contain the same record with the same id more than once, at + * different offsets. + * + * @return the found offsets into the segment, if any; can be empty + */ + int[] lookUp(K id); + + /** + * Look up offsets by id. It's possible, due to retries, for a segment + * to contain the same record with the same id more than once, at + * different offsets. Return the first offset for provided record id, or -1 if none. + * + * @return the first offset into the segment, or -1 is none were found + */ + int lookUpFirst(K id); + + /** + * @return the first (smallest) id in the index + */ + @Nullable + K firstId(); + + /** + * @return the last (largest) id in the index + */ + @Nullable + K lastId(); + + /** + * Persist the index on disk to the file matching the desrcriptor. + */ + void persist(Descriptor descriptor); + + /** + * @return whether the id falls within lower/upper bounds of the index + */ + default boolean mayContainId(K id, KeySupport<K> keySupport) Review Comment: I feel like `KeySupport` should be removed from this method and a new `KeySupport<K> support()` get added (or implement in an abstract class), that way its not possible for the "wrong" logic to get used ########## src/java/org/apache/cassandra/journal/ActiveSegment.java: ########## @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +import com.codahale.metrics.Timer; +import org.apache.cassandra.io.util.*; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +final class ActiveSegment<K> extends Segment<K> +{ + final FileChannel channel; + + // OpOrder used to order appends wrt flush + private final OpOrder appendOrder = new OpOrder(); + + // position in the buffer we are allocating from + private final AtomicInteger allocatePosition = new AtomicInteger(0); + + /* + * Everything before this offset has been written and flushed. + */ + private volatile int lastFlushedOffset = 0; + + /* + * End position of the buffer; initially set to its capacity and + * updated to point to the last written position as the segment is being closed + * no need to be volatile as writes are protected by appendOrder barrier. + */ + private int endOfBuffer; + + // a signal that writers can wait on to be notified of a completed flush in BATCH and GROUP FlushMode + private final WaitQueue flushComplete = WaitQueue.newWaitQueue(); + + private final Ref<Segment<K>> selfRef; + + private ActiveSegment( + Descriptor descriptor, Params params, SyncedOffsets syncedOffsets, Index<K> index, Metadata metadata, KeySupport<K> keySupport) + { + super(descriptor, syncedOffsets, index, metadata, keySupport); + + try + { + channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); + buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, params.segmentSize()); + endOfBuffer = buffer.capacity(); + selfRef = new Ref<>(this, new Tidier(descriptor, channel, buffer, syncedOffsets)); + } + catch (IOException e) + { + throw new JournalWriteError(descriptor, file, e); + } + } + + static <K> ActiveSegment<K> create(Descriptor descriptor, Params params, KeySupport<K> keySupport) + { + SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true); + Index<K> index = InMemoryIndex.create(keySupport); + Metadata metadata = Metadata.create(); + return new ActiveSegment<>(descriptor, params, syncedOffsets, index, metadata, keySupport); + } + + /** + * Read the entry and specified offset into the entry holder. + * Expects the caller to acquire the ref to the segment and the record to exist. + */ + @Override + boolean read(int offset, EntrySerializer.EntryHolder<K> into) + { + ByteBuffer duplicate = buffer.duplicate().position(offset).limit(buffer.capacity()); + try + { + EntrySerializer.read(into, keySupport, duplicate, descriptor.userVersion); + } + catch (IOException e) + { + throw new JournalReadError(descriptor, file, e); + } + return true; + } + + /** + * Stop writing to this file, flush and close it. Does nothing if the file is already closed. + */ + @Override + public synchronized void close() + { + close(true); + } + + /** + * @return true if the closed segment was definitely empty, false otherwise + */ + private synchronized boolean close(boolean persistComponents) + { + boolean isEmpty = discardUnusedTail(); + if (!isEmpty) + { + flush(); + if (persistComponents) persistComponents(); + } + release(); + return isEmpty; + } + + /** + * Close and discard a pre-allocated, available segment, that's never been exposed + */ + void closeAndDiscard() + { + boolean isEmpty = close(false); + if (!isEmpty) throw new IllegalStateException(); + discard(); + } + + void closeAndIfEmptyDiscard() + { + boolean isEmpty = close(true); + if (isEmpty) discard(); + } + + void persistComponents() + { + index.persist(descriptor); + metadata.persist(descriptor); Review Comment: think we need `org.apache.cassandra.utils.SyncUtil#trySyncDir` after ########## src/java/org/apache/cassandra/journal/KeySupport.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.zip.Checksum; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Record keys must satisfy two properties: + * <p> + * 1. Must have a fixed serialized size + * 2. Must be byte-order comparable + */ +public interface KeySupport<K> extends Comparator<K> +{ + int serializedSize(int userVersion); + + void serialize(K key, DataOutputPlus out, int userVersion) throws IOException; + + K deserialize(DataInputPlus in, int userVersion) throws IOException; + + K deserialize(ByteBuffer buffer, int position, int userVersion); + + void updateChecksum(Checksum crc, K key, int userVersion); Review Comment: nit: for consistency with `serialize` would be good to be `K key, Checksum crc, int userVersion` ########## src/java/org/apache/cassandra/journal/StaticSegment.java: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.NoSuchFileException; +import java.nio.file.StandardOpenOption; +import java.util.*; + +import org.agrona.collections.IntHashSet; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Closeable; +import org.apache.cassandra.utils.concurrent.Ref; + +/** + * An immutable data segment that is no longer written to. + * <p> + * Can be compacted with input from {@code PersistedInvalidations} into a new smaller segment, + * with invalidated entries removed. + */ +final class StaticSegment<K> extends Segment<K> +{ + final FileChannel channel; + + private final Ref<Segment<K>> selfRef; + + private StaticSegment(Descriptor descriptor, + FileChannel channel, + MappedByteBuffer buffer, + SyncedOffsets syncedOffsets, + Index<K> index, + Metadata metadata, + KeySupport<K> keySupport) + { + super(descriptor, syncedOffsets, index, metadata, keySupport); + + this.channel = channel; + this.buffer = buffer; + + selfRef = new Ref<>(this, new Tidier<>(descriptor, channel, buffer, index)); + } + + /** + * Loads all segments matching the supplied desctiptors + * + * @param descriptors descriptors of the segments to load + * @return list of the loaded segments + */ + static <K> List<StaticSegment<K>> open(Collection<Descriptor> descriptors, KeySupport<K> keySupport) + { + List<StaticSegment<K>> segments = new ArrayList<>(descriptors.size()); + for (Descriptor descriptor : descriptors) + segments.add(open(descriptor, keySupport)); + return segments; + } + + /** + * Load the segment corresponding to the provided desrciptor + * + * @param descriptor descriptor of the segment to load + * @return the loaded segment + */ + @SuppressWarnings({ "resource", "RedundantSuppression" }) + static <K> StaticSegment<K> open(Descriptor descriptor, KeySupport<K> keySupport) + { + if (!Component.DATA.existsFor(descriptor)) + throw new IllegalArgumentException("Data file for segment " + descriptor + " doesn't exist"); + + SyncedOffsets syncedOffsets = Component.SYNCED_OFFSETS.existsFor(descriptor) + ? SyncedOffsets.load(descriptor) + : SyncedOffsets.absent(); + + Metadata metadata = Component.INDEX.existsFor(descriptor) + ? Metadata.load(descriptor) + : Metadata.rebuildAndPersist(descriptor, keySupport, syncedOffsets.syncedOffset()); + + OnDiskIndex<K> index = Component.METADATA.existsFor(descriptor) + ? OnDiskIndex.open(descriptor, keySupport) + : OnDiskIndex.rebuildAndPersist(descriptor, keySupport, syncedOffsets.syncedOffset()); Review Comment: I think this is backwards, if `INDEX` exists or not we load `Metadata`, and when `METADATA` exists or not we load `Index`? ########## src/java/org/apache/cassandra/journal/OnDiskIndex.java: ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Map; +import java.util.NavigableMap; +import java.util.zip.CRC32; + +import javax.annotation.Nullable; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Crc; + +import static org.apache.cassandra.journal.Journal.validateCRC; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; + +/** + * An on-disk (memory-mapped) index for a completed flushed segment. + * <p/> + * TODO (expected): block-level CRC + */ +final class OnDiskIndex<K> implements Index<K> +{ + private static final int[] EMPTY = new int[0]; + + private static final int FILE_PREFIX_SIZE = 4 + 4; // count of entries, CRC + private static final int VALUE_SIZE = 4; // int offset + + private final int KEY_SIZE; + private final int ENTRY_SIZE; + + private final Descriptor descriptor; + private final KeySupport<K> keySupport; + + private final FileChannel channel; + private volatile MappedByteBuffer buffer; + private final int entryCount; + + private volatile K firstId, lastId; + + private OnDiskIndex( + Descriptor descriptor, KeySupport<K> keySupport, FileChannel channel, MappedByteBuffer buffer, int entryCount) + { + this.descriptor = descriptor; + this.keySupport = keySupport; + this.channel = channel; + this.buffer = buffer; + this.entryCount = entryCount; + + KEY_SIZE = keySupport.serializedSize(descriptor.userVersion); + ENTRY_SIZE = KEY_SIZE + VALUE_SIZE; + } + + /** + * Open the index for reading, validate CRC + */ + @SuppressWarnings({ "resource", "RedundantSuppression" }) + static <K> OnDiskIndex<K> open(Descriptor descriptor, KeySupport<K> keySupport) + { + File file = descriptor.fileFor(Component.INDEX); + FileChannel channel = null; + MappedByteBuffer buffer = null; + try + { + channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); + buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size()); + + int entryCount = buffer.getInt(0); + OnDiskIndex<K> index = new OnDiskIndex<>(descriptor, keySupport, channel, buffer, entryCount); + index.validate(); + index.init(); + return index; + } + catch (Throwable e) + { + FileUtils.clean(buffer); + FileUtils.closeQuietly(channel); + throw new JournalReadError(descriptor, file, e); + } + } + + private void init() + { + if (entryCount > 0) + { + firstId = keyAtIndex(0); + lastId = keyAtIndex(entryCount - 1); + } + } + + @Override + public void close() + { + try + { + FileUtils.clean(buffer); + buffer = null; + channel.close(); + } + catch (IOException e) + { + throw new JournalWriteError(descriptor, Component.INDEX, e); + } + } + + void validate() throws IOException + { + CRC32 crc = Crc.crc32(); + + try (DataInputBuffer in = new DataInputBuffer(buffer, true)) + { + int entryCount = in.readInt(); + updateChecksumInt(crc, entryCount); + validateCRC(crc, in.readInt()); + + Crc.updateCrc32(crc, buffer, FILE_PREFIX_SIZE, FILE_PREFIX_SIZE + entryCount * ENTRY_SIZE); + in.skipBytesFully(entryCount * ENTRY_SIZE); + validateCRC(crc, in.readInt()); + + if (in.available() != 0) + throw new IOException("Trailing data encountered in segment index " + descriptor.fileFor(Component.INDEX)); + } + } + + static <K> void write( + NavigableMap<K, int[]> entries, KeySupport<K> keySupport, DataOutputPlus out, int userVersion) throws IOException + { + CRC32 crc = Crc.crc32(); + + int size = entries.values() + .stream() + .mapToInt(offsets -> offsets.length) + .sum(); + out.writeInt(size); + updateChecksumInt(crc, size); + out.writeInt((int) crc.getValue()); + + for (Map.Entry<K, int[]> entry : entries.entrySet()) + { + for (int offset : entry.getValue()) + { + K key = entry.getKey(); + keySupport.serialize(key, out, userVersion); + keySupport.updateChecksum(crc, key, userVersion); + + out.writeInt(offset); + updateChecksumInt(crc, offset); + } + } + + out.writeInt((int) crc.getValue()); + } + + @Override + @Nullable + public K firstId() + { + return firstId; + } + + @Override + @Nullable + public K lastId() + { + return lastId; + } + + @Override + public int[] lookUp(K id) + { + if (!mayContainId(id, keySupport)) + return EMPTY; + + int keyIndex = binarySearch(id); + if (keyIndex < 0) + return EMPTY; + + int[] offsets = new int[] { offsetAtIndex(keyIndex) }; + + /* + * Duplicate entries are possible within one segment (but should be rare). + * Check and add entries before and after the found result (not guaranteed to be first). + */ + + for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) + { + int length = offsets.length; + offsets = Arrays.copyOf(offsets, length + 1); + offsets[length] = offsetAtIndex(i); + } + + for (int i = keyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); i++) + { + int length = offsets.length; + offsets = Arrays.copyOf(offsets, length + 1); + offsets[length] = offsetAtIndex(i); + } + + Arrays.sort(offsets); + return offsets; + } + + @Override + public int lookUpFirst(K id) + { + if (!mayContainId(id, keySupport)) + return -1; + + int keyIndex = binarySearch(id); + + /* + * Duplicate entries are possible within one segment (but should be rare). + * Check and add entries before until we find the first occurrence of key. + */ + for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) + keyIndex = i; + + return keyIndex < 0 ? -1 : offsetAtIndex(keyIndex); + } + + private K keyAtIndex(int index) + { + return keySupport.deserialize(buffer, FILE_PREFIX_SIZE + index * ENTRY_SIZE, descriptor.userVersion); + } + + private int offsetAtIndex(int index) + { + return buffer.getInt(FILE_PREFIX_SIZE + index * ENTRY_SIZE + KEY_SIZE); + } + + /* + * This has been lifted from {@see IndexSummary}'s implementation, + * which itself was lifted from Harmony's Collections implementation. + */ + private int binarySearch(K key) + { + int low = 0, mid = entryCount, high = mid - 1, result = -1; + while (low <= high) + { + mid = (low + high) >> 1; + result = -compareWithKeyAt(key, mid); + if (result > 0) + { + low = mid + 1; + } + else if (result == 0) + { + return mid; + } + else + { + high = mid - 1; + } + } + return -mid - (result < 0 ? 1 : 2); + } + + private int compareWithKeyAt(K key, int keyIndex) + { + int offset = FILE_PREFIX_SIZE + ENTRY_SIZE * keyIndex; + return keySupport.compareWithKeyAt(key, buffer, offset, descriptor.userVersion); + } + + @Override + public void update(K id, int offset) + { + throw new UnsupportedOperationException(); + } + + @Override + public void persist(Descriptor descriptor) + { + throw new UnsupportedOperationException(); + } Review Comment: might be good to have `Index` and `MutableIndex` that way this is solved at the type level ########## src/java/org/apache/cassandra/journal/OnDiskIndex.java: ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Map; +import java.util.NavigableMap; +import java.util.zip.CRC32; + +import javax.annotation.Nullable; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Crc; + +import static org.apache.cassandra.journal.Journal.validateCRC; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; + +/** + * An on-disk (memory-mapped) index for a completed flushed segment. + * <p/> + * TODO (expected): block-level CRC + */ +final class OnDiskIndex<K> implements Index<K> Review Comment: wondering if using the term index in CASSANDRA-16052 would be useful once that lands and we rebase again. ########## src/java/org/apache/cassandra/journal/OnDiskIndex.java: ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Map; +import java.util.NavigableMap; +import java.util.zip.CRC32; + +import javax.annotation.Nullable; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Crc; + +import static org.apache.cassandra.journal.Journal.validateCRC; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; + +/** + * An on-disk (memory-mapped) index for a completed flushed segment. + * <p/> + * TODO (expected): block-level CRC + */ +final class OnDiskIndex<K> implements Index<K> +{ + private static final int[] EMPTY = new int[0]; + + private static final int FILE_PREFIX_SIZE = 4 + 4; // count of entries, CRC + private static final int VALUE_SIZE = 4; // int offset + + private final int KEY_SIZE; + private final int ENTRY_SIZE; + + private final Descriptor descriptor; + private final KeySupport<K> keySupport; + + private final FileChannel channel; + private volatile MappedByteBuffer buffer; + private final int entryCount; + + private volatile K firstId, lastId; + + private OnDiskIndex( + Descriptor descriptor, KeySupport<K> keySupport, FileChannel channel, MappedByteBuffer buffer, int entryCount) + { + this.descriptor = descriptor; + this.keySupport = keySupport; + this.channel = channel; + this.buffer = buffer; + this.entryCount = entryCount; + + KEY_SIZE = keySupport.serializedSize(descriptor.userVersion); + ENTRY_SIZE = KEY_SIZE + VALUE_SIZE; + } + + /** + * Open the index for reading, validate CRC + */ + @SuppressWarnings({ "resource", "RedundantSuppression" }) + static <K> OnDiskIndex<K> open(Descriptor descriptor, KeySupport<K> keySupport) + { + File file = descriptor.fileFor(Component.INDEX); + FileChannel channel = null; + MappedByteBuffer buffer = null; + try + { + channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); + buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size()); + + int entryCount = buffer.getInt(0); + OnDiskIndex<K> index = new OnDiskIndex<>(descriptor, keySupport, channel, buffer, entryCount); + index.validate(); + index.init(); + return index; + } + catch (Throwable e) + { + FileUtils.clean(buffer); + FileUtils.closeQuietly(channel); + throw new JournalReadError(descriptor, file, e); + } + } + + private void init() + { + if (entryCount > 0) + { + firstId = keyAtIndex(0); + lastId = keyAtIndex(entryCount - 1); + } + } + + @Override + public void close() + { + try + { + FileUtils.clean(buffer); + buffer = null; + channel.close(); + } + catch (IOException e) + { + throw new JournalWriteError(descriptor, Component.INDEX, e); + } + } + + void validate() throws IOException + { + CRC32 crc = Crc.crc32(); + + try (DataInputBuffer in = new DataInputBuffer(buffer, true)) + { + int entryCount = in.readInt(); + updateChecksumInt(crc, entryCount); + validateCRC(crc, in.readInt()); Review Comment: should also verify that the provided `entryCount` to constructor matches ``` if (entryCount != this.entryCount) throw new IOException(String.format("Read entry count %d != %d provided on init", this.entryCount, entryCount)); ``` ########## src/java/org/apache/cassandra/journal/InMemoryIndex.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; + +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileOutputStreamPlus; + +/** + * An index for a segment that's still being updated by journal writers concurrently. + */ +final class InMemoryIndex<K> implements Index<K> +{ + private static final int[] EMPTY = new int[0]; + + private final KeySupport<K> keySupport; + private final NavigableMap<K, int[]> index; + private final AtomicReference<K> lastId; Review Comment: it would be good for maintenance to explain why this exists. As far as I can tell reading the code `return index.isEmpty() ? null : index.lastKey();` is `O(N)` complexity, so this only exists to make that `O(1)`, which is leveraged in `org.apache.cassandra.journal.Index#mayContainId` ########## src/java/org/apache/cassandra/journal/Descriptor.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.cassandra.io.util.File; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; + +/** + * Timestamp and version encoded in the file name, e.g. + * log-1637159888484-2-1-1.data + * log-1637159888484-2-1-1.indx + * log-1637159888484-2-1-1.meta + * log-1637159888484-2-1-1.sync + */ +final class Descriptor implements Comparable<Descriptor> +{ + private static final String SEPARATOR = "-"; + private static final String PREFIX = "log" + SEPARATOR; + private static final String TMP_SUFFIX = "tmp"; + + private static final Pattern DATA_FILE_PATTERN = + Pattern.compile( PREFIX + "(\\d+)" // timestamp + + SEPARATOR + "(\\d+)" // generation + + SEPARATOR + "(\\d+)" // journal version + + SEPARATOR + "(\\d+)" // user version + + "\\." + Component.DATA.extension); + + private static final Pattern TMP_FILE_PATTERN = + Pattern.compile( PREFIX + "\\d+" // timestamp + + SEPARATOR + "\\d+" // generation + + SEPARATOR + "\\d+" // journal version + + SEPARATOR + "\\d+" // user version + + "\\." + "[a-z]+" // component extension + + "\\." + TMP_SUFFIX); + + + static final int JOURNAL_VERSION_1 = 1; + static final int CURRENT_JOURNAL_VERSION = JOURNAL_VERSION_1; + + final File directory; + final long timestamp; + final int generation; + + /** + * Serialization version for journal components; bumped as journal + * implementation evolves over time. + */ + final int journalVersion; + + /** + * Serialization version for user content - specifically journal keys + * and journal values; bumped when user logic evolves. + */ + final int userVersion; + + private Descriptor(File directory, long timestamp, int generation, int journalVersion, int userVersion) + { + this.directory = directory; + this.timestamp = timestamp; + this.generation = generation; + this.journalVersion = journalVersion; + this.userVersion = userVersion; + } + + static Descriptor create(File directory, long timestamp, int userVersion) + { + return new Descriptor(directory, timestamp, 1, CURRENT_JOURNAL_VERSION, userVersion); + } + + static Descriptor fromName(File directory, String name) + { + Matcher matcher = DATA_FILE_PATTERN.matcher(name); + if (!matcher.matches()) + throw new IllegalArgumentException("Provided filename is not valid for a data segment file"); + + long timestamp = Long.parseLong(matcher.group(1)); + int generation = Integer.parseInt(matcher.group(2)); + int journalVersion = Integer.parseInt(matcher.group(3)); + int userVersion = Integer.parseInt(matcher.group(4)); + + return new Descriptor(directory, timestamp, generation, journalVersion, userVersion); + } + + Descriptor withIncrementedGeneration() Review Comment: dead code ########## src/java/org/apache/cassandra/journal/Descriptor.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.cassandra.io.util.File; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; + +/** + * Timestamp and version encoded in the file name, e.g. + * log-1637159888484-2-1-1.data + * log-1637159888484-2-1-1.indx + * log-1637159888484-2-1-1.meta + * log-1637159888484-2-1-1.sync + */ +final class Descriptor implements Comparable<Descriptor> +{ + private static final String SEPARATOR = "-"; + private static final String PREFIX = "log" + SEPARATOR; + private static final String TMP_SUFFIX = "tmp"; + + private static final Pattern DATA_FILE_PATTERN = + Pattern.compile( PREFIX + "(\\d+)" // timestamp + + SEPARATOR + "(\\d+)" // generation + + SEPARATOR + "(\\d+)" // journal version + + SEPARATOR + "(\\d+)" // user version + + "\\." + Component.DATA.extension); + + private static final Pattern TMP_FILE_PATTERN = + Pattern.compile( PREFIX + "\\d+" // timestamp + + SEPARATOR + "\\d+" // generation + + SEPARATOR + "\\d+" // journal version + + SEPARATOR + "\\d+" // user version + + "\\." + "[a-z]+" // component extension + + "\\." + TMP_SUFFIX); + + + static final int JOURNAL_VERSION_1 = 1; + static final int CURRENT_JOURNAL_VERSION = JOURNAL_VERSION_1; + + final File directory; + final long timestamp; + final int generation; + + /** + * Serialization version for journal components; bumped as journal + * implementation evolves over time. + */ + final int journalVersion; + + /** + * Serialization version for user content - specifically journal keys + * and journal values; bumped when user logic evolves. + */ + final int userVersion; + + private Descriptor(File directory, long timestamp, int generation, int journalVersion, int userVersion) + { + this.directory = directory; + this.timestamp = timestamp; + this.generation = generation; + this.journalVersion = journalVersion; + this.userVersion = userVersion; + } + + static Descriptor create(File directory, long timestamp, int userVersion) + { + return new Descriptor(directory, timestamp, 1, CURRENT_JOURNAL_VERSION, userVersion); + } + + static Descriptor fromName(File directory, String name) + { + Matcher matcher = DATA_FILE_PATTERN.matcher(name); + if (!matcher.matches()) + throw new IllegalArgumentException("Provided filename is not valid for a data segment file"); + + long timestamp = Long.parseLong(matcher.group(1)); + int generation = Integer.parseInt(matcher.group(2)); + int journalVersion = Integer.parseInt(matcher.group(3)); + int userVersion = Integer.parseInt(matcher.group(4)); + + return new Descriptor(directory, timestamp, generation, journalVersion, userVersion); + } + + Descriptor withIncrementedGeneration() + { + return new Descriptor(directory, timestamp, generation + 1, journalVersion, userVersion); + } + + File fileFor(Component component) + { + return new File(directory, formatFileName(component)); + } + + File tmpFileFor(Component component) + { + return new File(directory, formatFileName(component) + '.' + TMP_SUFFIX); + } + + static boolean isTmpFile(File file) + { + return TMP_FILE_PATTERN.matcher(file.name()).matches(); + } + + private String formatFileName(Component component) + { + return format("%s%d%s%d%s%d%s%d.%s", + PREFIX, timestamp, + SEPARATOR, generation, + SEPARATOR, journalVersion, + SEPARATOR, userVersion, + component.extension); Review Comment: `String.format` is rather expensive, think it would be best to use `StringBuilder` in this case ########## src/java/org/apache/cassandra/service/accord/AccordJournal.java: ########## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.accord; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Executor; +import java.util.zip.Checksum; + +import com.google.common.primitives.Ints; + +import accord.local.Node.Id; +import accord.local.PreLoadContext; +import accord.messages.Accept; +import accord.messages.Apply; +import accord.messages.Commit; +import accord.messages.MessageType; +import accord.messages.PreAccept; +import accord.messages.TxnRequest; +import accord.primitives.*; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.journal.Journal; +import org.apache.cassandra.journal.KeySupport; +import org.apache.cassandra.journal.Params; +import org.apache.cassandra.journal.ValueSerializer; +import org.apache.cassandra.service.accord.serializers.AcceptSerializers; +import org.apache.cassandra.service.accord.serializers.ApplySerializers; +import org.apache.cassandra.service.accord.serializers.CommitSerializers; +import org.apache.cassandra.service.accord.serializers.EnumSerializer; +import org.apache.cassandra.service.accord.serializers.PreacceptSerializers; + +import static org.apache.cassandra.db.TypeSizes.BYTE_SIZE; +import static org.apache.cassandra.db.TypeSizes.INT_SIZE; +import static org.apache.cassandra.db.TypeSizes.LONG_SIZE; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong; + +/* + * TODO: expose more journal params via Config + */ +class AccordJournal +{ + private static final Set<Integer> SENTINEL_HOSTS = Collections.singleton(0); + + final File directory; + final Journal<Key, TxnRequest<?>> journal; + + AccordJournal() + { + directory = new File(DatabaseDescriptor.getAccordJournalDirectory()); + journal = new Journal<>("AccordJournal", directory, Params.DEFAULT, Key.SUPPORT, MESSAGE_SERIALIZER); + } + + AccordJournal start() + { + journal.start(); + return this; + } + + void shutdown() + { + journal.shutdown(); + } + + boolean mustAppend(PreLoadContext context) + { + return context instanceof TxnRequest && Type.mustAppend((TxnRequest<?>) context); + } + + void append(int storeId, PreLoadContext context, Executor executor, Runnable onDurable) + { + append(storeId, (TxnRequest<?>) context, executor, onDurable); + } + + void append(int storeId, TxnRequest<?> message, Executor executor, Runnable onDurable) + { + Key key = new Key(message.txnId, Type.fromMsgType(message.type()), storeId); + journal.asyncWrite(key, message, SENTINEL_HOSTS, executor, onDurable); Review Comment: `org.apache.cassandra.journal.Metadata` whole existence is to track hosts, but our requests don't contain a host and we currently use a fixed host at the write side; what is this host tracking supposed to be for? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

