[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19222


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179497501
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -183,15 +184,13 @@ protected void checkSubBlockRange(long offset, long 
size) {
 
   public static final void copyMemory(
   MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, 
long length) {
-assert(length <= (src.length - src.getBaseOffset()) &&
-   length <= (dst.length - dst.getBaseOffset()));
+assert(length <= src.length && length <= dst.length);
 Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + 
srcOffset,
--- End diff --

Oh, I see.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179497409
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -142,7 +143,7 @@ protected void checkSubBlockRange(long offset, long 
size) {
 }
 if (offset + size > length) {
   throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
-offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+offset + " should be equal to or subset of the original 
MemoryBlock");
--- End diff --

I see, let me revert.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179485835
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -183,15 +184,13 @@ protected void checkSubBlockRange(long offset, long 
size) {
 
   public static final void copyMemory(
   MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, 
long length) {
-assert(length <= (src.length - src.getBaseOffset()) &&
-   length <= (dst.length - dst.getBaseOffset()));
+assert(length <= src.length && length <= dst.length);
 Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + 
srcOffset,
--- End diff --

so `srcOffset` is relative, which means we should make sure 
`src.getBaseOffset() + srcOffset + length < src.length`
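
For illustration, a minimal sketch of the kind of bounds check being described here, written against the `MemoryBlock` API quoted above; the helper class and the exact condition are assumptions for the example, not the code that was merged:

```
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;

// Hypothetical helper: copy between two blocks using logical (relative) offsets,
// checking that the copied range stays inside both blocks.
final class CopyBoundsSketch {
  static void copy(MemoryBlock src, long srcOffset,
                   MemoryBlock dst, long dstOffset, long length) {
    // srcOffset/dstOffset are relative to each block's base offset
    assert srcOffset + length <= src.size() && dstOffset + length <= dst.size();
    Platform.copyMemory(
      src.getBaseObject(), src.getBaseOffset() + srcOffset,
      dst.getBaseObject(), dst.getBaseOffset() + dstOffset,
      length);
  }
}
```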


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179485268
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -142,7 +143,7 @@ protected void checkSubBlockRange(long offset, long 
size) {
 }
 if (offset + size > length) {
   throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
-offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+offset + " should be equal to or subset of the original 
MemoryBlock");
--- End diff --

The previous message looks clearer.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179402310
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,164 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new IllegalArgumentException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with the new absolute 
offset and size. The data
--- End diff --

I may have misread your comment. I thought you wanted to say "absolute".


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179397907
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,164 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new IllegalArgumentException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with the new absolute 
offset and size. The data
--- End diff --

As the implementation of `subBlock` returns something like `new OffHeapMemoryBlock(this.offset + offset, size)`, isn't it a relative offset to the original offset?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179391228
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,164 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new IllegalArgumentException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with the new absolute 
offset and size. The data
--- End diff --

First of all, this API does not allow users to *expand* a memory block, because of `checkSubBlockRange()`. This check rejects `offset < 0` and `offset + size > length`.

I will update this assert to `this.offset + offset + size > length`. I will also mention *subset* in the comment and add some tests to `MemoryBlockSuite`.

```
  public MemoryBlock subBlock(long offset, long size) {
    checkSubBlockRange(offset, size);
    return new OnHeapMemoryBlock(array, this.offset + offset, size);
  }
```
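
As a rough sketch of the kind of test that could cover this range check (the `OnHeapMemoryBlock` constructor from the diff is assumed, and the range check is assumed to throw `ArrayIndexOutOfBoundsException` as shown above; the class and method names here are hypothetical, not the actual additions to `MemoryBlockSuite`):

```
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
import org.junit.Assert;
import org.junit.Test;

public class SubBlockRangeSketchSuite {
  @Test
  public void subBlockRejectsOutOfRangeRequests() {
    // 8 longs = 64 bytes of on-heap storage
    MemoryBlock block = new OnHeapMemoryBlock(new long[8], Platform.LONG_ARRAY_OFFSET, 64);

    // a sub-block fully inside the parent is accepted
    Assert.assertEquals(16, block.subBlock(8, 16).size());

    // a sub-block reaching past the end must be rejected by the range check
    try {
      block.subBlock(56, 16);  // 56 + 16 > 64
      Assert.fail("expected ArrayIndexOutOfBoundsException");
    } catch (ArrayIndexOutOfBoundsException expected) {
      // expected: the oversized sub-block was rejected
    }
  }
}
```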


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179385907
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,164 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new IllegalArgumentException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with the new absolute 
offset and size. The data
--- End diff --

This seems like a dangerous API to me: we should not allow users to "expand" a memory block. Do we have a use case for that? I'd like to make the `offset` parameter an increment of the current offset, i.e. `this.subBlock(0, this.size)` returns the same block.
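
A small usage illustration of the relative-offset semantics being proposed, using the `OnHeapMemoryBlock` constructor shown elsewhere in the diff (the values and class name are made up for the example):

```
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;

final class SubBlockSemanticsSketch {
  public static void main(String[] args) {
    // 4 longs = 32 bytes
    MemoryBlock block = new OnHeapMemoryBlock(new long[4], Platform.LONG_ARRAY_OFFSET, 32);

    // offset is an increment of the current offset, so these cannot "expand" the block
    MemoryBlock whole = block.subBlock(0, block.size());  // same region as `block`, size 32
    MemoryBlock tail  = block.subBlock(8, 24);            // bytes 8..31 of `block`, size 24

    System.out.println(whole.size() + " " + tail.size());  // 32 24
  }
}
```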


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179335313
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

Sure, I'm just saying it looks strange. I know you didn't change the unit test's behavior.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179335019
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

This change follows the original behavior for backward compatibility.

If we want to change the behavior, update the test, or delete it, that would be good to address in another PR. Would it be better to discuss this unit test in another PR?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179333954
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

It depends on what we are testing in this unit test. From the comment in this test, `// offset underflow is apparently supported?`, it looks like this behavior is uncertain.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179332522
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

What do you suggest to make it less strange?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179332281
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
+  }
+
+  public ByteArrayMemoryBlock(long length) {
+this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, 
length);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+checkSubBlockRange(offset, size);
+return new ByteArrayMemoryBlock(array, this.offset + offset, size);
+  }
+
+  public byte[] getByteArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the byte array.
+   */
+  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, 
array.length);
+  }
+
+  @Override
+  public final int getInt(long offset) {
+return Platform.getInt(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putInt(long offset, int value) {
+Platform.putInt(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final boolean getBoolean(long offset) {
+return Platform.getBoolean(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putBoolean(long offset, boolean value) {
+Platform.putBoolean(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final byte getByte(long offset) {
+return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
+  }
+
+  @Override
+  public final void putByte(long offset, byte value) {
+array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = 
value;
--- End diff --

ditto


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179331924
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
+  }
+
+  public ByteArrayMemoryBlock(long length) {
+this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, 
length);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+checkSubBlockRange(offset, size);
+return new ByteArrayMemoryBlock(array, this.offset + offset, size);
+  }
+
+  public byte[] getByteArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the byte array.
+   */
+  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, 
array.length);
+  }
+
+  @Override
+  public final int getInt(long offset) {
+return Platform.getInt(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putInt(long offset, int value) {
+Platform.putInt(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final boolean getBoolean(long offset) {
+return Platform.getBoolean(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putBoolean(long offset, boolean value) {
+Platform.putBoolean(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final byte getByte(long offset) {
+return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
--- End diff --

This follows a suggestion from @cloud-fan: Java array access is faster than `Platform.getByte`.
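
For reference, a self-contained sketch contrasting the two accessor styles; the `BYTE_ARRAY_OFFSET` arithmetic follows the diff above, and both reads address the same byte for in-range offsets (the class is hypothetical, just for illustration):

```
import org.apache.spark.unsafe.Platform;

// Hypothetical micro-example: plain array indexing vs. Platform.getByte.
final class ByteAccessSketch {
  public static void main(String[] args) {
    byte[] array = new byte[] {10, 20, 30, 40};
    long base = Platform.BYTE_ARRAY_OFFSET;  // a block's base offset into the array
    long offset = 2;                         // logical offset within the block

    // plain array indexing, as in the diff: index = base + offset - BYTE_ARRAY_OFFSET
    byte viaArray = array[(int) (base + offset - Platform.BYTE_ARRAY_OFFSET)];

    // Unsafe-based access through Platform, addressing the same byte
    byte viaPlatform = Platform.getByte(array, base + offset);

    System.out.println(viaArray == viaPlatform);  // true
  }
}
```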


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179321572
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

Looks a bit strange.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179321207
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
 ---
@@ -19,15 +19,21 @@
 
 import org.apache.spark.unsafe.Platform;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+import sun.nio.ch.DirectBuffer;
--- End diff --

These seem to be unused imports?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179320913
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a long array on Java heap.
+ */
+public final class OnHeapMemoryBlock extends MemoryBlock {
+
+  private final long[] array;
+
+  public OnHeapMemoryBlock(long[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
--- End diff --

Ditto. Maybe "the size of the given memory space"?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179320302
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
+  }
+
+  public ByteArrayMemoryBlock(long length) {
+this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, 
length);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+checkSubBlockRange(offset, size);
+return new ByteArrayMemoryBlock(array, this.offset + offset, size);
+  }
+
+  public byte[] getByteArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the byte array.
+   */
+  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, 
array.length);
+  }
+
+  @Override
+  public final int getInt(long offset) {
+return Platform.getInt(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putInt(long offset, int value) {
+Platform.putInt(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final boolean getBoolean(long offset) {
+return Platform.getBoolean(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putBoolean(long offset, boolean value) {
+Platform.putBoolean(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final byte getByte(long offset) {
+return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
+  }
+
+  @Override
+  public final void putByte(long offset, byte value) {
+array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = 
value;
--- End diff --

ditto.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179320267
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
+  }
+
+  public ByteArrayMemoryBlock(long length) {
+this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, 
length);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+checkSubBlockRange(offset, size);
+return new ByteArrayMemoryBlock(array, this.offset + offset, size);
+  }
+
+  public byte[] getByteArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the byte array.
+   */
+  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, 
array.length);
+  }
+
+  @Override
+  public final int getInt(long offset) {
+return Platform.getInt(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putInt(long offset, int value) {
+Platform.putInt(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final boolean getBoolean(long offset) {
+return Platform.getBoolean(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putBoolean(long offset, boolean value) {
+Platform.putBoolean(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final byte getByte(long offset) {
+return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
--- End diff --

Why not use `Platform.getByte`?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179320092
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a long array on Java heap.
+ */
+public final class OnHeapMemoryBlock extends MemoryBlock {
--- End diff --

Can we put @cloud-fan's comment 
https://github.com/apache/spark/pull/19222/files#r172356146 into 
`OnHeapMemoryBlock`'s doc?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179319738
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
--- End diff --

Hmm, from here and the accessors below, it looks like the given `offset` includes `Platform.BYTE_ARRAY_OFFSET`? What will happen if a given offset is less than that?
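
A small sketch of the usual construction, following the `fromArray` factory in this diff; note that an offset below `Platform.BYTE_ARRAY_OFFSET` is not ruled out by the asserts above, and reads would then land in the array object's header region (the class name here is hypothetical):

```
import java.nio.charset.StandardCharsets;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;

final class ByteArrayBlockSketch {
  public static void main(String[] args) {
    byte[] data = "01234567".getBytes(StandardCharsets.UTF_8);

    // normal case: the base offset is the JVM's byte[] data offset,
    // so logical offset 0 maps to data[0]
    ByteArrayMemoryBlock block =
      new ByteArrayMemoryBlock(data, Platform.BYTE_ARRAY_OFFSET, data.length);
    System.out.println(block.getByte(0));  // 48, i.e. '0'
  }
}
```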


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179319478
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
--- End diff --

Calling `obj.length + Platform.BYTE_ARRAY_OFFSET` the array size might be a bit confusing in an error message.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179318575
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java 
---
@@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int 
numBytes) {
   public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
 
   private static final boolean unaligned = Platform.unaligned();
+  /**
+   * MemoryBlock equality check for MemoryBlocks.
+   * @return true if the arrays are equal, false otherwise
+   */
+  public static boolean arrayEqualsBlock(
--- End diff --

I think this only works for `ByteArrayMemoryBlock`, doesn't it?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179315793
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
--- End diff --

The semantics of the `offset` parameter are not totally clear here. From the check (`offset + size <= length`) in `checkSubBlockRange`, it looks like a relative offset from the original offset. But the `offset` parameter here could also be read as a new absolute offset. I think it is better to clearly state the semantics here.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179315071
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + offset + " must be 
non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   * getXXX/putXXX uses an index based on this.offset that includes the 
size of metadata such as
+   * JVM object header. The offset is 0-based and is expected as an 
logical offset in the memory
+   * block.
+   */
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void pu

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179314982
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + offset + " must be 
non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   * getXXX/putXXX uses an index based on this.offset that includes the 
size of metadata such as
+   * JVM object header. The offset is 0-based and is expected as an 
logical offset in the memory
+   * block.
+   */
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void pu

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179314879
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + offset + " must be 
non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   * getXXX/putXXX uses an index based on this.offset that includes the 
size of metadata such as
+   * JVM object header. The offset is 0-based and is expected as an 
logical offset in the memory
+   * block.
+   */
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void pu

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179314745
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + offset + " must be 
non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   * getXXX/putXXX uses an index based on this.offset that includes the 
size of metadata such as
+   * JVM object header. The offset is 0-based and is expected as an 
logical offset in the memory
+   * block.
+   */
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void pu

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-04-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r179313290
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
--- End diff --

nit: seems not really array index out of bounds? Maybe an 
`IllegalArgumentException`?
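
A minimal sketch of that suggestion (hypothetical, not part of the PR's diff; it only swaps the exception type in the constructor quoted above):
```java
protected MemoryBlock(@Nullable Object obj, long offset, long length) {
  if (offset < 0 || length < 0) {
    // IllegalArgumentException reads better here: the caller passed bad
    // arguments, nothing has actually been indexed out of bounds yet
    throw new IllegalArgumentException(
      "Length " + length + " and offset " + offset + " must be non-negative");
  }
  this.obj = obj;
  this.offset = offset;
  this.length = length;
}
```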


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-29 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r178083854
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -250,6 +246,7 @@ public long getPrefix() {
   }
 }
 p &= ~mask;
+System.out.println("P: "+Long.toHexString(p)+", 
obj="+base.getBaseObject()+", offset="+base.getBaseOffset()+", 
size="+base.size()+", numBytes="+numBytes);
--- End diff --

Good catch, this is my mistake.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r178020801
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -250,6 +246,7 @@ public long getPrefix() {
   }
 }
 p &= ~mask;
+System.out.println("P: "+Long.toHexString(p)+", 
obj="+base.getBaseObject()+", offset="+base.getBaseOffset()+", 
size="+base.size()+", numBytes="+numBytes);
--- End diff --

mistake?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r178020710
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a long array on Java heap.
+ */
+public final class OnHeapMemoryBlock extends MemoryBlock {
+
+  private final long[] array;
+
+  public OnHeapMemoryBlock(long[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
+  }
+
+  public OnHeapMemoryBlock(long size) {
+this(new long[Ints.checkedCast((size + 7) / 8)], 
Platform.LONG_ARRAY_OFFSET, size);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+checkSubBlockRange(offset, size);
+return new OnHeapMemoryBlock(array, this.offset + offset, size);
+  }
+
+  public long[] getLongArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the long array.
+   */
+  public static OnHeapMemoryBlock fromArray(final long[] array) {
+return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, 
array.length * 8L);
+  }
+
+  public static OnHeapMemoryBlock fromArray(final long[] array, long size) 
{
+return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+  }
+
+  @Override
+  public final int getInt(long offset) {
+return Platform.getInt(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putInt(long offset, int value) {
+Platform.putInt(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final boolean getBoolean(long offset) {
+return Platform.getBoolean(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putBoolean(long offset, boolean value) {
+Platform.putBoolean(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final byte getByte(long offset) {
+return Platform.getByte(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putByte(long offset, byte value) {
+Platform.putByte(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final short getShort(long offset) {
+return Platform.getShort(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putShort(long offset, short value) {
+Platform.putShort(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final long getLong(long offset) {
+return Platform.getLong(array, this.offset + offset);
--- End diff --

ah, endianness matters here.
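
As a rough illustration of why (a hypothetical example, not the PR's code): when a 64-bit value is assembled from two 32-bit words read at consecutive addresses, the platform byte order decides which word carries the high bits.
```java
import java.nio.ByteOrder;

class EndiannessExample {
  // combine the word at the lower address with the word at the higher address
  static long composeLong(int wordAtLowerAddress, int wordAtHigherAddress) {
    if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
      // little-endian: the lower address holds the least significant 32 bits
      return (wordAtLowerAddress & 0xFFFFFFFFL)
        | (((long) wordAtHigherAddress) << 32);
    } else {
      // big-endian: the lower address holds the most significant 32 bits
      return (((long) wordAtLowerAddress) << 32)
        | (wordAtHigherAddress & 0xFFFFFFFFL);
    }
  }
}
```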


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177958142
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
--- End diff --

Adding this assertion causes a new failure at 
[`UTF8StringSuite.writeToOutputStreamUnderflow()`](https://github.com/apache/spark/pull/19222/files#diff-321a62638d3ef7bbc9c35842967c868bR515).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177953620
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

I thought this was what you said 
[here](https://github.com/apache/spark/pull/19222#discussion_r176986304).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177694735
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java 
---
@@ -377,9 +378,10 @@ final UTF8String getUTF8String(int rowId) {
   if (stringResult.isSet == 0) {
 return null;
   } else {
-return UTF8String.fromAddress(null,
-  stringResult.buffer.memoryAddress() + stringResult.start,
-  stringResult.end - stringResult.start);
+int size = stringResult.end - stringResult.start;
+OffHeapMemoryBlock mb = new OffHeapMemoryBlock(
+  stringResult.buffer.memoryAddress() + stringResult.start, size);
+return new UTF8String(mb);
--- End diff --

we can follow the previous code style:
```
new UTF8String(new OffHeapMemoryBlock(
  stringResult.buffer.memoryAddress() + stringResult.start,
  stringResult.end - stringResult.start
))
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177693101
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -27,6 +27,9 @@
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
--- End diff --

we only need to import MemoryBlock


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177693045
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 ---
@@ -37,6 +37,7 @@
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.*;
--- End diff --

we only need to import MemoryBlock


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177692709
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 ---
@@ -348,10 +342,7 @@ public UnsafeSorterIterator getSortedIterator() {
   array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
   radixSortSupport.sortDescending(), 
radixSortSupport.sortSigned());
   } else {
-MemoryBlock unused = new MemoryBlock(
-  array.getBaseObject(),
-  array.getBaseOffset() + pos * 8L,
-  (array.size() - pos) * 8L);
+MemoryBlock unused = array.memoryBlock().subBlock(pos * 
8L,(array.size() - pos) * 8L);
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177692646
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java ---
@@ -60,13 +60,8 @@ public void copyElement(LongArray src, int srcPos, 
LongArray dst, int dstPos) {
 
   @Override
   public void copyRange(LongArray src, int srcPos, LongArray dst, int 
dstPos, int length) {
-Platform.copyMemory(
-  src.getBaseObject(),
-  src.getBaseOffset() + srcPos * 8L,
-  dst.getBaseObject(),
-  dst.getBaseOffset() + dstPos * 8L,
-  length * 8L
-);
+MemoryBlock.copyMemory(src.memoryBlock(),srcPos * 8L,
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177692472
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -105,13 +104,7 @@ public void reset() {
 
   public void expandPointerArray(LongArray newArray) {
 assert(newArray.size() > array.size());
-Platform.copyMemory(
-  array.getBaseObject(),
-  array.getBaseOffset(),
-  newArray.getBaseObject(),
-  newArray.getBaseOffset(),
-  pos * 8L
-);
+MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(),pos 
* 8L);
--- End diff --

space after `,`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177692581
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -180,10 +173,7 @@ public ShuffleSorterIterator getSortedIterator() {
 PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
 PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
 } else {
-  MemoryBlock unused = new MemoryBlock(
-array.getBaseObject(),
-array.getBaseOffset() + pos * 8L,
-(array.size() - pos) * 8L);
+  MemoryBlock unused = array.memoryBlock().subBlock(pos * 
8L,(array.size() - pos) * 8L);
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177692142
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws 
IOException {
 final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
 for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
-  UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i)
+  new UTF8String(
+new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, 
test.length + i))
--- End diff --

I'm surprised this works...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177687204
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -257,12 +258,13 @@ public long getPrefix() {
*/
   public byte[] getBytes() {
 // avoid copy if `base` is `byte[]`
-if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
-  && ((byte[]) base).length == numBytes) {
-  return (byte[]) base;
+long offset = base.getBaseOffset();
+if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
+  && (((ByteArrayMemoryBlock) base).getByteArray()).length == 
numBytes) {
--- End diff --

Weird, do you know why? They should be the same logically.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177686923
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +118,16 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  public UTF8String(MemoryBlock base) {
 this.base = base;
-this.offset = offset;
-this.numBytes = numBytes;
+if (base != null) {
+  this.numBytes = Ints.checkedCast(base.size());
+}
   }
 
   // for serialization
   public UTF8String() {
-this(null, 0, 0);
+this(null);
--- End diff --

if this is the only place to pass in a null MemoryBlock, we can probably do 
nothing here instead of calling `this(null)`.
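
A minimal sketch of that alternative (hypothetical, not the PR's code):
```java
// for serialization: leave the fields at their defaults instead of
// delegating to this(null), so the null check in the main constructor
// becomes unnecessary
public UTF8String() {
  // base stays null, numBytes stays 0
}
```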


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177686329
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -77,7 +81,8 @@
*/
   public static UTF8String fromBytes(byte[] bytes) {
 if (bytes != null) {
-  return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length);
+  return new UTF8String(
+MemoryBlock.allocateFromObject(bytes, BYTE_ARRAY_OFFSET, 
bytes.length));
--- End diff --

we can create `ByteArrayMemoryBlock` directly here.
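
A minimal sketch of that change (assuming the `ByteArrayMemoryBlock` constructor shown elsewhere in this diff):
```java
public static UTF8String fromBytes(byte[] bytes) {
  if (bytes != null) {
    // construct the concrete block type directly instead of going through
    // the instanceof dispatch in MemoryBlock.allocateFromObject
    return new UTF8String(
      new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length));
  } else {
    return null;
  }
}
```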


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177686396
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -90,19 +95,13 @@ public static UTF8String fromBytes(byte[] bytes) {
*/
   public static UTF8String fromBytes(byte[] bytes, int offset, int 
numBytes) {
 if (bytes != null) {
-  return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes);
+  return new UTF8String(
+MemoryBlock.allocateFromObject(bytes, BYTE_ARRAY_OFFSET + offset, 
numBytes));
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177684841
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,162 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (offset < 0 || size < 0 || this.offset + offset < 0) {
--- End diff --

`this.offset + offset` seems unnecessary.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177682999
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
--- End diff --

shall we require `offset >= Platform.BYTE_ARRAY_OFFSET`?
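
A sketch of what the stricter assertion could look like (hypothetical wording, inside the constructor quoted above where `obj`, `offset` and `size` are the parameters):
```java
assert(offset >= Platform.BYTE_ARRAY_OFFSET
    && offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
  "The range [" + offset + ", " + (offset + size) + ") must stay within " +
    "the backing byte[] of length " + obj.length;
```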


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177681282
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,49 +51,67 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
 // This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashBytesByIntBlock(base, seed);
 return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+// This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
--- End diff --

nit: we don't need this, it's checked in `hashUnsafeWordsBlock`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177511234
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -257,12 +258,13 @@ public long getPrefix() {
*/
   public byte[] getBytes() {
 // avoid copy if `base` is `byte[]`
-if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
-  && ((byte[]) base).length == numBytes) {
-  return (byte[]) base;
+long offset = base.getBaseOffset();
+if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
+  && (((ByteArrayMemoryBlock) base).getByteArray()).length == 
numBytes) {
--- End diff --

This change caused some test failures, for example in `ColumnarBatchSuite`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177409456
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteOrder;
+
+import static org.hamcrest.core.StringContains.containsString;
+
+public class MemoryBlockSuite {
--- End diff --

Thank you for your suggestion. As you said, I used `junit` for testing Java 
classes instead of `scalatest` ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177404199
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java 
---
@@ -71,40 +74,47 @@ public static long hashLong(long input, long seed) {
 return fmix(hash);
   }
 
-  public long hashUnsafeWords(Object base, long offset, int length) {
-return hashUnsafeWords(base, offset, length, seed);
+  public long hashUnsafeWordsBlock(MemoryBlock mb) {
+return hashUnsafeWordsBlock(mb, seed);
   }
 
-  public static long hashUnsafeWords(Object base, long offset, int length, 
long seed) {
-assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 
(word-aligned)";
-long hash = hashBytesByWords(base, offset, length, seed);
+  public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) {
+assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 
(word-aligned)";
+long hash = hashBytesByWordsBlock(mb, seed);
 return fmix(hash);
   }
 
   public long hashUnsafeBytes(Object base, long offset, int length) {
 return hashUnsafeBytes(base, offset, length, seed);
   }
 
-  public static long hashUnsafeBytes(Object base, long offset, int length, 
long seed) {
+  public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) {
+Object base = mb.getBaseObject();
--- End diff --

where do we use base?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177403002
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteOrder;
+
+import static org.hamcrest.core.StringContains.containsString;
+
+public class MemoryBlockSuite {
--- End diff --

We don't have to write tests in Java to test Java classes. It's OK since you 
already wrote it, but please write tests in Scala next time.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177400664
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -257,12 +258,13 @@ public long getPrefix() {
*/
   public byte[] getBytes() {
 // avoid copy if `base` is `byte[]`
-if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
-  && ((byte[]) base).length == numBytes) {
-  return (byte[]) base;
+long offset = base.getBaseOffset();
+if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
+  && (((ByteArrayMemoryBlock) base).getByteArray()).length == 
numBytes) {
--- End diff --

nit: `base.size == numBytes`
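
In other words, something like this (a sketch using the names that appear in the diff):
```java
// avoid copy if `base` wraps a byte[] that exactly spans this string
if (base instanceof ByteArrayMemoryBlock
    && base.getBaseOffset() == BYTE_ARRAY_OFFSET
    && base.size() == numBytes) {
  return ((ByteArrayMemoryBlock) base).getByteArray();
}
```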


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177306568
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -69,8 +64,9 @@ public long size() {
* Fill this all with 0L.
*/
   public void zeroOut() {
+long baseOffset = memory.getBaseOffset();
 for (long off = baseOffset; off < baseOffset + length * WIDTH; off += 
WIDTH) {
--- End diff --

Good catch. These three unchanged places seem to lead to failures.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177296492
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a long array on Java heap.
+ */
+public final class OnHeapMemoryBlock extends MemoryBlock {
+
+  private final long[] array;
+
+  public OnHeapMemoryBlock(long[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
+  "The sum of size " + size + " and offset " + offset + " should not 
be larger than " +
+"the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
+  }
+
+  public OnHeapMemoryBlock(long size) {
+this(new long[Ints.checkedCast((size + 7) / 8)], 
Platform.LONG_ARRAY_OFFSET, size);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+checkSubBlockRange(offset, size);
+return new OnHeapMemoryBlock(array, this.offset + offset, size);
+  }
+
+  public long[] getLongArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the long array.
+   */
+  public static OnHeapMemoryBlock fromArray(final long[] array) {
+return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, 
array.length * 8L);
+  }
+
+  public static OnHeapMemoryBlock fromArray(final long[] array, long size) 
{
+return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+  }
+
+  @Override
+  public final int getInt(long offset) {
+return Platform.getInt(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putInt(long offset, int value) {
+Platform.putInt(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final boolean getBoolean(long offset) {
+return Platform.getBoolean(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putBoolean(long offset, boolean value) {
+Platform.putBoolean(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final byte getByte(long offset) {
+return Platform.getByte(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putByte(long offset, byte value) {
+Platform.putByte(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final short getShort(long offset) {
+return Platform.getShort(array, this.offset + offset);
+  }
+
+  @Override
+  public final void putShort(long offset, short value) {
+Platform.putShort(array, this.offset + offset, value);
+  }
+
+  @Override
+  public final long getLong(long offset) {
+return Platform.getLong(array, this.offset + offset);
--- End diff --

shall we also apply 
https://github.com/apache/spark/pull/19222/files#diff-b9576c68b154d5e554671ffa84bfa74eR80
 here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177296342
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,161 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   * getXXX/putXXX uses an index based on this.offset that includes the 
size of metadata such as
+   * JVM object header. Thus, the offset is expected as an logical offset 
in the memory block.
--- End diff --

also mention that the offset is 0-based.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177296162
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,161 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
--- End diff --

the check should be `offset >= 0`; the sub-block must be a real "sub" block.
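
For illustration, a minimal sketch of the check with `offset >= 0` enforced against the sub-block's own 0-based offset (assuming `length` is this block's length; illustrative only):

    protected void checkSubBlockRange(long offset, long size) {
      if (offset < 0 || size < 0) {
        throw new ArrayIndexOutOfBoundsException(
          "Size " + size + " and offset " + offset + " must be non-negative");
      }
      if (offset + size > length) {
        throw new ArrayIndexOutOfBoundsException("The sum of size " + size +
          " and offset " + offset + " should not be larger than the length " +
          length + " in the MemoryBlock");
      }
    }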


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177295079
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -89,6 +85,6 @@ public void set(int index, long value) {
   public long get(int index) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + 
length + ")";
-return Platform.getLong(baseObj, baseOffset + index * WIDTH);
+return memory.getLong(memory.getBaseOffset() + index * WIDTH);
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177295038
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + 
length + ")";
-Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
--- End diff --

update it to use a 0-based offset.
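
A short sketch of the 0-based form being asked for (assuming `MemoryBlock.putLong` takes an offset relative to the block, which is the proposal here):

    public void set(int index, long value) {
      assert index >= 0 : "index (" + index + ") should >= 0";
      assert index < length : "index (" + index + ") should < length (" + length + ")";
      memory.putLong(index * WIDTH, value);   // no baseOffset added by the caller
    }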


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177294950
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -69,8 +64,9 @@ public long size() {
* Fill this all with 0L.
*/
   public void zeroOut() {
+long baseOffset = memory.getBaseOffset();
 for (long off = baseOffset; off < baseOffset + length * WIDTH; off += 
WIDTH) {
--- End diff --

the `off` should start at 0.
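
For example, a sketch of the loop with a 0-based `off` (illustrative only):

    public void zeroOut() {
      for (long off = 0; off < length * WIDTH; off += WIDTH) {
        memory.putLong(off, 0);
      }
    }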


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r177217305
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,159 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   */
+  public abstract int getInt(long offset);
--- End diff --

@cloud-fan After prototyping, I succeeded in making `UTF8String` right in this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176992334
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,49 +51,70 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
 // This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashBytesByIntBlock(base, seed);
 return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+// This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
 // This is not compatible with original and another implementations.
 // But remain it for backward compatibility for the components 
existing before 2.3.
+long offset = base.getBaseOffset();
+long lengthInBytes = base.size();
--- End diff --

Sure, I will do them


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176992008
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,159 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   */
+  public abstract int getInt(long offset);
--- End diff --

If we switch to a `0-based offset`, I think we may need to update many 
additional files. This is because many files contain hard-coded offsets 
such as `Platform.BYTE_ARRAY_OFFSET`. In particular, the generated code uses 
the hard-coded offset.
I think it would be very hard and bug-prone work to make it right with 
intro

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176985708
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,159 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   */
+  public abstract int getInt(long offset);
--- End diff --

We should clearly document the expectation for the `offset` 
parameter, i.e. an index based on `this.getBaseOffset`.
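
Under the convention in the current diff, a call site looks roughly like this (a sketch with hypothetical variable names, not code from the PR):

    // the offset passed to getXXX/putXXX already includes the block's base offset
    long absoluteOffset = block.getBaseOffset() + i;
    byte b = block.getByte(absoluteOffset);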


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176985312
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,49 +51,70 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
 // This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashBytesByIntBlock(base, seed);
 return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+// This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
 // This is not compatible with original and another implementations.
 // But remain it for backward compatibility for the components 
existing before 2.3.
+long offset = base.getBaseOffset();
+long lengthInBytes = base.size();
--- End diff --

shall we cast to int here instead of at the end?
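
Something like this, as a sketch of casting up front (based on the surrounding diff, not actual PR code):

    public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
      long offset = base.getBaseOffset();
      int lengthInBytes = Ints.checkedCast(base.size());   // cast once, here
      assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
      int lengthAligned = lengthInBytes - lengthInBytes % 4;
      int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
      for (int i = lengthAligned; i < lengthInBytes; i++) {
        int halfWord = base.getByte(offset + i);
        int k1 = mixK1(halfWord);
        h1 = mixH1(h1, k1);
      }
      return fmix(h1, lengthInBytes);
    }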


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176985334
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,49 +51,70 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
 // This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashBytesByIntBlock(base, seed);
 return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int 
lengthInBytes, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
+// This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
 // This is not compatible with original and another implementations.
 // But remain it for backward compatibility for the components 
existing before 2.3.
+long offset = base.getBaseOffset();
+long lengthInBytes = base.size();
 assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
-int lengthAligned = lengthInBytes - lengthInBytes % 4;
-int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
-for (int i = lengthAligned; i < lengthInBytes; i++) {
-  int halfWord = Platform.getByte(base, offset + i);
+long lengthAligned = lengthInBytes - lengthInBytes % 4;
+int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
+for (long i = lengthAligned; i < lengthInBytes; i++) {
+  int halfWord = base.getByte(offset + i);
   int k1 = mixK1(halfWord);
   h1 = mixH1(h1, k1);
 }
-return fmix(h1, lengthInBytes);
+return fmix(h1, Ints.checkedCast(lengthInBytes));
+  }
+
+  public static int hashUnsafeBytes(Object base, long offset, int 
lengthInBytes, int seed) {
+return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
   }
 
   public static int hashUnsafeBytes2(Object base, long offset, int 
lengthInBytes, int seed) {
+return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
 // This is compatible with original and another implementations.
 // Use this method for new components after Spark 2.3.
-assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
-int lengthAligned = lengthInBytes - lengthInBytes % 4;
-int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+long offset = base.getBaseOffset();
+long lengthInBytes = base.size();
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176986033
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,159 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   */
+  public abstract int getInt(long offset);
--- End diff --

BTW do you have a list of follow-up JIRAs? If you are the only one working 
on it, I'm fine with changing the semantics of the offset later. Otherwise I'd 
like to make it right in the first version, i.e. a 0-based offset.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r176986304
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +116,20 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  protected UTF8String(byte[] bytes, long offset, int numBytes) {
--- End diff --

why do we need this? I think the caller side should call 
`MemoryBlock.allocateFromObject`
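
i.e. something like this at the call site (a sketch with hypothetical `bytes`/`numBytes` variables):

    UTF8String str = new UTF8String(
      MemoryBlock.allocateFromObject(bytes, Platform.BYTE_ARRAY_OFFSET, numBytes));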


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175885446
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,55 +51,81 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+// This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
+int h1 = hashBytesByIntBlock(base, seed);
+return fmix(h1, lengthInBytes);
   }
 
   public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
 // This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
 return fmix(h1, lengthInBytes);
--- End diff --

Thanks for the good catch. I realized that the current UTs do not test 
`hashUnsafeWords` well.
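
A minimal sketch of the kind of test that would cover it (hypothetical test code, not from the PR; it just checks the old and new entry points agree):

    byte[] data = new byte[16];                        // word-aligned length
    new java.util.Random(42).nextBytes(data);
    int h1 = Murmur3_x86_32.hashUnsafeWords(data, Platform.BYTE_ARRAY_OFFSET, 16, 0);
    int h2 = Murmur3_x86_32.hashUnsafeWordsBlock(
      MemoryBlock.allocateFromObject(data, Platform.BYTE_ARRAY_OFFSET, 16), 0);
    assert h1 == h2;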


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175880756
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,55 +51,81 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+// This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
+int h1 = hashBytesByIntBlock(base, seed);
+return fmix(h1, lengthInBytes);
   }
 
   public static int hashUnsafeWords(Object base, long offset, int 
lengthInBytes, int seed) {
 // This is based on Guava's 
`Murmur32_Hasher.processRemaining(ByteBuffer)` method.
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 
8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, 
offset, lengthInBytes), seed);
 return fmix(h1, lengthInBytes);
--- End diff --

we don't need this line now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175879336
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +115,24 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  protected UTF8String(byte[] bytes, long offset, int numBytes) {
+this(new ByteArrayMemoryBlock(bytes, offset, numBytes));
+  }
+
+  public UTF8String(MemoryBlock base) {
 this.base = base;
-this.offset = offset;
-this.numBytes = numBytes;
+if (base != null) {
--- End diff --

ah, I see


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175868832
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + 
length + ")";
-Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
--- End diff --

the problem is that the semantics of the offset argument are pretty 
important to the `MemoryBlock` component. If we don't get it right in the first 
version, we leave `MemoryBlock` with a bad legacy, which may confuse other 
people.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175865685
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +115,24 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  protected UTF8String(byte[] bytes, long offset, int numBytes) {
+this(new ByteArrayMemoryBlock(bytes, offset, numBytes));
+  }
+
+  public UTF8String(MemoryBlock base) {
 this.base = base;
-this.offset = offset;
-this.numBytes = numBytes;
+if (base != null) {
--- End diff --

`UTF8String(null)` is actually called for serialization. Thus, we cannot 
replace `if` with `assert`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175864633
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,159 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected final long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   */
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void putBoolean(long offset, boolean value);
+
+  public abstract byte getByte(long offset);
+
+  public abstract void putByte(long offset, byte value);
+
+  public abstra

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175862823
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 ---
@@ -94,12 +95,12 @@ public void free(MemoryBlock memory) {
 }
 
 // Mark the page as freed (so we can detect double-frees).
-memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
 
 // As an additional layer of defense against use-after-free bugs, we 
mutate the
 // MemoryBlock to null out its reference to the long[] array.
-long[] array = (long[]) memory.obj;
-memory.setObjAndOffset(null, 0);
+long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
+memory.resetObjAndOffset();
 
 long alignedSize = ((size + 7) / 8) * 8;
 if (shouldPool(alignedSize)) {
--- End diff --

Yeah, in that case, we would move the `allocate` and `free` methods into 
the `MemoryAllocator` class.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175859167
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java 
---
@@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int 
numBytes) {
   public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
 
   private static final boolean unaligned = Platform.unaligned();
+  /**
+   * MemoryBlock equality check for MemoryBlocks.
+   * @return true if the arrays are equal, false otherwise
+   */
+  public static boolean arrayEqualsBlock(
--- End diff --

This method is called from the `UTF8String` class.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175689353
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java 
---
@@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int 
numBytes) {
   public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
 
   private static final boolean unaligned = Platform.unaligned();
+  /**
+   * MemoryBlock equality check for MemoryBlocks.
+   * @return true if the arrays are equal, false otherwise
+   */
+  public static boolean arrayEqualsBlock(
--- End diff --

Thank you for your comments. I will address them on Wed or Thu.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175678610
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java 
---
@@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int 
numBytes) {
   public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
 
   private static final boolean unaligned = Platform.unaligned();
+  /**
+   * MemoryBlock equality check for MemoryBlocks.
+   * @return true if the arrays are equal, false otherwise
+   */
+  public static boolean arrayEqualsBlock(
--- End diff --

Thank you for your review. I will address them on Wed or Thu.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175678287
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + 
length + ")";
-Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
--- End diff --

Yeah, we can encourage more people to use the new memory management and its 
API.
On the other hand, it would be good to take a two-step approach:
1. Use the new MemoryBlock with the current explicit offset usage
2. Hide the offset handling inside the implementation

WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175605407
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +115,24 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  protected UTF8String(byte[] bytes, long offset, int numBytes) {
+this(new ByteArrayMemoryBlock(bytes, offset, numBytes));
+  }
+
+  public UTF8String(MemoryBlock base) {
 this.base = base;
-this.offset = offset;
-this.numBytes = numBytes;
+if (base != null) {
+  if ((long) Integer.MAX_VALUE < base.size()) {
+throw new ArrayIndexOutOfBoundsException(
+  "MemoryBlock.size " + base.size() + " should be less than " + 
Integer.MAX_VALUE);
+  }
+  this.numBytes = (int) base.size();
--- End diff --

why not use `Ints.checkedCast`?
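
i.e. the whole range check could collapse to one line (a sketch, assuming Guava's `com.google.common.primitives.Ints` is imported as it is elsewhere in this PR):

    this.numBytes = Ints.checkedCast(base.size());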


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175605220
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +115,24 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  protected UTF8String(byte[] bytes, long offset, int numBytes) {
+this(new ByteArrayMemoryBlock(bytes, offset, numBytes));
+  }
+
+  public UTF8String(MemoryBlock base) {
 this.base = base;
-this.offset = offset;
-this.numBytes = numBytes;
+if (base != null) {
--- End diff --

I think we can actually add an assert to make sure `base` is not null.
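
e.g. a sketch of the constructor with the suggested assert (note that another reply in this thread points out `UTF8String(null)` is used for serialization, so treat this as illustrative only):

    public UTF8String(MemoryBlock base) {
      assert base != null : "base MemoryBlock must not be null";
      this.base = base;
      this.numBytes = Ints.checkedCast(base.size());
    }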


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175605033
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -119,15 +115,24 @@ public static UTF8String blankString(int length) {
 return fromBytes(spaces);
   }
 
-  protected UTF8String(Object base, long offset, int numBytes) {
+  protected UTF8String(byte[] bytes, long offset, int numBytes) {
+this(new ByteArrayMemoryBlock(bytes, offset, numBytes));
+  }
+
+  public UTF8String(MemoryBlock base) {
 this.base = base;
-this.offset = offset;
-this.numBytes = numBytes;
+if (base != null) {
+  if ((long) Integer.MAX_VALUE < base.size()) {
--- End diff --

do we need to do the cast? When comparing an int and a long, Java widens 
the int to long automatically.
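
i.e. the comparison can simply be written as (illustrative):

    if (Integer.MAX_VALUE < base.size()) {   // the int operand is widened to long
      throw new ArrayIndexOutOfBoundsException(
        "MemoryBlock.size " + base.size() + " should be less than " + Integer.MAX_VALUE);
    }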


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175604820
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
 ---
@@ -36,22 +42,34 @@ public MemoryBlock allocate(long size) throws 
OutOfMemoryError {
 
   @Override
   public void free(MemoryBlock memory) {
-assert (memory.obj == null) :
-  "baseObject not null; are you trying to use the off-heap allocator 
to free on-heap memory?";
-assert (memory.pageNumber != 
MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+assert(memory instanceof OffHeapMemoryBlock) :
+  "UnsafeMemoryAllocator can only free OffHeapMemoryBlock.";
+if (memory == OffHeapMemoryBlock.NULL) return;
+assert (memory.getPageNumber() != 
MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
   "page has already been freed";
-assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
-|| (memory.pageNumber == 
MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
+|| (memory.getPageNumber() == 
MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
   "TMM-allocated pages must be freed via TMM.freePage(), not directly 
in allocator free()";
 
 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
   memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
 }
+
 Platform.freeMemory(memory.offset);
+
 // As an additional layer of defense against use-after-free bugs, we 
mutate the
 // MemoryBlock to reset its pointer.
-memory.offset = 0;
+memory.resetObjAndOffset();
 // Mark the page as freed (so we can detect double-frees).
-memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
+  }
+
+  public OffHeapMemoryBlock reallocate(OffHeapMemoryBlock block, long 
oldSize, long newSize) {
--- End diff --

no one is calling it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175604535
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +45,159 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected final long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
-  public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+if (offset < 0 || length < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Length " + length + " and offset " + offset + "must be 
non-negative");
+}
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  protected MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(
+"Instantiate MemoryBlock for type " + obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Just instantiate the same type of MemoryBlock with new offset and 
size. The data is not
+   * copied. If parameters are invalid, an exception is thrown
+   */
+  public abstract MemoryBlock subBlock(long offset, long size);
+
+  protected void checkSubBlockRange(long offset, long size) {
+if (this.offset + offset < 0 || size < 0) {
+  throw new ArrayIndexOutOfBoundsException(
+"Size " + size + " and offset " + (this.offset + offset) + " must 
be non-negative");
+}
+if (offset + size > length) {
+  throw new ArrayIndexOutOfBoundsException("The sum of size " + size + 
" and offset " +
+offset + " should not be larger than the length " + length + " in 
the MemoryBlock");
+}
+  }
+
+  /**
+   * getXXX/putXXX does not ensure guarantee behavior if the offset is 
invalid. e.g  cause illegal
+   * memory access, throw an exception, or etc.
+   */
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void putBoolean(long offset, boolean value);
+
+  public abstract byte getByte(long offset);
+
+  public abstract void putByte(long offset, byte value);
+
+  public ab

[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175604062
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 ---
@@ -94,12 +95,12 @@ public void free(MemoryBlock memory) {
 }
 
 // Mark the page as freed (so we can detect double-frees).
-memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
 
 // As an additional layer of defense against use-after-free bugs, we 
mutate the
 // MemoryBlock to null out its reference to the long[] array.
-long[] array = (long[]) memory.obj;
-memory.setObjAndOffset(null, 0);
+long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
+memory.resetObjAndOffset();
 
 long alignedSize = ((size + 7) / 8) * 8;
 if (shouldPool(alignedSize)) {
--- End diff --

I think in the future we should cache `MemoryBlock` directly, so that we can have a unified pool for both the on-heap and off-heap memory managers.
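
A rough sketch of what such a unified, `MemoryBlock`-keyed pool could look like (purely illustrative; the class and method names below are hypothetical and only assume the `MemoryBlock`/`OnHeapMemoryBlock` types introduced by this PR):

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical sketch: pool MemoryBlock instances by their aligned size, so the same
// free lists can serve blocks backed by a long[] (on-heap) or a raw address (off-heap).
class PooledMemoryAllocator {
  private final Map<Long, LinkedList<MemoryBlock>> pool = new HashMap<>();

  synchronized MemoryBlock allocate(long size) {
    long alignedSize = ((size + 7) / 8) * 8;
    LinkedList<MemoryBlock> blocks = pool.get(alignedSize);
    if (blocks != null && !blocks.isEmpty()) {
      return blocks.removeFirst();  // reuse a cached block of the requested size
    }
    // Fall back to a fresh on-heap block here; an off-heap allocator could do the same.
    return OnHeapMemoryBlock.fromArray(new long[(int) (alignedSize / 8)]);
  }

  synchronized void free(MemoryBlock block) {
    // Cache the whole MemoryBlock rather than only the underlying long[].
    long alignedSize = ((block.size() + 7) / 8) * 8;
    pool.computeIfAbsent(alignedSize, k -> new LinkedList<>()).addLast(block);
  }
}
```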


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175592083
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,55 +51,81 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = Ints.checkedCast(base.size());
+assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
+int h1 = hashBytesByIntBlock(base, seed);
+return fmix(h1, lengthInBytes);
   }

   public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
 // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
 assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
-int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+int h1 = hashBytesByIntBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
--- End diff --

It's more consistent to call `hashUnsafeWordsBlock` here.
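
A minimal sketch of that suggestion, reusing the names from the diff above (not necessarily the final code in the PR):

```java
// Suggested shape: the (Object, offset, length) overload wraps its arguments in a
// MemoryBlock and delegates, so hashUnsafeWordsBlock is the single implementation.
public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
  return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}
```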


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175581710
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java 
---
@@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int 
numBytes) {
   public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
 
   private static final boolean unaligned = Platform.unaligned();
+  /**
+   * MemoryBlock equality check for MemoryBlocks.
+   * @return true if the arrays are equal, false otherwise
+   */
+  public static boolean arrayEqualsBlock(
--- End diff --

no one is calling it.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175580545
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + length + ")";
-Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
--- End diff --

Thanks for your data! I think it proves that we should make the API clearer and hide the offset handling in the implementation. If we could fix this in this patch, I think we can encourage more people to use the new way of operating on memory. What do you think?
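
For illustration, a sketch of what a clearer, offset-free call site could look like (assuming a `MemoryBlock.putLong` that takes an offset relative to the block start, which is the direction discussed here rather than the code in this diff):

```java
// Sketch: LongArray addresses elements by index and lets the MemoryBlock add its own
// base object/offset internally, so callers never touch getBaseOffset().
public void set(int index, long value) {
  assert index >= 0 : "index (" + index + ") should >= 0";
  assert index < length : "index (" + index + ") should < length (" + length + ")";
  memory.putLong(index * WIDTH, value);  // offset relative to the start of the block
}
```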


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-19 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175548118
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + 
length + ")";
-Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
--- End diff --

@cloud-fan sorry for my delay. I was busy with several things.
I ran a benchmark program for `LongArray.zeroOut` in two versions. One is the current implementation; the other is a future-style implementation like `memory.putLong(index * WIDTH, value)`. I got almost the same performance with both versions.
WDYT?

```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Platform MemoryAccess:            Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
zeroOutCurrent                          1066 / 1071       1259.4            0.8        1.0X
zeroOutFuture                           1061 / 1063       1265.6            0.8        1.0X
```

```
class MemoryBlockLoopAccessBenchmark extends SparkFunSuite {
  test("benchmark") {
val N = 128 * 1024 * 1024
val iters = 2
val M = 5
    val benchmark = new Benchmark("Platform MemoryAccess", 1L * M * N * iters,
  minNumIters = 20)

val array = new Array[Long](N)
val mb = OnHeapMemoryBlock.fromArray(array)
val la = new LongArray(mb)

benchmark.addCase("zeroOutCurrent") { _: Int =>
  for (_ <- 0L until iters) {
var i = 0
while (i < M) {
  la.zeroOut()
  i += 1
}
  }
}

benchmark.addCase("zeroOutFuture") { _: Int =>
  for (_ <- 0L until iters) {
var i = 0
while (i < M) {
  la.zeroOutFuture()
  i += 1
}
  }
}

benchmark.run()
  }
}

public final class LongArray {
  ...
  public void zeroOut() {
long baseOffset = memory.getBaseOffset();
    for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
  memory.putLong(off, 0);
}
  }

  public void zeroOutFuture() {
for (long off = 0; off < length * WIDTH; off += WIDTH) {
  memory.putLong2(off, 0);
}
  }
  ...
}

public final class OnHeapMemoryBlock extends MemoryBlock {
  ...
  @Override
  public final void putLong(long offset, long value) {
Platform.putLong(array, offset, value);
  }
  @Override
  public final void putLong2(long ofs, long value) {
Platform.putLong(array, ofs + offset, value);
  }
  ...
}
```





---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-15 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174985078
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+public class OffHeapMemoryBlock extends MemoryBlock {
+  static public final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
+
+  public OffHeapMemoryBlock(long address, long size) {
+super(null, address, size);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+if (offset + size > this.offset + length) {
--- End diff --

Sure, I will move this check into the parent class.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174361814
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
 assert index >= 0 : "index (" + index + ") should >= 0";
 assert index < length : "index (" + index + ") should < length (" + length + ")";
-Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
--- End diff --

@cloud-fan good point. We will want to use `memory.putLong(index * WIDTH, value)`.
I expect that the JIT compiler could hoist loop invariants out of the loop, or map the sequence onto a scaled-index instruction such as `mov targetreg, [basereg + scalereg * constant + offsetreg]`.

I will investigate what happens in the native code using the example that you pointed out.
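
For reference, a small sketch of the two loop shapes being compared (the relative-offset variant assumes an API like the `putLong2` shown in the benchmark code quoted above; this is an illustration, not the merged code):

```java
// Current shape: putLong takes an absolute offset, so the caller folds in the base offset
// and both the loop bound and the address computation carry memory.getBaseOffset().
public void zeroOut() {
  long baseOffset = memory.getBaseOffset();
  for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
    memory.putLong(off, 0);
  }
}

// Future shape (assumed relative-offset API): the base offset becomes a loop invariant
// inside the MemoryBlock, and off = index * WIDTH can map to a scaled-index address.
public void zeroOutRelative() {
  for (long off = 0; off < length * WIDTH; off += WIDTH) {
    memory.putLong2(off, 0);
  }
}
```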


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174334386
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,55 +50,81 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = (int)base.size();
--- End diff --

Since this requires `int`, I will add a cast check.
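
For example, with Guava's `Ints.checkedCast` (which the later revision of this diff, quoted near the top of this page, ends up using), the narrowing conversion fails fast instead of silently truncating. A tiny sketch (the wrapper class and method name are hypothetical):

```java
import com.google.common.primitives.Ints;

class CastCheckExample {
  // Ints.checkedCast throws IllegalArgumentException when the value does not fit in an
  // int, rather than silently wrapping the way a plain (int) cast would.
  static int toIntSize(long size) {
    return Ints.checkedCast(size);
  }
}
```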


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174333828
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
 ---
@@ -38,12 +39,18 @@ public static int hashLong(long input) {
 return (int) ((input >>> 32) ^ input);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+  public static int hashUnsafeBytesBlock(MemoryBlock mb) {
+long offset = mb.getBaseOffset();
+int lengthInBytes = (int)mb.size();
--- End diff --

Let us use `long`.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174333675
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset - Platform.BYTE_ARRAY_OFFSET + size <= obj.length * 8L) :
--- End diff --

Oh, it's my mistake.


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174295581
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
 ---
@@ -38,12 +39,18 @@ public static int hashLong(long input) {
 return (int) ((input >>> 32) ^ input);
   }
 
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+  public static int hashUnsafeBytesBlock(MemoryBlock mb) {
+long offset = mb.getBaseOffset();
+int lengthInBytes = (int)mb.size();
--- End diff --

or we just use long here?


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174300387
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java ---
@@ -49,55 +50,81 @@ public static int hashInt(int input, int seed) {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-return hashUnsafeWords(base, offset, lengthInBytes, seed);
+return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+  }
+
+  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+int lengthInBytes = (int)base.size();
--- End diff --

ditto


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174300982
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+public class OffHeapMemoryBlock extends MemoryBlock {
+  static public final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
+
+  public OffHeapMemoryBlock(long address, long size) {
+super(null, address, size);
+  }
+
+  @Override
+  public MemoryBlock subBlock(long offset, long size) {
+if (offset + size > this.offset + length) {
--- End diff --

shall we put this into a protected method in the parent class?
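
In sketch form, that is roughly what the later revision does (see `checkSubBlockRange` in the `MemoryBlock` diff near the top of this page); the exact shape below is illustrative, not the merged code:

```java
// Parent class (MemoryBlock): one shared bounds check for all sub-block views.
protected void checkSubBlockRange(long offset, long size) {
  if (offset + size > length) {
    throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
      offset + " should not be larger than the length " + length + " in the MemoryBlock");
  }
}

// Subclass (OffHeapMemoryBlock): validate first, then build the view over the same memory.
@Override
public MemoryBlock subBlock(long offset, long size) {
  checkSubBlockRange(offset, size);
  return new OffHeapMemoryBlock(this.offset + offset, size);
}
```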


---




[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-03-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r174301203
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+super(obj, offset, size);
+this.array = obj;
+assert(offset - Platform.BYTE_ARRAY_OFFSET + size <= obj.length * 8L) :
--- End diff --

why `* 8`?


---



