Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19222#discussion_r179314745
--- Diff:
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
- private final long length;
+ @Nullable
+ protected Object obj;
+
+ protected long offset;
+
+ protected long length;
/**
* Optional page number; used when this MemoryBlock represents a page
allocated by a
- * TaskMemoryManager. This field is public so that it can be modified by
the TaskMemoryManager,
- * which lives in a different package.
+ * TaskMemoryManager. This field can be updated using setPageNumber
method so that
+ * this can be modified by the TaskMemoryManager, which lives in a
different package.
*/
- public int pageNumber = NO_PAGE_NUMBER;
+ private int pageNumber = NO_PAGE_NUMBER;
- public MemoryBlock(@Nullable Object obj, long offset, long length) {
- super(obj, offset);
+ protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+ if (offset < 0 || length < 0) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Length " + length + " and offset " + offset + "must be
non-negative");
+ }
+ this.obj = obj;
+ this.offset = offset;
this.length = length;
}
+ protected MemoryBlock() {
+ this(null, 0, 0);
+ }
+
+ public final Object getBaseObject() {
+ return obj;
+ }
+
+ public final long getBaseOffset() {
+ return offset;
+ }
+
+ public void resetObjAndOffset() {
+ this.obj = null;
+ this.offset = 0;
+ }
+
/**
* Returns the size of the memory block.
*/
- public long size() {
+ public final long size() {
return length;
}
- /**
- * Creates a memory block pointing to the memory used by the long array.
- */
- public static MemoryBlock fromLongArray(final long[] array) {
- return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length
* 8L);
+ public final void setPageNumber(int pageNum) {
+ pageNumber = pageNum;
+ }
+
+ public final int getPageNumber() {
+ return pageNumber;
}
/**
* Fills the memory block with the specified byte value.
*/
- public void fill(byte value) {
+ public final void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
+
+ /**
+ * Instantiate MemoryBlock for given object type with new offset
+ */
+ public final static MemoryBlock allocateFromObject(Object obj, long
offset, long length) {
+ MemoryBlock mb = null;
+ if (obj instanceof byte[]) {
+ byte[] array = (byte[])obj;
+ mb = new ByteArrayMemoryBlock(array, offset, length);
+ } else if (obj instanceof long[]) {
+ long[] array = (long[])obj;
+ mb = new OnHeapMemoryBlock(array, offset, length);
+ } else if (obj == null) {
+ // we assume that to pass null pointer means off-heap
+ mb = new OffHeapMemoryBlock(offset, length);
+ } else {
+ throw new UnsupportedOperationException(
+ "Instantiate MemoryBlock for type " + obj.getClass() + " is not
supported now");
+ }
+ return mb;
+ }
+
+ /**
+ * Just instantiate the same type of MemoryBlock with new offset and
size. The data is not
+ * copied. If parameters are invalid, an exception is thrown
+ */
+ public abstract MemoryBlock subBlock(long offset, long size);
+
+ protected void checkSubBlockRange(long offset, long size) {
+ if (offset < 0 || size < 0) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Size " + size + " and offset " + offset + " must be
non-negative");
+ }
+ if (offset + size > length) {
+ throw new ArrayIndexOutOfBoundsException("The sum of size " + size +
" and offset " +
+ offset + " should not be larger than the length " + length + " in
the MemoryBlock");
+ }
+ }
+
+ /**
+ * getXXX/putXXX does not ensure guarantee behavior if the offset is
invalid. e.g cause illegal
+ * memory access, throw an exception, or etc.
+ * getXXX/putXXX uses an index based on this.offset that includes the
size of metadata such as
+ * JVM object header. The offset is 0-based and is expected as an
logical offset in the memory
+ * block.
+ */
+ public abstract int getInt(long offset);
+
+ public abstract void putInt(long offset, int value);
+
+ public abstract boolean getBoolean(long offset);
+
+ public abstract void putBoolean(long offset, boolean value);
+
+ public abstract byte getByte(long offset);
+
+ public abstract void putByte(long offset, byte value);
+
+ public abstract short getShort(long offset);
+
+ public abstract void putShort(long offset, short value);
+
+ public abstract long getLong(long offset);
+
+ public abstract void putLong(long offset, long value);
+
+ public abstract float getFloat(long offset);
+
+ public abstract void putFloat(long offset, float value);
+
+ public abstract double getDouble(long offset);
+
+ public abstract void putDouble(long offset, double value);
+
+ public static final void copyMemory(
+ MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset,
long length) {
+ assert(length <= src.length && length <= dst.length);
+ Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() +
srcOffset,
+ dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
+ }
+
+ public static final void copyMemory(MemoryBlock src, MemoryBlock dst,
long length) {
+ assert(length <= src.length && length <= dst.length);
+ Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
+ dst.getBaseObject(), dst.getBaseOffset(), length);
+ }
+
+ public final void copyFrom(Object src, long srcOffset, long dstOffset,
long length) {
+ assert(length <= this.length);
+ Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
+ }
+
+ public final void writeTo(long srcOffset, Object dst, long dstOffset,
long length) {
+ assert(length <= this.length);
--- End diff --
Hmm, shouldn't this be `assert(length <= this.length - srcOffset)`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]