jacek-lewandowski commented on code in PR #2777:
URL: https://github.com/apache/cassandra/pull/2777#discussion_r1389030301
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -2649,6 +2688,24 @@ public static void setCommitLogSegmentSize(int sizeMebibytes)
conf.commitlog_segment_size = new DataStorageSpec.IntMebibytesBound(sizeMebibytes);
}
+ /**
+ * Return commitlog disk access mode.
+ */
+ public static Config.DiskAccessMode getCommitLogDiskAccessMode()
+ {
+ return conf.commitlog_disk_access_mode;
+ }
+
+ public static boolean isCommitLogUsingDirectIO()
Review Comment:
Since we use enums, I think this method is redundant
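For illustration, callers could compare the enum directly; a minimal sketch (the call site below is hypothetical, only the accessor and enum values come from the diff):
```java
// Instead of isCommitLogUsingDirectIO(), compare against the enum value.
if (DatabaseDescriptor.getCommitLogDiskAccessMode() == Config.DiskAccessMode.direct)
{
    // take the Direct-I/O code path
}
```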
##########
conf/cassandra.yaml:
##########
@@ -589,6 +589,16 @@ commitlog_segment_size: 32MiB
# parameters:
# -
+# CommitLog segments can use legacy (default), direct (new) and 'auto' (optimized
Review Comment:
The comment looks good for release notes, however I'd prefer to rephrase it
for the yaml doc. I think the essential information that should be there is:
- this property applies to writing the commit log
- the allowed values are: ... (with meaning)
- the default value is: ...
I think we should include the current explanation in the NEWS.txt
##########
src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java:
##########
@@ -197,6 +206,12 @@ void writeLogHeader()
lastSyncedOffset = lastMarkerOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
headerWritten = true;
+
+ // Testing shows writing initial bytes takes some time for Direct I/O. During peak load,
+ // it is better to make "COMMIT-LOG-ALLOCATOR" thread to write these few bytes of each
+ // file and this helps syncer thread to speedup the flush activity.
+ if (CommitLog.instance.configuration.isDirectIOEnabled())
Review Comment:
The `writeLogHeader` method should rather be overridden in `DirectIOSegment`,
and this statement should be called there.
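A hedged sketch of that shape (the statement guarded by the `if` is truncated in the quote, so the `flush(...)` call below is only a stand-in for it):
```java
// In DirectIOSegment: keep the generic header logic in the base class and add the
// Direct-I/O-specific warm-up here.
@Override
void writeLogHeader()
{
    super.writeLogHeader();

    // Pre-write the first few bytes from the COMMIT-LOG-ALLOCATOR thread so the
    // syncer thread does not pay that cost during peak load.
    flush(0, lastSyncedOffset); // placeholder for the guarded statement
}
```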
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
Review Comment:
should be private I think
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
+ try
+ {
+ // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of 4K page
+ // and nextMarker to end of 4K page to avoid write errors.
+ int filePosition = lastSyncedOffset;
+ ByteBuffer duplicate = buffer.duplicate();
+
+ // Aligned file position if not aligned to start of 4K page.
+ if (filePosition % minimumDirectIOAlignement != 0)
+ {
+ filePosition = filePosition & ~(minimumDirectIOAlignement -1);
Review Comment:
`~(minimumDirectIOAlignement -1)` - if you stored the block size in a final
field or in `DatabaseDescriptor`, you could store a mask in a final field as well.
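A minimal sketch of that suggestion (the field names and the `DatabaseDescriptor` accessor are hypothetical; a power-of-two block size is assumed):
```java
// Computed once instead of re-deriving ~(blockSize - 1) on every flush.
private final int blockSize = DatabaseDescriptor.getCommitLogWriteBlockSize(); // assumed accessor
private final int blockMask = ~(blockSize - 1);                                // valid for power-of-two sizes

// flush() then becomes:
// filePosition &= blockMask;                               // round down to block start
// flushLimit = (flushLimit + blockSize - 1) & blockMask;   // round up to block end
```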
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
+ try
+ {
+ // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of 4K page
+ // and nextMarker to end of 4K page to avoid write errors.
+ int filePosition = lastSyncedOffset;
Review Comment:
It would be good to name it `flushPosition`.
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -511,6 +511,41 @@ else if (conf.commitlog_sync_period.toMilliseconds() != 0)
logger.debug("Syncing log with a period of {}",
conf.commitlog_sync_period.toString());
}
+ // Only mmap & Direct-I/O access modes are supported. Standard access
mode is preassumed if Compressed or Encrypted segment is used.
+ // This ensures default behavior is unchanged.
+ if (conf.commitlog_disk_access_mode != Config.DiskAccessMode.auto
+ && conf.commitlog_disk_access_mode != Config.DiskAccessMode.legacy
+ && conf.commitlog_disk_access_mode !=
Config.DiskAccessMode.direct)
+ {
+ throw new ConfigurationException(String.format("Commitlog access
mode '%s' is not supported. Supported modes are '%s', '%s' and '%s'",
+
conf.commitlog_disk_access_mode, Config.DiskAccessMode.auto,
+
Config.DiskAccessMode.legacy, Config.DiskAccessMode.direct));
+ }
+ else
+ {
+ Config.DiskAccessMode current_disk_access_mode =
conf.commitlog_disk_access_mode;
+
+ if (current_disk_access_mode == Config.DiskAccessMode.auto)
+ {
+ current_disk_access_mode = conf.disk_optimization_strategy ==
Config.DiskOptimizationStrategy.ssd ? Config.DiskAccessMode.direct
+
: Config.DiskAccessMode.legacy;
+ // This is necessary for other functions to get the new mode
determined through auto.
+ conf.commitlog_disk_access_mode = current_disk_access_mode;
+ logger.info("DiskAccessMode 'auto' determined to be '{}' for
Commitlog disk", current_disk_access_mode);
+ }
+
+ if (getCommitLogCompression() != null || (getEncryptionContext()
!= null && getEncryptionContext().isEnabled()))
+ {
+ if (current_disk_access_mode == Config.DiskAccessMode.direct)
+ {
+ // Compressed and Encrypted segments are not yet supported
with Direct-I/O feature.
Review Comment:
It looks like we will get there when `auto` is set and the commit log is
compressed/encrypted - shouldn't `auto` mean no configuration errors?
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
Review Comment:
nit: blank line to be removed
##########
test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java:
##########
@@ -190,10 +194,11 @@ private void testLog() throws IOException, InterruptedException
}
private void testLog(CommitLog commitLog) throws IOException, InterruptedException {
- System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
+ System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, direct I/O enabled: %b, sync %s%s%s\n",
Review Comment:
maybe mention the disk policy explicitly
##########
src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java:
##########
@@ -111,12 +111,19 @@ void start()
{
// For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption,
// and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
- BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
- ? BufferType.ON_HEAP
- : commitLog.configuration.getCompressor().preferredBufferType();
+ BufferType bufferType = commitLog.configuration.isDirectIOEnabled()
+ ? BufferType.OFF_HEAP
+ : commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
+ ? BufferType.ON_HEAP
+ : commitLog.configuration.getCompressor().preferredBufferType();
+
+ // Identify minimum block size used for Direct-I/O. allocateDirect does not return from aligned space.
+ // To avoid write errors additional block size is requested and next aligned it minimum block size.
Review Comment:
Could you rephrase it? I found it difficult to understand - maybe something
like:
_The direct buffer must be aligned with the file system block size. We
cannot enforce that during allocation, but we can get an aligned slice from the
allocated buffer. The buffer must be oversized by the alignment unit to make it
possible._
##########
src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java:
##########
@@ -83,6 +85,9 @@ public enum CDCState
replayLimitId = idBase = Math.max(currentTimeMillis(), maxId + 1);
}
+ // Minimum alignement and block size required by direct I/O feature.
+ public static int minimumDirectIOAlignement = 0;
Review Comment:
This should either land in `DirectIOSegment` and not be static, or it
should be computed as part of `DatabaseDescriptor` initialization and stored
there. Also, I'd rename it to `commitLogFileBlockSize` (or something like that).
##########
src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java:
##########
@@ -714,6 +729,29 @@ public CDCState setCDCState(CDCState newState)
}
}
+ /**
+ * Direct requires minimum alignement and block size to be used for read and writes. Identify it now.
+ */
+ static public int getDirectIOMinimumAlignement(String storageDirectory, String fileName)
Review Comment:
`fileName` is unused.
Also I think you should rather do
`Files.createTempFile(Paths.get(storageDirectory), ...)` and try-with-resources
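A minimal sketch of the suggested shape, assuming the method only needs the file store block size (the method name is kept from the diff minus the unused parameter; whether `FileStore.getBlockSize()` is the right source of the alignment is an assumption):
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

static int getDirectIOMinimumAlignement(String storageDirectory) throws IOException
{
    // Probe file created inside the commit log directory so we query the right file system.
    Path probe = Files.createTempFile(Paths.get(storageDirectory), "directio-", ".probe");
    try
    {
        return Math.toIntExact(Files.getFileStore(probe).getBlockSize()); // Java 10+
    }
    finally
    {
        Files.deleteIfExists(probe);
    }
}
```
Try-with-resources would come into play if the implementation also opens a channel on the probe file; with only a `FileStore` lookup, the try/finally cleanup above is enough.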
##########
src/java/org/apache/cassandra/db/commitlog/CommitLog.java:
##########
@@ -624,11 +632,13 @@ public static final class Configuration
*/
private EncryptionContext encryptionContext;
- public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext)
+ public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext,
+ Config.DiskAccessMode diskAccessMode)
{
this.compressorClass = compressorClass;
this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
this.encryptionContext = encryptionContext;
+ this.diskAccessMode = diskAccessMode;
Review Comment:
I think we should have a precondition to validate the disk access mode here
as well
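For example, a hedged sketch of such a precondition (Guava's `Preconditions` is used for illustration; the exact set of modes accepted here depends on how `auto`/`legacy` are resolved before the `Configuration` is built):
```java
public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext,
                     Config.DiskAccessMode diskAccessMode)
{
    // Hypothetical check: only the modes the commit log actually supports get this far.
    Preconditions.checkArgument(diskAccessMode == Config.DiskAccessMode.direct
                                || diskAccessMode == Config.DiskAccessMode.legacy,
                                "Unsupported commit log disk access mode: %s", diskAccessMode);

    this.compressorClass = compressorClass;
    this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
    this.encryptionContext = encryptionContext;
    this.diskAccessMode = diskAccessMode;
}
```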
##########
conf/cassandra.yaml:
##########
@@ -589,6 +589,16 @@ commitlog_segment_size: 32MiB
# parameters:
# -
+# CommitLog segments can use legacy (default), direct (new) and 'auto' (optimized
+# configuration) I/O modes to flush the data to disk. Direct I/O is a new feature
+# that minimizes cache effects by using user space buffers. This helps in
+# transferring data from/to disk at high speed. Java enabled support for
+# Direct-I/O feature from version 10 onwards.
+# Refer: https://bugs.openjdk.org/browse/JDK-8164900
+# The same is not enabled yet for compressed and encrypted based segments and
+# for replay (reading) commitlog files.
+# commitlog_disk_access_mode: legacy
Review Comment:
nit, please at least state that it is for writing, like
`commitlog_write_disk_access_mode`; I'd really prefer to have a hierarchical
structure as it is the direction we are slowly moving towards:
```
commitlog:
write_disk_access_policy: legacy
```
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
Review Comment:
assert `original == null`
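In other words, something like this at the top of `createBuffer` (the assertion message is illustrative):
```java
// Guard against createBuffer() being called twice for the same segment.
assert original == null : "createBuffer should only be called once per segment";
original = manager.getBufferPool().createBuffer();
```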
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
+ try
+ {
+ // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of 4K page
+ // and nextMarker to end of 4K page to avoid write errors.
+ int filePosition = lastSyncedOffset;
+ ByteBuffer duplicate = buffer.duplicate();
+
+ // Aligned file position if not aligned to start of 4K page.
+ if (filePosition % minimumDirectIOAlignement != 0)
+ {
+ filePosition = filePosition & ~(minimumDirectIOAlignement -1);
+ channel.position(filePosition);
+ }
+ duplicate.position(filePosition);
+
+ int flushSizeInBytes = nextMarker;
+
+ // Align last byte to end of 4K page.
+ if (flushSizeInBytes % minimumDirectIOAlignement != 0)
+ flushSizeInBytes = (flushSizeInBytes + minimumDirectIOAlignement) & ~(minimumDirectIOAlignement -1);
+
+ duplicate.limit(flushSizeInBytes);
+
+ channel.write(duplicate);
+
+ // Direct I/O always writes flushes in block size and writes more than the flush size.
+ // File size on disk will always multiple of page size and taking this into account
+ // helps testcases to pass.
+ if (flushSizeInBytes > lastWritten)
+ {
+ manager.addSize(flushSizeInBytes - lastWritten);
Review Comment:
Why not call `manager.addSize(segment size)` on buffer creation, just like
it is done for the mapped segment? We know the size upfront, don't we?
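For illustration, the alternative would be a single accounting call when the buffer is created (placed at the end of `createBuffer`; whether to report the aligned limit or the configured segment size is the author's call):
```java
// Hypothetical: account for the whole segment once, like the memory-mapped segment does.
// Direct I/O rounds every flush up to whole blocks, so the on-disk footprint is known up front.
manager.addSize(alignedBuffer.limit());
return alignedBuffer;
```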
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
+ try
+ {
+ // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of 4K page
+ // and nextMarker to end of 4K page to avoid write errors.
+ int filePosition = lastSyncedOffset;
+ ByteBuffer duplicate = buffer.duplicate();
+
+ // Aligned file position if not aligned to start of 4K page.
+ if (filePosition % minimumDirectIOAlignement != 0)
+ {
+ filePosition = filePosition & ~(minimumDirectIOAlignement -1);
+ channel.position(filePosition);
+ }
+ duplicate.position(filePosition);
+
+ int flushSizeInBytes = nextMarker;
Review Comment:
The name `flushSizeInBytes` is misleading; it is the `flushLimit`, right?
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
+ try
+ {
+ // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of 4K page
+ // and nextMarker to end of 4K page to avoid write errors.
+ int filePosition = lastSyncedOffset;
+ ByteBuffer duplicate = buffer.duplicate();
+
+ // Aligned file position if not aligned to start of 4K page.
+ if (filePosition % minimumDirectIOAlignement != 0)
+ {
+ filePosition = filePosition & ~(minimumDirectIOAlignement -1);
+ channel.position(filePosition);
+ }
+ duplicate.position(filePosition);
+
+ int flushSizeInBytes = nextMarker;
+
+ // Align last byte to end of 4K page.
+ if (flushSizeInBytes % minimumDirectIOAlignement != 0)
+ flushSizeInBytes = (flushSizeInBytes + minimumDirectIOAlignement) & ~(minimumDirectIOAlignement -1);
+
+ duplicate.limit(flushSizeInBytes);
+
+ channel.write(duplicate);
+
+ // Direct I/O always writes flushes in block size and writes more than the flush size.
+ // File size on disk will always multiple of page size and taking this into account
+ // helps testcases to pass.
+ if (flushSizeInBytes > lastWritten)
Review Comment:
I'm not sure about this. It looks like `flushSizeInBytes` is always >
`lastWritten`, isn't it?
##########
src/java/org/apache/cassandra/db/commitlog/DirectIOSegment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+
+/*
+ * Direct-IO segment. Allocates ByteBuffer using ByteBuffer.allocateDirect and align
+ * ByteBuffer.position, ByteBuffer.limit and FileChannel.position to page size (4K).
+ * Java-11 forces minimum page size to be written to disk with Direct-IO.
+ */
+public class DirectIOSegment extends CommitLogSegment
+{
+ ByteBuffer original;
+
+ // Needed to track number of bytes written to disk in multiple of page size.
+ long lastWritten = 0;
+
+ /**
+ * Constructs a new segment file.
+ *
+ * @param commitLog the commit log it will be used with.
+ */
+ DirectIOSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+
+ // mark the initial sync marker as uninitialised
+ int firstSync = buffer.position();
+ buffer.putInt(firstSync + 0, 0);
+ buffer.putInt(firstSync + 4, 0);
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ int segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
+
+ original = manager.getBufferPool().createBuffer();
+
+ // May get previously used buffer and zero it out to now. Direct I/O writes additional bytes during flush
+ // operation.
+ LongBuffer arrayBuffer = original.asLongBuffer();
+ for(int i = 0 ; i < arrayBuffer.limit() ; i++)
+ arrayBuffer.put(i, 0);
+
+ ByteBuffer alignedBuffer = original.alignedSlice(minimumDirectIOAlignement);
+ assert alignedBuffer.limit() >= segmentSize : String.format("Bytebuffer slicing failed to get required buffer size (required=%d,current size=%d", segmentSize, alignedBuffer.limit());
+
+ assert alignedBuffer.alignmentOffset(0, minimumDirectIOAlignement) == 0 : String.format("Index 0 should be aligned to %d page size.", minimumDirectIOAlignement);
+ assert alignedBuffer.alignmentOffset(alignedBuffer.limit(), minimumDirectIOAlignement) == 0 : String.format("Limit should be aligned to %d page size", minimumDirectIOAlignement);
+
+ return alignedBuffer;
+ }
+
+ @Override
+ void write(int startMarker, int nextMarker)
+ {
+ // if there's room in the discard section to write an empty header,
+ // zero out the next sync marker so replayer can cleanly exit
+ if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
+ {
+ buffer.putInt(nextMarker, 0);
+ buffer.putInt(nextMarker + 4, 0);
+ }
+
+ // write previous sync marker to point to next sync marker
+ // we don't chain the crcs here to ensure this method is idempotent if it fails
+ writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+
+ try
+ {
+ // lastSyncedOffset is synced to disk. Align lastSyncedOffset to start of 4K page
+ // and nextMarker to end of 4K page to avoid write errors.
Review Comment:
You mean pages of the file block size rather than 4K, right?
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -511,6 +511,41 @@ else if (conf.commitlog_sync_period.toMilliseconds() != 0)
logger.debug("Syncing log with a period of {}",
conf.commitlog_sync_period.toString());
}
+ // Only mmap & Direct-I/O access modes are supported. Standard access
mode is preassumed if Compressed or Encrypted segment is used.
+ // This ensures default behavior is unchanged.
+ if (conf.commitlog_disk_access_mode != Config.DiskAccessMode.auto
+ && conf.commitlog_disk_access_mode != Config.DiskAccessMode.legacy
+ && conf.commitlog_disk_access_mode !=
Config.DiskAccessMode.direct)
+ {
+ throw new ConfigurationException(String.format("Commitlog access
mode '%s' is not supported. Supported modes are '%s', '%s' and '%s'",
+
conf.commitlog_disk_access_mode, Config.DiskAccessMode.auto,
+
Config.DiskAccessMode.legacy, Config.DiskAccessMode.direct));
+ }
+ else
+ {
+ Config.DiskAccessMode current_disk_access_mode =
conf.commitlog_disk_access_mode;
+
+ if (current_disk_access_mode == Config.DiskAccessMode.auto)
+ {
+ current_disk_access_mode = conf.disk_optimization_strategy ==
Config.DiskOptimizationStrategy.ssd ? Config.DiskAccessMode.direct
+
: Config.DiskAccessMode.legacy;
+ // This is necessary for other functions to get the new mode
determined through auto.
+ conf.commitlog_disk_access_mode = current_disk_access_mode;
+ logger.info("DiskAccessMode 'auto' determined to be '{}' for
Commitlog disk", current_disk_access_mode);
+ }
+
+ if (getCommitLogCompression() != null || (getEncryptionContext()
!= null && getEncryptionContext().isEnabled()))
+ {
+ if (current_disk_access_mode == Config.DiskAccessMode.direct)
+ {
+ // Compressed and Encrypted segments are not yet supported
with Direct-I/O feature.
Review Comment:
tbh, I think the check should be as follows:
Allowed options when no compression/encryption is enabled:
- `legacy`, `auto`, `mmap`, `direct` (I don't see why `standard` should not
be allowed, but let's leave it forbidden for now)
Allowed options when compression/encryption is enabled:
- `legacy`, `auto`, `standard` (at least for now; in the future we can try
to use `mmap` if needed - support for reading compressed files with `mmap` has
been implemented as part of CASSANDRA-17056, at least for sstable files)
`auto` is resolved to:
- `direct` if ssd and no compression
- `legacy` otherwise
`legacy` is resolved to:
- `mmap` if no compression (right?)
- `standard` otherwise
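A hedged sketch of that resolution, just to make the proposal concrete (the method name, its placement in `DatabaseDescriptor` and the exception text are illustrative, not the PR's actual code):
```java
private static Config.DiskAccessMode resolveCommitLogWriteDiskAccessMode(Config.DiskAccessMode configured,
                                                                         boolean compressedOrEncrypted,
                                                                         boolean ssd)
{
    // Explicitly configured modes that are incompatible with the commit log settings fail fast.
    if (compressedOrEncrypted && (configured == Config.DiskAccessMode.direct || configured == Config.DiskAccessMode.mmap))
        throw new ConfigurationException("direct/mmap commit log writes are not supported with compression or encryption");
    if (!compressedOrEncrypted && configured == Config.DiskAccessMode.standard)
        throw new ConfigurationException("standard commit log writes are currently not supported without compression or encryption");

    Config.DiskAccessMode mode = configured;

    // 'auto' never fails: pick direct only when it is actually usable.
    if (mode == Config.DiskAccessMode.auto)
        mode = (ssd && !compressedOrEncrypted) ? Config.DiskAccessMode.direct : Config.DiskAccessMode.legacy;

    // 'legacy' preserves the pre-existing behaviour.
    if (mode == Config.DiskAccessMode.legacy)
        mode = compressedOrEncrypted ? Config.DiskAccessMode.standard : Config.DiskAccessMode.mmap;

    return mode;
}
```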
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]