http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java index 4a12045..6176eeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java @@ -33,10 +33,12 @@ import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.typedef.internal.SB; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; + /** * Data pages IO. */ -public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { +public abstract class AbstractDataPageIO<T extends Storable> extends PageIO implements CompactablePageIO { /** */ private static final int SHOW_ITEM = 0b0001; @@ -228,7 +230,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { * @param pageAddr Page address. * @return Free space. */ - private int getRealFreeSpace(long pageAddr) { + public int getRealFreeSpace(long pageAddr) { return PageUtils.getShort(pageAddr, FREE_SPACE_OFF); } @@ -822,9 +824,10 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { * @param pageAddr Page address. * @param payload Payload. * @param pageSize Page size. + * @return Item ID. * @throws IgniteCheckedException If failed. */ - public void addRow( + public int addRow( long pageAddr, byte[] payload, int pageSize @@ -840,7 +843,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { writeRowData(pageAddr, dataOff, payload); - addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); + return addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); } /** @@ -1106,6 +1109,62 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { return directCnt; // Previous directCnt will be our itemId. } + /** {@inheritDoc} */ + @Override public void compactPage(ByteBuffer page, ByteBuffer out, int pageSize) { + // TODO May we compactDataEntries in-place and then copy compacted data to out? + copyPage(page, out, pageSize); + + long pageAddr = bufferAddress(out); + + int freeSpace = getRealFreeSpace(pageAddr); + + if (freeSpace == 0) + return; // No garbage: nothing to compact here. + + int directCnt = getDirectCount(pageAddr); + + if (directCnt != 0) { + int firstOff = getFirstEntryOffset(pageAddr); + + if (firstOff - freeSpace != getHeaderSizeWithItems(pageAddr, directCnt)) { + firstOff = compactDataEntries(pageAddr, directCnt, pageSize); + setFirstEntryOffset(pageAddr, firstOff, pageSize); + } + + // Move all the data entries from page end to the page header to close the gap. + moveBytes(pageAddr, firstOff, pageSize - firstOff, -freeSpace, pageSize); + } + + out.limit(pageSize - freeSpace); // Here we have only meaningful data of this page. + } + + /** {@inheritDoc} */ + @Override public void restorePage(ByteBuffer page, int pageSize) { + assert page.isDirect(); + assert page.position() == 0; + assert page.limit() <= pageSize; + + long pageAddr = bufferAddress(page); + + int freeSpace = getRealFreeSpace(pageAddr); + + if (freeSpace != 0) { + int firstOff = getFirstEntryOffset(pageAddr); + int cnt = pageSize - firstOff; + + if (cnt != 0) { + int off = page.limit() - cnt; + + assert off > PageIO.COMMON_HEADER_END: off; + assert cnt > 0 : cnt; + + moveBytes(pageAddr, off, cnt, freeSpace, pageSize); + } + } + + page.limit(pageSize); + } + /** * @param pageAddr Page address. * @param directCnt Direct items count. @@ -1203,7 +1262,16 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { entriesSize += entrySize; } - return pageSize - ITEMS_OFF - entriesSize - (directCnt + getIndirectCount(pageAddr)) * ITEM_SIZE; + return pageSize - entriesSize - getHeaderSizeWithItems(pageAddr, directCnt); + } + + /** + * @param pageAddr Page address. + * @param directCnt Direct items count. + * @return Size of the page header including all items. + */ + private int getHeaderSizeWithItems(long pageAddr, int directCnt) { + return ITEMS_OFF + (directCnt + getIndirectCount(pageAddr)) * ITEM_SIZE; } /** @@ -1214,6 +1282,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO { * @param pageSize Page size. */ private void moveBytes(long addr, int off, int cnt, int step, int pageSize) { + assert cnt >= 0: cnt; assert step != 0 : step; assert off + step >= 0; assert off + step + cnt <= pageSize : "[off=" + off + ", step=" + step + ", cnt=" + cnt +
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java index 349e877..4c656eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java @@ -17,16 +17,18 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.io; +import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.util.GridStringBuilder; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.lang.IgniteInClosure; /** * Abstract IO routines for B+Tree pages. */ -public abstract class BPlusIO<L> extends PageIO { +public abstract class BPlusIO<L> extends PageIO implements CompactablePageIO { /** */ private static final int CNT_OFF = COMMON_HEADER_END; @@ -412,4 +414,32 @@ public abstract class BPlusIO<L> extends PageIO { .a(",\n\tremoveId=").appendHex(getRemoveId(addr)) .a("\n]"); } + + /** + * @param pageAddr Page address. + * @return Offset after the last item. + */ + public int getItemsEnd(long pageAddr) { + int cnt = getCount(pageAddr); + return offset(cnt); + } + + /** {@inheritDoc} */ + @Override public void compactPage(ByteBuffer page, ByteBuffer out, int pageSize) { + copyPage(page, out, pageSize); + + long pageAddr = GridUnsafe.bufferAddress(out); + + // Just drop all the extra garbage at the end. + out.limit(getItemsEnd(pageAddr)); + } + + /** {@inheritDoc} */ + @Override public void restorePage(ByteBuffer compactPage, int pageSize) { + assert compactPage.isDirect(); + assert compactPage.position() == 0; + assert compactPage.limit() <= pageSize; + + compactPage.limit(pageSize); // Just add garbage to the end. + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java new file mode 100644 index 0000000..775a1f8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.tree.io; + +import java.nio.ByteBuffer; + +/** + * Page IO that supports compaction. + */ +public interface CompactablePageIO { + /** + * Compacts page contents to the output buffer. + * Implementation must not change contents, position and limit of the original page buffer. + * + * @param page Page buffer. + * @param out Output buffer. + * @param pageSize Page size. + */ + void compactPage(ByteBuffer page, ByteBuffer out, int pageSize); + + /** + * Restores the original page in place. + * + * @param compactPage Compact page. + * @param pageSize Page size. + */ + void restorePage(ByteBuffer compactPage, int pageSize); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java index 49eed88..e58aad9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.io; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.util.typedef.internal.S; + /** * */ @@ -61,4 +64,17 @@ public class DataPagePayload { public long nextLink() { return nextLink; } + + /** + * @param pageAddr Page address. + * @return Payload bytes. + */ + public byte[] getBytes(long pageAddr) { + return PageUtils.getBytes(pageAddr, off, payloadSize); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataPagePayload.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java index ee61e25..85a1e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java @@ -83,6 +83,9 @@ import org.apache.ignite.internal.util.GridStringBuilder; */ public abstract class PageIO { /** */ + private static PageIO testIO; + + /** */ private static BPlusInnerIO<?> innerTestIO; /** */ @@ -131,16 +134,19 @@ public abstract class PageIO { public static final int ROTATED_ID_PART_OFF = PAGE_ID_OFF + 8; /** */ - private static final int RESERVED_BYTE_OFF = ROTATED_ID_PART_OFF + 1; + private static final int COMPRESSION_TYPE_OFF = ROTATED_ID_PART_OFF + 1; + + /** */ + private static final int COMPRESSED_SIZE_OFF = COMPRESSION_TYPE_OFF + 1; /** */ - private static final int RESERVED_SHORT_OFF = RESERVED_BYTE_OFF + 1; + private static final int COMPACTED_SIZE_OFF = COMPRESSED_SIZE_OFF + 2; /** */ - private static final int RESERVED_INT_OFF = RESERVED_SHORT_OFF + 2; + private static final int RESERVED_SHORT_OFF = COMPACTED_SIZE_OFF + 2; /** */ - private static final int RESERVED_2_OFF = RESERVED_INT_OFF + 4; + private static final int RESERVED_2_OFF = RESERVED_SHORT_OFF + 2; /** */ private static final int RESERVED_3_OFF = RESERVED_2_OFF + 8; @@ -382,6 +388,54 @@ public abstract class PageIO { } /** + * @param page Page buffer. + * @param compressType Compression type. + */ + public static void setCompressionType(ByteBuffer page, byte compressType) { + page.put(COMPRESSION_TYPE_OFF, compressType); + } + + /** + * @param page Page buffer. + * @return Compression type. + */ + public static byte getCompressionType(ByteBuffer page) { + return page.get(COMPRESSION_TYPE_OFF); + } + + /** + * @param page Page buffer. + * @param compressedSize Compressed size. + */ + public static void setCompressedSize(ByteBuffer page, short compressedSize) { + page.putShort(COMPRESSED_SIZE_OFF, compressedSize); + } + + /** + * @param page Page buffer. + * @return Compressed size. + */ + public static short getCompressedSize(ByteBuffer page) { + return page.getShort(COMPRESSED_SIZE_OFF); + } + + /** + * @param page Page buffer. + * @param compactedSize Compacted size. + */ + public static void setCompactedSize(ByteBuffer page, short compactedSize) { + page.putShort(COMPACTED_SIZE_OFF, compactedSize); + } + + /** + * @param page Page buffer. + * @return Compacted size. + */ + public static short getCompactedSize(ByteBuffer page) { + return page.getShort(COMPACTED_SIZE_OFF); + } + + /** * @param pageAddr Page address. * @return Checksum. */ @@ -487,6 +541,15 @@ public abstract class PageIO { } /** + * Registers IO for testing. + * + * @param io Page IO. + */ + public static void registerTest(PageIO io) { + testIO = io; + } + + /** * @return Type. */ public final int getType() { @@ -513,7 +576,8 @@ public abstract class PageIO { setPageId(pageAddr, pageId); setCrc(pageAddr, 0); - PageUtils.putLong(pageAddr, ROTATED_ID_PART_OFF, 0L); // 1 + reserved(1+2+4) + // rotated(1) + compress_type(1) + compressed_size(2) + compacted_size(2) + reserved(2) + PageUtils.putLong(pageAddr, ROTATED_ID_PART_OFF, 0L); PageUtils.putLong(pageAddr, RESERVED_2_OFF, 0L); PageUtils.putLong(pageAddr, RESERVED_3_OFF, 0L); } @@ -536,6 +600,15 @@ public abstract class PageIO { } /** + * @param page Page. + * @return Page IO. + * @throws IgniteCheckedException If failed. + */ + public static <Q extends PageIO> Q getPageIO(ByteBuffer page) throws IgniteCheckedException { + return getPageIO(getType(page), getVersion(page)); + } + + /** * @param type IO Type. * @param ver IO Version. * @return Page IO. @@ -572,6 +645,11 @@ public abstract class PageIO { return (Q)SimpleDataPageIO.VERSIONS.forVersion(ver); default: + if (testIO != null) { + if (testIO.type == type && testIO.ver == ver) + return (Q)testIO; + } + return (Q)getBPlusIO(type, ver); } } @@ -715,6 +793,21 @@ public abstract class PageIO { protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException; /** + * @param page Page. + * @param out Output buffer. + * @param pageSize Page size. + */ + protected final void copyPage(ByteBuffer page, ByteBuffer out, int pageSize) { + assert out.position() == 0; + assert pageSize <= out.remaining(); + assert pageSize == page.remaining(); + + page.mark(); + out.put(page).flip(); + page.reset(); + } + + /** * @param addr Address. */ public static String printPage(long addr, int pageSize) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index e70a027..03bac2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; @@ -671,4 +672,9 @@ public class StandaloneGridKernalContext implements GridKernalContext { @NotNull @Override public Iterator<GridComponent> iterator() { return null; } + + /** {@inheritDoc} */ + @Override public CompressionProcessor compress() { + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java new file mode 100644 index 0000000..8b917b3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteComponentType; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; + +/** + * Compression processor. + * + * @see IgniteComponentType#COMPRESSION + */ +public class CompressionProcessor extends GridProcessorAdapter { + /** */ + public static final int LZ4_MIN_LEVEL = 0; + + /** */ + public static final int LZ4_MAX_LEVEL = 17; + + /** */ + public static final int LZ4_DEFAULT_LEVEL = 0; + + /** */ + public static final int ZSTD_MIN_LEVEL = -131072; + + /** */ + public static final int ZSTD_MAX_LEVEL = 22; + + /** */ + public static final int ZSTD_DEFAULT_LEVEL = 3; + + /** */ + protected static final byte UNCOMPRESSED_PAGE = 0; + + /** */ + protected static final byte COMPACTED_PAGE = 1; + + /** */ + protected static final byte ZSTD_COMPRESSED_PAGE = 2; + + /** */ + protected static final byte LZ4_COMPRESSED_PAGE = 3; + + /** */ + protected static final byte SNAPPY_COMPRESSED_PAGE = 4; + + /** + * @param ctx Kernal context. + */ + public CompressionProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** + * @param compression Compression algorithm. + * @return Default compression level. + */ + public static int getDefaultCompressionLevel(DiskPageCompression compression) { + switch (compression) { + case ZSTD: + return ZSTD_DEFAULT_LEVEL; + + case LZ4: + return LZ4_DEFAULT_LEVEL; + + case SNAPPY: + case SKIP_GARBAGE: + return 0; + } + + throw new IllegalArgumentException("Compression: " + compression); + } + + /** + * @param compressLevel Compression level. + * @param compression Compression algorithm. + * @return Compression level. + */ + public static int checkCompressionLevelBounds(int compressLevel, DiskPageCompression compression) { + switch (compression) { + case ZSTD: + checkCompressionLevelBounds(compressLevel, ZSTD_MIN_LEVEL, ZSTD_MAX_LEVEL); + break; + + case LZ4: + checkCompressionLevelBounds(compressLevel, LZ4_MIN_LEVEL, LZ4_MAX_LEVEL); + break; + + default: + throw new IllegalArgumentException("Compression level for " + compression + " is not supported."); + } + + return compressLevel; + } + + /** + * @param compressLevel Compression level. + * @param min Min level. + * @param max Max level. + */ + private static void checkCompressionLevelBounds(int compressLevel, int min, int max) { + if (compressLevel < min || compressLevel > max) { + throw new IllegalArgumentException("Compression level for LZ4 must be between " + min + + " and " + max + "."); + } + } + + /** + * @throws IgniteCheckedException Always. + */ + private static <T> T fail() throws IgniteCheckedException { + throw new IgniteCheckedException("Make sure that ignite-compress module is in classpath."); + } + + /** + * @param storagePath Storage path. + * @param pageSize Page size. + * @throws IgniteCheckedException If compression is not supported. + */ + public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException { + fail(); + } + + /** + * @param page Page buffer. + * @param pageSize Page size. + * @param storeBlockSize Store block size. + * @param compression Compression algorithm. + * @param compressLevel Compression level. + * @return Possibly compressed buffer. + * @throws IgniteCheckedException If failed. + */ + public ByteBuffer compressPage( + ByteBuffer page, + int pageSize, + int storeBlockSize, + DiskPageCompression compression, + int compressLevel + ) throws IgniteCheckedException { + return fail(); + } + + /** + * @param page Possibly compressed page buffer. + * @param pageSize Page size. + * @throws IgniteCheckedException If failed. + */ + public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { + if (PageIO.getCompressionType(page) != UNCOMPRESSED_PAGE) + fail(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java new file mode 100644 index 0000000..1877640 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.nio.file.Path; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.IgniteComponentType; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Native file system API. + */ +public final class FileSystemUtils { + /** */ + private static final String NATIVE_FS_LINUX_CLASS = + "org.apache.ignite.internal.processors.compress.NativeFileSystemLinux"; + + /** */ + private static final NativeFileSystem fs; + + /** */ + private static volatile Throwable err; + + /** */ + static { + NativeFileSystem x = null; + + try { + if (IgniteComponentType.COMPRESSION.inClassPath()) { + if (U.isLinux()) + x = U.newInstance(NATIVE_FS_LINUX_CLASS); + } + } + catch (Throwable e) { + err = e; + } + + fs = x; + } + + /** + */ + public static void checkSupported() { + Throwable e = err; + + if (e != null || fs == null) + throw new IgniteException("Native file system API is not supported on " + U.osString(), e); + } + + /** + * @param path File system path. + * @return File system block size or negative value if not supported. + */ + public static int getFileSystemBlockSize(Path path) { + return fs == null ? -1 : fs.getFileSystemBlockSize(path); + } + + /** + * @param fd Native file descriptor. + * @return File system block size or negative value if not supported. + */ + public static int getFileSystemBlockSize(int fd) { + return fs == null ? -1 : fs.getFileSystemBlockSize(fd); + } + + /** + * !!! Use with caution. May produce unexpected results. + * + * Known to work correctly on Linux EXT4 and Btrfs, + * while on XSF it returns meaningful result only after + * file reopening. + * + * @param fd Native file descriptor. + * @return Approximate system dependent size of the sparse file or negative + * value if not supported. + */ + public static long getSparseFileSize(int fd) { + return fs == null ? -1 : fs.getSparseFileSize(fd); + } + + /** + * @param fd Native file descriptor. + * @param off Offset of the hole. + * @param len Length of the hole. + * @param fsBlockSize File system block size. + * @return Actual punched hole size. + */ + public static long punchHole(int fd, long off, long len, int fsBlockSize) { + assert off >= 0; + assert len > 0; + + checkSupported(); + + if (len < fsBlockSize) + return 0; + + // TODO maybe optimize for power of 2 + if (off % fsBlockSize != 0) { + long end = off + len; + off = (off / fsBlockSize + 1) * fsBlockSize; + len = end - off; + + if (len <= 0) + return 0; + } + + len = len / fsBlockSize * fsBlockSize; + + if (len > 0) + fs.punchHole(fd, off, len); + + return len; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java new file mode 100644 index 0000000..673d1bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.nio.file.Path; + +/** + * Native file system API. + */ +public interface NativeFileSystem { + /** + * @param path Path. + * @return File system block size in bytes. + */ + int getFileSystemBlockSize(Path path); + + /** + * @param fd Native file descriptor. + * @return File system block size in bytes. + */ + int getFileSystemBlockSize(int fd); + + /** + * @param fd Native file descriptor. + * @param off Offset of the hole. + * @param len Length of the hole. + */ + void punchHole(int fd, long off, long len); + + /** + * @param fd Native file descriptor. + * @return Approximate system dependent size of the sparse file. + */ + long getSparseFileSize(int fd); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java index 2cc0ae3..6b8e2b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java @@ -1339,6 +1339,7 @@ public abstract class GridUnsafe { * @return Buffer memory address. */ public static long bufferAddress(ByteBuffer buf) { + assert buf.isDirect(); return UNSAFE.getLong(buf, DIRECT_BUF_ADDR_OFF); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 2d6b584..946378d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -3933,15 +3933,23 @@ public abstract class IgniteUtils { * @return Hex string. */ public static String byteArray2HexString(byte[] arr) { - SB sb = new SB(arr.length << 1); + StringBuilder sb = new StringBuilder(arr.length << 1); for (byte b : arr) - sb.a(Integer.toHexString(MASK & b >>> 4)).a(Integer.toHexString(MASK & b)); + addByteAsHex(sb, b); return sb.toString().toUpperCase(); } /** + * @param sb String builder. + * @param b Byte to add in hexadecimal format. + */ + private static void addByteAsHex(StringBuilder sb, byte b) { + sb.append(Integer.toHexString(MASK & b >>> 4)).append(Integer.toHexString(MASK & b)); + } + + /** * Checks for containment of the value in the array. * Both array cells and value may be {@code null}. Two {@code null}s are considered equal. * @@ -10552,12 +10560,10 @@ public abstract class IgniteUtils { * @return hex representation of memory region */ public static String toHexString(long addr, int len) { - assert (len & 0b111) == 0 && len > 0; - StringBuilder sb = new StringBuilder(len * 2); - for (int i = 0; i < len; i += 8) - sb.append(U.hexLong(GridUnsafe.getLong(addr + i))); + for (int i = 0; i < len; i++) // Can not use getLong because on little-endian it produces bs. + addByteAsHex(sb, GridUnsafe.getByte(addr + i)); return sb.toString(); } @@ -10568,12 +10574,10 @@ public abstract class IgniteUtils { * @return hex representation of memory region */ public static String toHexString(ByteBuffer buf) { - assert (buf.capacity() & 0b111) == 0; - StringBuilder sb = new StringBuilder(buf.capacity() * 2); - for (int i = 0; i < buf.capacity(); i += 8) - sb.append(U.hexLong(buf.getLong(i))); + for (int i = 0; i < buf.capacity(); i++) + addByteAsHex(sb, buf.get(i)); // Can not use getLong because on little-endian it produces bs. return sb.toString(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java index 2612a41..cf8aa76 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java @@ -179,4 +179,16 @@ public interface CacheGroupMetricsMXBean { */ @MXBeanDescription("Total size of memory allocated for group, in bytes.") public long getTotalAllocatedSize(); + + /** + * Storage space allocated for group, in bytes. + */ + @MXBeanDescription("Storage space allocated for group, in bytes.") + public long getStorageSize(); + + /** + * Storage space allocated for group adjusted for possible sparsity, in bytes. + */ + @MXBeanDescription("Storage space allocated for group adjusted for possible sparsity, in bytes.") + public long getSparseStorageSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java index 2069099..4689f15 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java @@ -174,4 +174,12 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics { "Number of subintervals to set." ) public void subIntervals(int subInts); + + /** {@inheritDoc} */ + @MXBeanDescription("Storage space allocated, in bytes.") + @Override long getStorageSize(); + + /** {@inheritDoc} */ + @MXBeanDescription("Storage space allocated adjusted for possible sparsity, in bytes.") + @Override long getSparseStorageSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java index 36d8b41..ed6eb86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java @@ -120,7 +120,6 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest } DataStorageConfiguration memCfg = new DataStorageConfiguration(); - memCfg.setPageSize(4 * 1024); memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setMaxSize(150L * 1024 * 1024) .setPersistenceEnabled(persistenceEnabled())); @@ -1330,6 +1329,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest ccfgs[4] = cacheConfiguration(CACHE_NAME_PREFIX + 4, TRANSACTIONAL); ccfgs[4].setDataRegionName(NO_PERSISTENCE_REGION); + ccfgs[4].setDiskPageCompression(null); return ccfgs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java index 1b36ac1..c402ad7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java @@ -17,25 +17,40 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridStringBuilder; /** * Dummy PageIO implementation. For test purposes only. */ -public class DummyPageIO extends PageIO { +public class DummyPageIO extends PageIO implements CompactablePageIO { /** */ public DummyPageIO() { super(2 * Short.MAX_VALUE, 1); + + PageIO.registerTest(this); } /** {@inheritDoc} */ - @Override - protected void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { + @Override protected void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { sb.a("DummyPageIO [\n"); sb.a("addr=").a(addr).a(", "); sb.a("pageSize=").a(addr); sb.a("\n]"); } + + /** {@inheritDoc} */ + @Override public void compactPage(ByteBuffer page, ByteBuffer out, int pageSize) { + copyPage(page, out, pageSize); + } + + /** {@inheritDoc} */ + @Override public void restorePage(ByteBuffer p, int pageSize) { + assert p.isDirect(); + assert p.position() == 0; + assert p.limit() == pageSize; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java index 4db1de9..0f5aef9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java @@ -55,6 +55,9 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { /** */ private static final String GROUP1 = "grp1"; + /** */ + private static final String NO_PERSISTENCE = "no-persistence"; + /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { cleanPersistenceDir(); @@ -73,19 +76,20 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { cfg.setConsistentId(gridName); + long maxRegionSize = 20L * 1024 * 1024; + DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setMaxSize(10L * 1024 * 1024) + .setMaxSize(maxRegionSize) .setPersistenceEnabled(true) .setMetricsEnabled(true) .setName("dflt-plc")) .setDataRegionConfigurations(new DataRegionConfiguration() - .setMaxSize(10L * 1024 * 1024) + .setMaxSize(maxRegionSize) .setPersistenceEnabled(false) .setMetricsEnabled(true) - .setName("no-persistence")) + .setName(NO_PERSISTENCE)) .setWalMode(WALMode.LOG_ONLY) - .setPageSize(4 * 1024) .setMetricsEnabled(true); cfg.setDataStorageConfiguration(memCfg); @@ -95,7 +99,7 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); cfg.setCacheConfiguration(cacheConfiguration(GROUP1, "cache", PARTITIONED, ATOMIC, 1, null), - cacheConfiguration(null, "cache-np", PARTITIONED, ATOMIC, 1, "no-persistence")); + cacheConfiguration(null, "cache-np", PARTITIONED, ATOMIC, 1, NO_PERSISTENCE)); return cfg; } @@ -135,6 +139,9 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setDataRegionName(dataRegName); + if (NO_PERSISTENCE.equals(dataRegName)) + ccfg.setDiskPageCompression(null); + return ccfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java index 713d4cc..be3ed0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java @@ -45,8 +45,7 @@ public class IgnitePdsDynamicCacheTest extends IgniteDbDynamicCacheSelfTest { DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration().setMaxSize(200L * 1024 * 1024).setPersistenceEnabled(true)) - .setWalMode(WALMode.LOG_ONLY) - .setPageSize(4 * 1024); + .setWalMode(WALMode.LOG_ONLY); cfg.setDataStorageConfiguration(memCfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java index ecc6e02..d6d5c4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java @@ -105,6 +105,7 @@ public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTes CacheConfiguration ccfgNp = new CacheConfiguration("nonPersistentCache"); ccfgNp.setDataRegionName(NO_PERSISTENCE_REGION); + ccfgNp.setDiskPageCompression(null); ccfg.setAffinity(new RendezvousAffinityFunction(false, 4096)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java index 353bc50..d907239 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java @@ -21,16 +21,20 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION; + /** * */ @@ -112,6 +116,10 @@ public class IgnitePdsPageSizesTest extends GridCommonAbstractTest { * @throws Exception if failed. */ private void checkPageSize(int pageSize) throws Exception { + if (pageSize <= 4 * 1024 && + IgniteSystemProperties.getEnum(DiskPageCompression.class, IGNITE_DEFAULT_DISK_PAGE_COMPRESSION) != null) + return; // Small pages do not work with compression. + this.pageSize = pageSize; IgniteEx ignite = startGrid(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java index 48b60d4..56426f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java @@ -566,6 +566,21 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr } /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return delegate.getFileSystemBlockSize(); + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return delegate.getSparseSize(); + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + return delegate.punchHole(position, len); + } + + /** {@inheritDoc} */ @Override public long position() throws IOException { return delegate.position(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java index b8cb047..4787143 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java @@ -57,14 +57,15 @@ public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest { ccfgs = null; } + long regionMaxSize = 20L * 1024 * 1024; + DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setMaxSize(10L * 1024 * 1024).setPersistenceEnabled(true)) - .setPageSize(4 * 1024) + new DataRegionConfiguration().setMaxSize(regionMaxSize).setPersistenceEnabled(true)) .setWalMode(WALMode.LOG_ONLY); memCfg.setDataRegionConfigurations(new DataRegionConfiguration() - .setMaxSize(10L * 1024 * 1024) + .setMaxSize(regionMaxSize) .setName(NO_PERSISTENCE_REGION) .setPersistenceEnabled(false)); @@ -210,6 +211,7 @@ public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest { ccfgs[2] = cacheConfiguration("c3"); ccfgs[2].setDataRegionName(NO_PERSISTENCE_REGION); + ccfgs[2].setDiskPageCompression(null); return ccfgs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java index c05f65c..0154c14 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java @@ -60,7 +60,7 @@ public class IgnitePdsDataRegionMetricsTest extends GridCommonAbstractTest { private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ - private static final long INIT_REGION_SIZE = 10 << 20; + private static final long INIT_REGION_SIZE = 20 << 20; /** */ private static final long MAX_REGION_SIZE = INIT_REGION_SIZE * 10; @@ -312,7 +312,13 @@ public class IgnitePdsDataRegionMetricsTest extends GridCommonAbstractTest { FilePageStore store = (FilePageStore)pageStoreMgr.getStore(CU.cacheId(cacheName), partId(file)); - totalPersistenceSize += path.toFile().length() - store.headerSize(); + int pageSize = store.getPageSize(); + long storeSize = path.toFile().length() - store.headerSize(); + + if (storeSize % pageSize != 0) + storeSize = (storeSize / pageSize + 1) * pageSize; // Adjust for possible page compression. + + totalPersistenceSize += storeSize; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java index 5593e44..cfbb8d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java @@ -61,8 +61,6 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { DataStorageConfiguration dbCfg = new DataStorageConfiguration(); - dbCfg.setPageSize(4 * 1024); - cfg.setDataStorageConfiguration(dbCfg); dbCfg.setWalSegmentSize(1024 * 1024) http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java index 8baa1c3..c9174ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java @@ -68,7 +68,6 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); DataStorageConfiguration dsCfg = new DataStorageConfiguration(); - dsCfg.setPageSize(1024); //smaller page to reduce overhead to short values dsCfg.setDefaultDataRegionConfiguration( new DataRegionConfiguration() .setPersistenceEnabled(true) http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java index 29e113c..70d003d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java @@ -38,7 +38,7 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** Client mode. */ - private boolean set2kPageSize = true; + private boolean set16kPageSize = true; /** Entries count. */ public static final int ENTRIES_COUNT = 300; @@ -55,8 +55,10 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac DataStorageConfiguration memCfg = new DataStorageConfiguration(); - if (set2kPageSize) - memCfg.setPageSize(2048); + if (set16kPageSize) + memCfg.setPageSize(16 * 1024); + else + memCfg.setPageSize(0); // Enforce default. DataRegionConfiguration memPlcCfg = new DataRegionConfiguration(); memPlcCfg.setMaxSize(100L * 1000 * 1000); @@ -64,7 +66,7 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac memPlcCfg.setPersistenceEnabled(true); memCfg.setDefaultDataRegionConfiguration(memPlcCfg); - memCfg.setCheckpointFrequency(3_000); + memCfg.setCheckpointFrequency(500); cfg.setDataStorageConfiguration(memCfg); @@ -75,6 +77,9 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32)); + if (!set16kPageSize) + ccfg1.setDiskPageCompression(null); + cfg.setCacheConfiguration(ccfg1); cfg.setConsistentId(gridName); @@ -99,7 +104,7 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac /** * @throws Exception If failed. */ - public void testStartFrom2kDefaultStore() throws Exception { + public void testStartFrom16kDefaultStore() throws Exception { startGrids(2); Ignite ig = ignite(0); @@ -113,11 +118,11 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac for (int i = 0; i < ENTRIES_COUNT; i++) cache.put(i, i); - Thread.sleep(5_000); // Await for checkpoint to happen. + Thread.sleep(1500); // Await for checkpoint to happen. stopAllGrids(); - set2kPageSize = false; + set16kPageSize = false; startGrids(2); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java index 5bf7e7f..14525f0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java @@ -42,8 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; - /** * */ @@ -83,7 +81,7 @@ public class WalCompactionTest extends GridCommonAbstractTest { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setName("cache"); + ccfg.setName(CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); @@ -139,7 +137,9 @@ public class WalCompactionTest extends GridCommonAbstractTest { IgniteEx ig = (IgniteEx)startGrids(3); ig.cluster().active(true); - IgniteCache<Integer, byte[]> cache = ig.cache("cache"); + IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME); + + final int pageSize = ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize(); for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total. final byte[] val = new byte[20000]; @@ -150,9 +150,9 @@ public class WalCompactionTest extends GridCommonAbstractTest { } // Spam WAL to move all data records to compressible WAL zone. - for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) { - ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE], - DFLT_PAGE_SIZE)); + for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) { + ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize], + pageSize)); } // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head. @@ -262,7 +262,7 @@ public class WalCompactionTest extends GridCommonAbstractTest { IgniteEx ig = startGrid(0); ig.cluster().active(true); - IgniteCache<Integer, byte[]> cache = ig.cache("cache"); + IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME); for (int i = 0; i < 2500; i++) { // At least 50MB of raw data in total. final byte[] val = new byte[20000]; @@ -325,7 +325,9 @@ public class WalCompactionTest extends GridCommonAbstractTest { IgniteEx ig = (IgniteEx)startGrids(3); ig.cluster().active(true); - IgniteCache<Integer, byte[]> cache = ig.cache("cache"); + IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME); + + final int pageSize = ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize(); for (int i = 0; i < 100; i++) { final byte[] val = new byte[20000]; @@ -364,9 +366,9 @@ public class WalCompactionTest extends GridCommonAbstractTest { } // Spam WAL to move all data records to compressible WAL zone. - for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) { - ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE], - DFLT_PAGE_SIZE)); + for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) { + ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize], + pageSize)); } // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java index 300b752..3a99236 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java @@ -55,7 +55,6 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT dbCfg.setWalMode(walMode()); dbCfg.setWalSegmentSize(512 * 1024); dbCfg.setCheckpointFrequency(60 * 1000);//too high value for turn off frequency checkpoint. - dbCfg.setPageSize(4 * 1024); dbCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setMaxSize(100 * 1024 * 1024) .setPersistenceEnabled(true)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java index 9a23502..14013b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java @@ -73,8 +73,6 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { if (isLargePage()) dbCfg.setPageSize(16 * 1024); - else - dbCfg.setPageSize(4 * 1024); dbCfg.setWalMode(WALMode.LOG_ONLY); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 8ac72f7..02857ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; +import org.apache.ignite.internal.processors.cache.CacheCompressionManager; import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager; import org.apache.ignite.internal.processors.cache.CacheType; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; @@ -90,6 +91,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { true, true, false, + new CacheCompressionManager(), new GridCacheEventManager(), new CacheOsStoreManager(null, new CacheConfiguration()), new GridCacheEvictionManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/pom.xml ---------------------------------------------------------------------- diff --git a/modules/direct-io/pom.xml b/modules/direct-io/pom.xml index e460e67..a5e2841 100644 --- a/modules/direct-io/pom.xml +++ b/modules/direct-io/pom.xml @@ -59,6 +59,21 @@ </dependency> <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-compress</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-compress</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> @@ -75,7 +90,7 @@ <dependency> <groupId>net.java.dev.jna</groupId> <artifactId>jna</artifactId> - <version>4.5.0</version> + <version>${jna.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java index 98fd99b..a37dcdb 100644 --- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.processors.compress.FileSystemUtils; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; @@ -46,19 +47,22 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { /** Negative value for file offset: read/write starting from current file position */ private static final int FILE_POS_USE_CURRENT = -1; - /** File system & linux block size. Minimal amount of data can be written using DirectIO. */ - private final int fsBlockSize; + /** Minimal amount of data can be written using DirectIO. */ + private final int ioBlockSize; - /** Durable memory Page size. Can have greater value than {@link #fsBlockSize}. */ + /** Durable memory Page size. Can have greater value than {@link #ioBlockSize}. */ private final int pageSize; + /** File system block size. */ + private final int fsBlockSize; + /** File. */ private final File file; /** Logger. */ private final IgniteLogger log; - /** Thread local with buffers with capacity = one page {@link #pageSize} and aligned using {@link #fsBlockSize}. */ + /** Thread local with buffers with capacity = one page {@link #pageSize} and aligned using {@link #ioBlockSize}. */ private ThreadLocal<ByteBuffer> tlbOnePageAligned; /** Managed aligned buffers. Used to check if buffer is applicable for direct IO our data should be copied. */ @@ -79,18 +83,18 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { /** * Creates Direct File IO. * - * @param fsBlockSize FS/OS block size. + * @param ioBlockSize FS/OS block size. * @param pageSize Durable memory Page size. * @param file File to open. * @param modes Open options (flags). * @param tlbOnePageAligned Thread local with buffers with capacity = one page {@code pageSize} and aligned using - * {@code fsBlockSize}. + * {@code ioBlockSize}. * @param managedAlignedBuffers Managed aligned buffers map, used to check if buffer is known. * @param log Logger. * @throws IOException if file open failed. */ AlignedBuffersDirectFileIO( - int fsBlockSize, + int ioBlockSize, int pageSize, File file, OpenOption[] modes, @@ -99,7 +103,7 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { IgniteLogger log) throws IOException { this.log = log; - this.fsBlockSize = fsBlockSize; + this.ioBlockSize = ioBlockSize; this.pageSize = pageSize; this.file = file; this.tlbOnePageAligned = tlbOnePageAligned; @@ -126,6 +130,7 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { "(probably incompatible file system selected, for example, tmpfs): " + msg); this.fd = fd; + fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd); return; } @@ -135,6 +140,7 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { } this.fd = fd; + fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd); } /** @@ -174,6 +180,21 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { } /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return fsBlockSize; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return FileSystemUtils.getSparseFileSize(fd); + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + return (int)FileSystemUtils.punchHole(fd, position, len, fsBlockSize); + } + + /** {@inheritDoc} */ @Override public long position() throws IOException { long position = IgniteNativeIoLib.lseek(fdCheckOpened(), 0, IgniteNativeIoLib.SEEK_CUR); @@ -202,11 +223,22 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { @Override public int read(ByteBuffer destBuf, long filePosition) throws IOException { int size = checkSizeIsPadded(destBuf.remaining()); - if (isKnownAligned(destBuf)) - return readIntoAlignedBuffer(destBuf, filePosition); + return isKnownAligned(destBuf) ? + readIntoAlignedBuffer(destBuf, filePosition) : + readIntoUnalignedBuffer(destBuf, filePosition, size); + } + /** + * @param destBuf Destination aligned byte buffer. + * @param filePosition File position. + * @param size Buffer size to write, should be divisible by {@link #ioBlockSize}. + * @return Number of read bytes, possibly zero, or <tt>-1</tt> if the + * given position is greater than or equal to the file's current size. + * @throws IOException If failed. + */ + private int readIntoUnalignedBuffer(ByteBuffer destBuf, long filePosition, int size) throws IOException { boolean useTlb = size == pageSize; - ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size); + ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(ioBlockSize, size); try { assert alignedBuf.position() == 0: "Temporary aligned buffer is in incorrect state: position is set incorrectly"; @@ -241,29 +273,46 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { /** {@inheritDoc} */ @Override public int write(ByteBuffer srcBuf, long filePosition) throws IOException { - int size = checkSizeIsPadded(srcBuf.remaining()); + return isKnownAligned(srcBuf) ? + writeFromAlignedBuffer(srcBuf, filePosition) : + writeFromUnalignedBuffer(srcBuf, filePosition); + } - if (isKnownAligned(srcBuf)) - return writeFromAlignedBuffer(srcBuf, filePosition); + /** + * @param srcBuf buffer to check if it is known buffer. + * @param filePosition File position. + * @return Number of written bytes. + * @throws IOException If failed. + */ + private int writeFromUnalignedBuffer(ByteBuffer srcBuf, long filePosition) throws IOException { + int size = srcBuf.remaining(); - boolean useTlb = size == pageSize; - ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size); + boolean useTlb = size <= pageSize; + ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(ioBlockSize, size); try { assert alignedBuf.position() == 0 : "Temporary aligned buffer is in incorrect state: position is set incorrectly"; - assert alignedBuf.limit() == size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly"; + assert alignedBuf.limit() >= size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly"; int initPos = srcBuf.position(); alignedBuf.put(srcBuf); alignedBuf.flip(); - srcBuf.position(initPos); // will update later from write results + int len = alignedBuf.remaining(); + + // Compressed buffer of wrong size can be passed here. + if (len % ioBlockSize != 0) + alignBufferLimit(alignedBuf); int written = writeFromAlignedBuffer(alignedBuf, filePosition); - if (written > 0) - srcBuf.position(initPos + written); + // Actual written length can be greater than the original buffer, + // since we artificially expanded it to have correctly aligned size. + if (written > len) + written = len; + + srcBuf.position(initPos + written); return written; } @@ -276,6 +325,17 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { } /** + * @param buf Byte buffer to align. + */ + private void alignBufferLimit(ByteBuffer buf) { + int len = buf.remaining(); + + int alignedLen = (len / ioBlockSize + 1) * ioBlockSize; + + buf.limit(buf.limit() + alignedLen - len); + } + + /** * Checks if we can run fast path: we got well known buffer is already aligned. * * @param srcBuf buffer to check if it is known buffer. @@ -290,16 +350,16 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { /** * Check if size is appropriate for aligned/direct IO. * - * @param size buffer size to write, should be divisible by {@link #fsBlockSize}. + * @param size buffer size to write, should be divisible by {@link #ioBlockSize}. * @return size from parameter. * @throws IOException if provided size can't be written using direct IO. */ private int checkSizeIsPadded(int size) throws IOException { - if (size % fsBlockSize != 0) { + if (size % ioBlockSize != 0) { throw new IOException( - String.format("Unable to apply DirectIO for read/write buffer [%d] bytes on file system " + + String.format("Unable to apply DirectIO for read/write buffer [%d] bytes on " + "block size [%d]. Consider setting %s.setPageSize(%d) or disable Direct IO.", - size, fsBlockSize, DataStorageConfiguration.class.getSimpleName(), fsBlockSize)); + size, ioBlockSize, DataStorageConfiguration.class.getSimpleName(), ioBlockSize)); } return size; @@ -446,9 +506,9 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO { if (pos > buf.capacity()) throw new BufferOverflowException(); - if ((alignedPointer + pos) % fsBlockSize != 0) { + if ((alignedPointer + pos) % ioBlockSize != 0) { U.warn(log, String.format("IO Buffer Pointer [%d] and/or offset [%d] seems to be not aligned " + - "for FS block size [%d]. Direct IO may fail.", alignedPointer, buf.position(), fsBlockSize)); + "for IO block size [%d]. Direct IO may fail.", alignedPointer, buf.position(), ioBlockSize)); } return new Pointer(alignedPointer + pos); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java index 85a3a02..5d919fe 100644 --- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java @@ -49,7 +49,7 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory { private final FileIOFactory backupFactory; /** File system/os block size, negative value if library init was failed. */ - private final int fsBlockSize; + private final int ioBlockSize; /** Use backup factory, {@code true} if direct IO setup failed. */ private boolean useBackupFactory; @@ -81,22 +81,22 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory { this.backupFactory = backupFactory; useBackupFactory = true; - fsBlockSize = IgniteNativeIoLib.getFsBlockSize(storePath.getAbsolutePath(), log); + ioBlockSize = IgniteNativeIoLib.getDirectIOBlockSize(storePath.getAbsolutePath(), log); if(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIRECT_IO_ENABLED, true)) { if (log.isInfoEnabled()) - log.info("Direct IO is explicitly disabled by system property"); + log.info("Direct IO is explicitly disabled by system property."); return; } - if (fsBlockSize > 0) { - int blkSize = fsBlockSize; + if (ioBlockSize > 0) { + int blkSize = ioBlockSize; if (pageSize % blkSize != 0) { U.warn(log, String.format("Unable to setup Direct IO for Ignite [pageSize=%d bytes;" + " file system block size=%d]. For speeding up Ignite consider setting %s.setPageSize(%d)." + - " Direct IO is disabled", + " Direct IO is disabled.", pageSize, blkSize, DataStorageConfiguration.class.getSimpleName(), blkSize)); } else { @@ -133,7 +133,7 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory { assert !useBackupFactory : "Direct IO is disabled, aligned managed buffer creation is disabled now"; assert managedAlignedBuffers != null : "Direct buffers not available"; - ByteBuffer allocate = AlignedBuffers.allocate(fsBlockSize, size).order(ByteOrder.nativeOrder()); + ByteBuffer allocate = AlignedBuffers.allocate(ioBlockSize, size).order(ByteOrder.nativeOrder()); managedAlignedBuffers.put(GridUnsafe.bufferAddress(allocate), Thread.currentThread()); @@ -145,7 +145,7 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory { if (useBackupFactory) return backupFactory.create(file, modes); - return new AlignedBuffersDirectFileIO(fsBlockSize, pageSize, file, modes, tlbOnePageAligned, managedAlignedBuffers, log); + return new AlignedBuffersDirectFileIO(ioBlockSize, pageSize, file, modes, tlbOnePageAligned, managedAlignedBuffers, log); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java index 65ef8d7..2ab4325 100644 --- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java @@ -190,7 +190,7 @@ public class IgniteNativeIoLib { * <li>and <tt>-1</tt> if failed to determine block size.</li> * <li>and <tt>-1</tt> if JNA is not available or init failed.</li> </ul> */ - public static int getFsBlockSize(final String storageDir, final IgniteLogger log) { + public static int getDirectIOBlockSize(final String storageDir, final IgniteLogger log) { if (ex != null) { U.warn(log, "Failed to initialize O_DIRECT support at current OS: " + ex.getMessage(), ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java new file mode 100644 index 0000000..89b963a --- /dev/null +++ b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.internal.processors.cache.persistence.file.AlignedBuffersDirectFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest; + +/** + */ +public class DiskPageCompressionIntegrationDirectIOTest extends DiskPageCompressionIntegrationTest { + /** {@inheritDoc} */ + @Override protected void checkFileIOFactory(FileIOFactory f) { + assertTrue(f instanceof AlignedBuffersDirectFileIOFactory); + } +}