Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179314879
  
    --- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -45,38 +45,162 @@
        */
       public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
     
    -  private final long length;
    +  @Nullable
    +  protected Object obj;
    +
    +  protected long offset;
    +
    +  protected long length;
     
       /**
        * Optional page number; used when this MemoryBlock represents a page 
allocated by a
    -   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
    -   * which lives in a different package.
    +   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
    +   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
        */
    -  public int pageNumber = NO_PAGE_NUMBER;
    +  private int pageNumber = NO_PAGE_NUMBER;
     
    -  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    -    super(obj, offset);
    +  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
    +    if (offset < 0 || length < 0) {
    +      throw new ArrayIndexOutOfBoundsException(
    +        "Length " + length + " and offset " + offset + "must be 
non-negative");
    +    }
    +    this.obj = obj;
    +    this.offset = offset;
         this.length = length;
       }
     
    +  protected MemoryBlock() {
    +    this(null, 0, 0);
    +  }
    +
    +  public final Object getBaseObject() {
    +    return obj;
    +  }
    +
    +  public final long getBaseOffset() {
    +    return offset;
    +  }
    +
    +  public void resetObjAndOffset() {
    +    this.obj = null;
    +    this.offset = 0;
    +  }
    +
       /**
        * Returns the size of the memory block.
        */
    -  public long size() {
    +  public final long size() {
         return length;
       }
     
    -  /**
    -   * Creates a memory block pointing to the memory used by the long array.
    -   */
    -  public static MemoryBlock fromLongArray(final long[] array) {
    -    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
    +  public final void setPageNumber(int pageNum) {
    +    pageNumber = pageNum;
    +  }
    +
    +  public final int getPageNumber() {
    +    return pageNumber;
       }
     
       /**
        * Fills the memory block with the specified byte value.
        */
    -  public void fill(byte value) {
    +  public final void fill(byte value) {
         Platform.setMemory(obj, offset, length, value);
       }
    +
    +  /**
    +   * Instantiate MemoryBlock for given object type with new offset
    +   */
    +  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
    +    MemoryBlock mb = null;
    +    if (obj instanceof byte[]) {
    +      byte[] array = (byte[])obj;
    +      mb = new ByteArrayMemoryBlock(array, offset, length);
    +    } else if (obj instanceof long[]) {
    +      long[] array = (long[])obj;
    +      mb = new OnHeapMemoryBlock(array, offset, length);
    +    } else if (obj == null) {
    +      // we assume that to pass null pointer means off-heap
    +      mb = new OffHeapMemoryBlock(offset, length);
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
    +    }
    +    return mb;
    +  }
    +
    +  /**
    +   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
    +   * copied. If parameters are invalid, an exception is thrown
    +   */
    +  public abstract MemoryBlock subBlock(long offset, long size);
    +
    +  protected void checkSubBlockRange(long offset, long size) {
    +    if (offset < 0 || size < 0) {
    +      throw new ArrayIndexOutOfBoundsException(
    +        "Size " + size + " and offset " + offset + " must be 
non-negative");
    +    }
    +    if (offset + size > length) {
    +      throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
    +        offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
    +    }
    +  }
    +
    +  /**
    +   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
    +   * memory access, throw an exception, or etc.
    +   * getXXX/putXXX uses an index based on this.offset that includes the 
size of metadata such as
    +   * JVM object header. The offset is 0-based and is expected as an 
logical offset in the memory
    +   * block.
    +   */
    +  public abstract int getInt(long offset);
    +
    +  public abstract void putInt(long offset, int value);
    +
    +  public abstract boolean getBoolean(long offset);
    +
    +  public abstract void putBoolean(long offset, boolean value);
    +
    +  public abstract byte getByte(long offset);
    +
    +  public abstract void putByte(long offset, byte value);
    +
    +  public abstract short getShort(long offset);
    +
    +  public abstract void putShort(long offset, short value);
    +
    +  public abstract long getLong(long offset);
    +
    +  public abstract void putLong(long offset, long value);
    +
    +  public abstract float getFloat(long offset);
    +
    +  public abstract void putFloat(long offset, float value);
    +
    +  public abstract double getDouble(long offset);
    +
    +  public abstract void putDouble(long offset, double value);
    +
    +  public static final void copyMemory(
    +      MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, 
long length) {
    +    assert(length <= src.length && length <= dst.length);
    +    Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + 
srcOffset,
    +      dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
    +  }
    +
    +  public static final void copyMemory(MemoryBlock src, MemoryBlock dst, 
long length) {
    +    assert(length <= src.length && length <= dst.length);
    +    Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
    +      dst.getBaseObject(), dst.getBaseOffset(), length);
    +  }
    +
    +  public final void copyFrom(Object src, long srcOffset, long dstOffset, 
long length) {
    +    assert(length <= this.length);
    +    Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
    --- End diff --
    
    `offset + srcOffset`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to