johnyangk commented on a change in pull request #223: [NEMO-388] Off-heap memory management (reuse ByteBuffer) URL: https://github.com/apache/incubator-nemo/pull/223#discussion_r307102927
########## File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java ########## @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.runtime.executor.data; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + + +/** + * This class represents chunk of memory residing in off-heap region + * managed by {@link MemoryPoolAssigner}, which is backed by {@link ByteBuffer}. + */ +public class MemoryChunk { + + @SuppressWarnings("restriction") + protected static final sun.misc.Unsafe UNSAFE = getUnsafe(); + @SuppressWarnings("restriction") + protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + private final ByteBuffer buffer; + private long address; + private final long addressLimit; + private boolean isFree; + private final int size; + private final boolean sequential; + + /** + * Creates a new memory chunk that represents the off-heap memory at the absolute address. + * This class can be created in two modes: sequential access mode or random access mode. 
+ * Sequential access mode supports convenient sequential access of {@link ByteBuffer}. + * Random access mode supports random access and manipulation of the data in the {@code ByteBuffer} using UNSAFE. + * No automatic tracking of position, limit, capacity, etc. of {@code ByteBuffer} for random access mode. + * + * @param offHeapAddress the address of the off-heap memory, {@link ByteBuffer}, of this MemoryChunk + * @param buffer the off-heap memory of this MemoryChunk + * @param sequential whether this MemoryChunk is in sequential mode or not. + */ + MemoryChunk(final long offHeapAddress, final ByteBuffer buffer, final boolean sequential) { + if (offHeapAddress <= 0) { + throw new IllegalArgumentException("negative pointer or size"); + } + if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) { + throw new IllegalArgumentException("MemoryChunk initialized with too large address"); + } + this.buffer = buffer; + this.size = buffer.capacity(); + this.address = offHeapAddress; + this.addressLimit = this.address + this.size; + this.isFree = false; + this.sequential = sequential; + } + + /** + * Creates a new memory chunk that represents the off-heap memory at the absolute address. + * + * @param buffer the off-heap memory of this MemoryChunk + * @param sequential whether this MemoryChunk is in sequential mode or not. + */ + MemoryChunk(final ByteBuffer buffer, final boolean sequential) { + this(getAddress(buffer), buffer, sequential); + } + + /** + * Gets the {@link ByteBuffer} from this MemoryChunk. + * + * @return {@link ByteBuffer} + */ + public ByteBuffer getBuffer() { + return buffer; + } + + /** + * Gets the remaining number of bytes in the {@link ByteBuffer} of this MemoryChunk. + * This is supported for sequential MemoryChunk. + * + * @return the number of remaining bytes + * @throws IllegalAccessException if remaining() not supported by this MemoryChunk. 
+ */ + public int remaining() throws IllegalAccessException { + if (sequential) { + return buffer.remaining(); + } else { + throw new IllegalAccessException("remaining() only allowed for sequential MemoryChunk"); + } + } + + /** + * Gets the current position of the {@link ByteBuffer} of this MemoryChunk. + * + * @return the position + * @throws IllegalAccessException if position() not supported by this MemoryChunk. + */ + public int position() throws IllegalAccessException { + if (sequential) { + return buffer.position(); + } else { + throw new IllegalAccessException("position() only allowed for sequential MemoryChunk"); + } + } + + /** + * Makes the duplicated instance of this MemoryChunk. + * + * @return the MemoryChunk with the same content of the caller instance + */ + public MemoryChunk duplicate() { + return new MemoryChunk(buffer.duplicate(), sequential); + } + + /** + * Frees this MemoryChunk. No further operation possible after calling this method. + */ + public void free() { + isFree = true; + } + + /** + * Reads the byte at the given index. + * + * @param index from which the byte will be read + * @return the byte at the given position + */ + @SuppressWarnings("restriction") + public final byte get(final int index) { + final long pos = address + index; + if (index >= 0 && pos < addressLimit) { + return UNSAFE.getByte(pos); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * Reads the byte at the current position of the {@link ByteBuffer}. + * + * @return the byte value + * @throws IllegalAccessException if called by random access mode MemoryChunk. 
+ */ + public final byte get() throws IllegalAccessException { + if (!sequential) { + throw new IllegalAccessException("Not allowed for non-sequential MemoryChunk."); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } + return buffer.get(); + } + + /** + * Writes the given byte into this buffer at the given index. + * + * @param index The position at which the byte will be written. + * @param b The byte value to be written. + */ + @SuppressWarnings("restriction") + public final void put(final int index, final byte b) { + final long pos = address + index; + if (index >= 0 && pos < addressLimit) { + UNSAFE.putByte(pos, b); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * Writes the given byte into the current position of the {@link ByteBuffer}. + * + * @param b the byte value to be written. + * @throws IllegalAccessException if called by random access mode MemoryChunk. + */ + public final void put(final byte b) throws IllegalAccessException { + if (!sequential) { + throw new IllegalAccessException("Not allowed for non-sequential MemoryChunk."); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } + buffer.put(b); + } + + /** + * Copies the data of the MemoryChunk from the specified position to target byte array. + * + * @param index The position at which the first byte will be read. + * @param dst The memory into which the memory will be copied. + */ + public final void get(final int index, final byte[] dst) { + get(index, dst, 0, dst.length); + } + + /** + * Copies the data of the MemoryChunk from the current position of the {@link ByteBuffer} to target byte array. + * + * @param dst the target byte array to copy the data from MemoryChunk. + * @throws IllegalAccessException if called by random access mode MemoryChunk. 
+ */ + public final void get(final byte[] dst) throws IllegalAccessException { + if (!sequential) { + throw new IllegalAccessException("Not allowed for non-sequential MemoryChunk."); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } + buffer.get(dst); + } + + /** + * Copies all the data from the source byte array into the MemoryChunk + * beginning at the specified position. + * + * @param index the position in MemoryChunk to start copying the data. + * @param src the source byte array that holds the data to copy. + */ + public final void put(final int index, final byte[] src) { + put(index, src, 0, src.length); + } + + /** + * Copies all the data from the source byte array into the MemoryChunk + * beginning at the current position of the {@link ByteBuffer}. + * + * @param src the source byte array that holds the data to copy. + * @throws IllegalAccessException if called by non-sequential MemoryChunk. + */ + public final void put(final byte[] src) throws IllegalAccessException { + if (!sequential) { + throw new IllegalAccessException("Not allowed for non-sequential MemoryChunk."); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } + buffer.put(src); + } + + /** + * Bulk get method using the specified index in the MemoryChunk. + * + * @param index the index in the MemoryChunk to start copying the data. + * @param dst the target byte array to copy the data from MemoryChunk. + * @param offset the offset in the destination byte array. + * @param length the number of bytes to be copied. 
+ */ + @SuppressWarnings("restriction") + public final void get(final int index, final byte[] dst, final int offset, final int length) { + if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) { + throw new IndexOutOfBoundsException(); + } + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - length) { + final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; + UNSAFE.copyMemory(null, pos, dst, arrayAddress, length); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * Bulk get method using the current position of the {@link ByteBuffer}. + * + * @param dst the target byte array to copy the data from MemoryChunk. + * @param offset the offset in the destination byte array. + * @param length the number of bytes to be copied. + * @throws IllegalAccessException if called by non-sequential MemoryChunk. + */ + public final void get(final byte[] dst, final int offset, final int length) throws IllegalAccessException { + if (!sequential) { + throw new IllegalAccessException("Not allowed for non-sequential MemoryChunk."); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } + buffer.get(dst, offset, length); + } + + /** + * Bulk put method using the specified index in the MemoryChunk. + * + * @param index the index in the MemoryChunk to start copying the data. + * @param src the source byte array that holds the data to be copied to MemoryChunk. + * @param offset the offset in the source byte array. + * @param length the number of bytes to be copied. 
+ */ + @SuppressWarnings("restriction") + public final void put(final int index, final byte[] src, final int offset, final int length) { + if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) { + throw new IndexOutOfBoundsException(); + } + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - length) { + final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; + UNSAFE.copyMemory(src, arrayAddress, null, pos, length); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } else { + throw new IndexOutOfBoundsException(); + } + } + + /** + * Bulk put method using the current position of the {@link ByteBuffer}. + * + * @param src the source byte array that holds the data to be copied to MemoryChunk. + * @param offset the offset in the source byte array. + * @param length the number of bytes to be copied. + * @throws IllegalAccessException if called by non-sequential MemoryChunk. + */ + public final void put(final byte[] src, final int offset, final int length) throws IllegalAccessException { + if (!sequential) { + throw new IllegalAccessException("Not allowed for non-sequential MemoryChunk"); + } else if (isFree) { + throw new IllegalStateException("MemoryChunk has been freed"); + } + buffer.put(src, offset, length); + } + + /** + * Reads a char value from the given position. + * + * @param index The position from which the memory will be read. + * @return The char value at the given position. + * + * @throws IndexOutOfBoundsException If the index is negative, or larger than the chunk size minus 2. + */ + @SuppressWarnings("restriction") + public final char getChar(final int index) { + final long pos = address + index; + if (index >= 0 && pos <= addressLimit - 2) { Review comment: I noticed the following magic numbers: -2, -4, -8. Either (or both) of the following changes would make things clearer. (1) A private static variable (with a descriptive name) for each number. 
(2) A private checker method that looks something like: boolean checkIndex(final enum type, final int index) throws IndexOutOfBoundsException { } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
