blambov commented on code in PR #2894: URL: https://github.com/apache/cassandra/pull/2894#discussion_r1404093467
########## conf/cassandra.yaml: ########## @@ -606,6 +606,16 @@ commitlog_segment_size: 32MiB # parameters: # - +# Set the disk access mode for writing commitlog segments. The allowed values are: +# - auto: version dependent optimal setting +# - legacy: the default mode as used in Cassandra 4.x and earlier (standard I/O when the commitlog is either +# compressed or encrypted or mmap otherwise) +# - mmap: use memory mapped I/O - available only when the commitlog is neither compressed nor encrypted +# - direct: use direct I/O - available only when the commitlog is neither compressed nor encrypted +# - standard: use standard I/O - available only when the commitlog is compressed or encrypted +# The default setting is legacy when the storage compatibility is set to 4 or auto otherwise. +commitlog_disk_access_mode: auto Review Comment: This should be commented out in the committed version (cassandra_latest.yaml in CASSANDRA-18753 will switch to auto). ########## src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import com.sun.nio.file.ExtendedOpenOption; +import net.openhft.chronicle.core.util.ThrowingFunction; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SimpleCachedBufferPool; +import org.apache.cassandra.utils.ByteBufferUtil; +import sun.nio.ch.DirectBuffer; + +/* + * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align + * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K). + * Java-11 forces minimum page size to be written to disk with Direct-IO. + */ +public class DirectIOSegment extends CommitLogSegment +{ + private final int fsBlockSize; + private final int fsBlockQuotientMask; + private final int fsBlockRemainderMask; + + // Needed to track number of bytes written to disk in multiple of page size. + long lastWritten = 0; + + /** + * Constructs a new segment file. 
+ */ + DirectIOSegment(AbstractCommitLogSegmentManager manager, ThrowingFunction<Path, FileChannel, IOException> channelFactory, int fsBlockSize) + { + super(manager, channelFactory); + + assert Integer.highestOneBit(fsBlockSize) == fsBlockSize : "fsBlockSize must be a power of 2"; + + // mark the initial sync marker as uninitialised + int firstSync = buffer.position(); + buffer.putInt(firstSync + 0, 0); + buffer.putInt(firstSync + 4, 0); + + this.fsBlockSize = fsBlockSize; + this.fsBlockRemainderMask = fsBlockSize - 1; + this.fsBlockQuotientMask = ~fsBlockRemainderMask; + } + + @Override + void writeLogHeader() + { + super.writeLogHeader(); + // Testing shows writing initial bytes takes some time for Direct I/O. During peak load, + // it is better to make "COMMIT-LOG-ALLOCATOR" thread to write these few bytes of each + // file and this helps syncer thread to speedup the flush activity. + flush(0, lastSyncedOffset); + } + + @Override + void write(int startMarker, int nextMarker) + { + // if there's room in the discard section to write an empty header, + // zero out the next sync marker so replayer can cleanly exit + if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE) + { + buffer.putInt(nextMarker, 0); + buffer.putInt(nextMarker + 4, 0); + } + + // write previous sync marker to point to next sync marker + // we don't chain the crcs here to ensure this method is idempotent if it fails + writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker); + } + + @Override + protected void flush(int startMarker, int nextMarker) + { + try + { + // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of its block + // and nextMarker to end of its block to avoid write errors. + int flushPosition = lastSyncedOffset; + ByteBuffer duplicate = buffer.duplicate(); + + // Aligned file position if not aligned to start of a block. 
+ if ((flushPosition & fsBlockRemainderMask) != 0) + { + flushPosition = flushPosition & fsBlockQuotientMask; + channel.position(flushPosition); + } + duplicate.position(flushPosition); + + int flushLimit = nextMarker; + + // Align last byte to end of block. + if ((flushLimit & fsBlockRemainderMask) != 0) + flushLimit = (flushLimit + fsBlockSize) & fsBlockQuotientMask; Review Comment: Nit: Remove the `if` and do this as `flushLimit = (flushLimit + fsBlockSize - 1) & fsBlockQuotientMask;` or `flushLimit = ((flushLimit - 1) | (fsBlockSize - 1)) + 1` (note the `- 1` on `flushLimit`, so that an already-aligned value is not pushed up by a whole block). `fsBlockRemainderMask` can be removed; `fsBlockQuotientMask` is `-fsBlockSize`. FWIW, calculating these is faster and safer than fetching them, though possibly not as readable. At some later time it would be best to extend `PageAware` to offer methods that apply to arbitrary page sizes. ########## src/java/org/apache/cassandra/config/DatabaseDescriptor.java: ########## @@ -512,18 +515,22 @@ else if (conf.commitlog_sync_period.toMilliseconds() != 0) } /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */ - if (conf.disk_access_mode == Config.DiskAccessMode.auto) + if (conf.disk_access_mode == DiskAccessMode.auto || conf.disk_access_mode == DiskAccessMode.legacy) Review Comment: `auto` should map to `mmap_index_only` here (the default changed recently). ########## src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java: ########## @@ -764,4 +763,18 @@ public CommitLogPosition getCommitLogPosition() return new CommitLogPosition(segment.id, buffer.limit()); } } + + protected abstract static class Builder<S extends CommitLogSegment> + { + protected final AbstractCommitLogSegmentManager segmentManager; + + public Builder(AbstractCommitLogSegmentManager segmentManager) + { + this.segmentManager = segmentManager; + } + + public abstract S build(); Review Comment: The generification here is not helpful and only clutters the code or causes raw type warnings.
Each subclass can return its concrete type here, and a caller can only benefit from the generic if it already knows the concrete type, in which case it is calling a method that does return the concrete type. ########## NEWS.txt: ########## @@ -187,6 +187,9 @@ New features - Added snitch for Microsoft Azure of name AzureSnitch (CASSANDRA-18646) - legacy command line options from cassandra-stress were removed - `-mode` option in cassandra-stress has `native` and `cql3` as defaults and they do not need to be specified + - Allow to write the commitlog using direct I/O. Direct I/O is a new feature that minimizes cache effects by using Review Comment: I would add "and memory-mapping overhead" after "cache effects" as this is a much bigger problem with the mmap option. ########## src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java: ########## @@ -105,4 +112,27 @@ protected void internalClose() FileUtils.clean(buffer); super.internalClose(); } + + protected static class MemoryMappedSegmentBuilder extends CommitLogSegment.Builder<MemoryMappedSegment> + { + public MemoryMappedSegmentBuilder(AbstractCommitLogSegmentManager segmentManager) + { + super(segmentManager); + } + + @Override + public MemoryMappedSegment build() + { + return new MemoryMappedSegment(segmentManager, + path -> FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE)); + } + + @Override + public SimpleCachedBufferPool createBufferPool() + { + return new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), + DatabaseDescriptor.getCommitLogSegmentSize(), + BufferType.ON_HEAP); Review Comment: On-heap here is giving a strong "something's wrong" signal. Could we skip creating a buffer pool altogether (i.e. return null) in this case?
########## src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java: ########## @@ -150,4 +149,28 @@ public long onDiskSize() { return lastWrittenPos; } + + protected static class EncryptedSegmentBuilder extends CommitLogSegment.Builder<EncryptedSegment> + { + + public EncryptedSegmentBuilder(AbstractCommitLogSegmentManager segmentManager) + { + super(segmentManager); + } + + @Override + public EncryptedSegment build() + { + return new EncryptedSegment(segmentManager, + path -> FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE)); + } + + @Override + public SimpleCachedBufferPool createBufferPool() + { + return new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), + DatabaseDescriptor.getCommitLogSegmentSize(), + BufferType.ON_HEAP); Review Comment: The comment about the type of buffer ``` // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs ``` needs to be preserved here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

