[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19222 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179497501

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -183,15 +184,13 @@ protected void checkSubBlockRange(long offset, long size) {
       public static final void copyMemory(
         MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
    -    assert(length <= (src.length - src.getBaseOffset()) &&
    -      length <= (dst.length - dst.getBaseOffset()));
    +    assert(length <= src.length && length <= dst.length);
         Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
    --- End diff --

    Oh, I see.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179497409

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -142,7 +143,7 @@ protected void checkSubBlockRange(long offset, long size) {
         }
         if (offset + size > length) {
           throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
    -        offset + " should not be larger than the length " + length + " in the MemoryBlock");
    +        offset + " should be equal to or subset of the original MemoryBlock");
    --- End diff --

    I see, let me revert.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179485835

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -183,15 +184,13 @@ protected void checkSubBlockRange(long offset, long size) {
       public static final void copyMemory(
         MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
    -    assert(length <= (src.length - src.getBaseOffset()) &&
    -      length <= (dst.length - dst.getBaseOffset()));
    +    assert(length <= src.length && length <= dst.length);
         Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
    --- End diff --

    So `srcOffset` is relative, which means we should make sure `src.getBaseOffset() + srcOffset + length < src.length`.
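The range check discussed in this comment can be sketched in isolation. The following is a minimal illustration, not Spark's code: `RelativeBlock`, `checkRange`, and `copy` are hypothetical names, and the sketch assumes that the offsets passed to the copy routine are relative to the start of each block, so the check is written purely in block-relative terms and the base is added only when computing the absolute position.

```java
// Hypothetical stand-in for a memory block: a window of `length` bytes that
// starts at absolute index `base` inside `data`. This is not Spark's MemoryBlock.
final class RelativeBlock {
  final byte[] data;
  final int base;
  final int length;

  RelativeBlock(byte[] data, int base, int length) {
    if (base < 0 || length < 0 || (long) base + length > data.length) {
      throw new IllegalArgumentException("window does not fit in the backing array");
    }
    this.data = data;
    this.base = base;
    this.length = length;
  }

  // Offsets are relative to the start of the block, so the base does not
  // appear in the comparison against the block length (assumption of this sketch).
  private static void checkRange(RelativeBlock b, long offset, long length) {
    if (offset < 0 || length < 0 || offset > b.length || length > b.length - offset) {
      throw new ArrayIndexOutOfBoundsException(
        "offset " + offset + " plus length " + length + " exceeds block length " + b.length);
    }
  }

  static void copy(
      RelativeBlock src, long srcOffset, RelativeBlock dst, long dstOffset, long length) {
    checkRange(src, srcOffset, length);
    checkRange(dst, dstOffset, length);
    // The base is added only when translating to an absolute array position.
    System.arraycopy(
      src.data, (int) (src.base + srcOffset),
      dst.data, (int) (dst.base + dstOffset),
      (int) length);
  }
}
```

With a source block covering bytes 2 to 5 of an eight-byte array, copying three bytes from relative offset 1 succeeds, while copying three bytes from relative offset 2 is rejected because it would run past the end of the block.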
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179485268

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -142,7 +143,7 @@ protected void checkSubBlockRange(long offset, long size) {
         }
         if (offset + size > length) {
           throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
    -        offset + " should not be larger than the length " + length + " in the MemoryBlock");
    +        offset + " should be equal to or subset of the original MemoryBlock");
    --- End diff --

    The previous message looks clearer.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179402310

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -45,38 +45,164 @@
        */
       public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
    -  private final long length;
    +  @Nullable
    +  protected Object obj;
    +
    +  protected long offset;
    +
    +  protected long length;
       /**
        * Optional page number; used when this MemoryBlock represents a page allocated by a
    -   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
    -   * which lives in a different package.
    +   * TaskMemoryManager. This field can be updated using setPageNumber method so that
    +   * this can be modified by the TaskMemoryManager, which lives in a different package.
        */
    -  public int pageNumber = NO_PAGE_NUMBER;
    +  private int pageNumber = NO_PAGE_NUMBER;
    -  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    -    super(obj, offset);
    +  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
    +    if (offset < 0 || length < 0) {
    +      throw new IllegalArgumentException(
    +        "Length " + length + " and offset " + offset + "must be non-negative");
    +    }
    +    this.obj = obj;
    +    this.offset = offset;
         this.length = length;
       }
    +  protected MemoryBlock() {
    +    this(null, 0, 0);
    +  }
    +
    +  public final Object getBaseObject() {
    +    return obj;
    +  }
    +
    +  public final long getBaseOffset() {
    +    return offset;
    +  }
    +
    +  public void resetObjAndOffset() {
    +    this.obj = null;
    +    this.offset = 0;
    +  }
    +
       /**
        * Returns the size of the memory block.
        */
    -  public long size() {
    +  public final long size() {
         return length;
       }
    -  /**
    -   * Creates a memory block pointing to the memory used by the long array.
    -   */
    -  public static MemoryBlock fromLongArray(final long[] array) {
    -    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
    +  public final void setPageNumber(int pageNum) {
    +    pageNumber = pageNum;
    +  }
    +
    +  public final int getPageNumber() {
    +    return pageNumber;
       }
       /**
        * Fills the memory block with the specified byte value.
        */
    -  public void fill(byte value) {
    +  public final void fill(byte value) {
         Platform.setMemory(obj, offset, length, value);
       }
    +
    +  /**
    +   * Instantiate MemoryBlock for given object type with new offset
    +   */
    +  public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) {
    +    MemoryBlock mb = null;
    +    if (obj instanceof byte[]) {
    +      byte[] array = (byte[])obj;
    +      mb = new ByteArrayMemoryBlock(array, offset, length);
    +    } else if (obj instanceof long[]) {
    +      long[] array = (long[])obj;
    +      mb = new OnHeapMemoryBlock(array, offset, length);
    +    } else if (obj == null) {
    +      // we assume that to pass null pointer means off-heap
    +      mb = new OffHeapMemoryBlock(offset, length);
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
    +    }
    +    return mb;
    +  }
    +
    +  /**
    +   * Just instantiate the same type of MemoryBlock with the new absolute offset and size. The data
    --- End diff --

    I may have misread your comment. I thought you meant "absolute".
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179397907

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -45,38 +45,164 @@
        */
       public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
    -  private final long length;
    +  @Nullable
    +  protected Object obj;
    +
    +  protected long offset;
    +
    +  protected long length;
       /**
        * Optional page number; used when this MemoryBlock represents a page allocated by a
    -   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
    -   * which lives in a different package.
    +   * TaskMemoryManager. This field can be updated using setPageNumber method so that
    +   * this can be modified by the TaskMemoryManager, which lives in a different package.
        */
    -  public int pageNumber = NO_PAGE_NUMBER;
    +  private int pageNumber = NO_PAGE_NUMBER;
    -  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    -    super(obj, offset);
    +  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
    +    if (offset < 0 || length < 0) {
    +      throw new IllegalArgumentException(
    +        "Length " + length + " and offset " + offset + "must be non-negative");
    +    }
    +    this.obj = obj;
    +    this.offset = offset;
         this.length = length;
       }
    +  protected MemoryBlock() {
    +    this(null, 0, 0);
    +  }
    +
    +  public final Object getBaseObject() {
    +    return obj;
    +  }
    +
    +  public final long getBaseOffset() {
    +    return offset;
    +  }
    +
    +  public void resetObjAndOffset() {
    +    this.obj = null;
    +    this.offset = 0;
    +  }
    +
       /**
        * Returns the size of the memory block.
        */
    -  public long size() {
    +  public final long size() {
         return length;
       }
    -  /**
    -   * Creates a memory block pointing to the memory used by the long array.
    -   */
    -  public static MemoryBlock fromLongArray(final long[] array) {
    -    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
    +  public final void setPageNumber(int pageNum) {
    +    pageNumber = pageNum;
    +  }
    +
    +  public final int getPageNumber() {
    +    return pageNumber;
       }
       /**
        * Fills the memory block with the specified byte value.
        */
    -  public void fill(byte value) {
    +  public final void fill(byte value) {
         Platform.setMemory(obj, offset, length, value);
       }
    +
    +  /**
    +   * Instantiate MemoryBlock for given object type with new offset
    +   */
    +  public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) {
    +    MemoryBlock mb = null;
    +    if (obj instanceof byte[]) {
    +      byte[] array = (byte[])obj;
    +      mb = new ByteArrayMemoryBlock(array, offset, length);
    +    } else if (obj instanceof long[]) {
    +      long[] array = (long[])obj;
    +      mb = new OnHeapMemoryBlock(array, offset, length);
    +    } else if (obj == null) {
    +      // we assume that to pass null pointer means off-heap
    +      mb = new OffHeapMemoryBlock(offset, length);
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
    +    }
    +    return mb;
    +  }
    +
    +  /**
    +   * Just instantiate the same type of MemoryBlock with the new absolute offset and size. The data
    --- End diff --

    Since the implementation of `subBlock` returns something like `new OffHeapMemoryBlock(this.offset + offset, size)`, isn't the offset relative to the original offset?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179391228

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -45,38 +45,164 @@
        */
       public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
    -  private final long length;
    +  @Nullable
    +  protected Object obj;
    +
    +  protected long offset;
    +
    +  protected long length;
       /**
        * Optional page number; used when this MemoryBlock represents a page allocated by a
    -   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
    -   * which lives in a different package.
    +   * TaskMemoryManager. This field can be updated using setPageNumber method so that
    +   * this can be modified by the TaskMemoryManager, which lives in a different package.
        */
    -  public int pageNumber = NO_PAGE_NUMBER;
    +  private int pageNumber = NO_PAGE_NUMBER;
    -  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    -    super(obj, offset);
    +  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
    +    if (offset < 0 || length < 0) {
    +      throw new IllegalArgumentException(
    +        "Length " + length + " and offset " + offset + "must be non-negative");
    +    }
    +    this.obj = obj;
    +    this.offset = offset;
         this.length = length;
       }
    +  protected MemoryBlock() {
    +    this(null, 0, 0);
    +  }
    +
    +  public final Object getBaseObject() {
    +    return obj;
    +  }
    +
    +  public final long getBaseOffset() {
    +    return offset;
    +  }
    +
    +  public void resetObjAndOffset() {
    +    this.obj = null;
    +    this.offset = 0;
    +  }
    +
       /**
        * Returns the size of the memory block.
        */
    -  public long size() {
    +  public final long size() {
         return length;
       }
    -  /**
    -   * Creates a memory block pointing to the memory used by the long array.
    -   */
    -  public static MemoryBlock fromLongArray(final long[] array) {
    -    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
    +  public final void setPageNumber(int pageNum) {
    +    pageNumber = pageNum;
    +  }
    +
    +  public final int getPageNumber() {
    +    return pageNumber;
       }
       /**
        * Fills the memory block with the specified byte value.
        */
    -  public void fill(byte value) {
    +  public final void fill(byte value) {
         Platform.setMemory(obj, offset, length, value);
       }
    +
    +  /**
    +   * Instantiate MemoryBlock for given object type with new offset
    +   */
    +  public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) {
    +    MemoryBlock mb = null;
    +    if (obj instanceof byte[]) {
    +      byte[] array = (byte[])obj;
    +      mb = new ByteArrayMemoryBlock(array, offset, length);
    +    } else if (obj instanceof long[]) {
    +      long[] array = (long[])obj;
    +      mb = new OnHeapMemoryBlock(array, offset, length);
    +    } else if (obj == null) {
    +      // we assume that to pass null pointer means off-heap
    +      mb = new OffHeapMemoryBlock(offset, length);
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
    +    }
    +    return mb;
    +  }
    +
    +  /**
    +   * Just instantiate the same type of MemoryBlock with the new absolute offset and size. The data
    --- End diff --

    First of all, this API does not allow users to *expand* a memory block, because of `checkSubBlockRange()`. This check rejects `offset < 0` or `offset + size > length`. I have to update this assert to `this.offset + offset + size > length`. Also, I will add *subset* to the comment and add some tests to `MemoryBlockSuite`.

    ```
    public MemoryBlock subBlock(long offset, long size) {
      checkSubBlockRange(offset, size);
      return new OnHeapMemoryBlock(array, this.offset + offset, size);
    }
    ```
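The relative-offset semantics under discussion can be shown with a small self-contained sketch. It is an illustration under stated assumptions, not the PR's implementation: `SubBlockSketch` is a hypothetical class backed by a plain `byte[]`, its `offset` is an absolute index into that array, and its range check uses the block-relative form `offset + size > length` (written so that it cannot overflow).

```java
// Hypothetical byte-array-backed block, used only to illustrate sub-blocks
// whose offset parameter is relative to the parent block.
final class SubBlockSketch {
  private final byte[] array;
  private final long offset; // absolute start of this block inside `array`
  private final long length; // size of this block in bytes

  SubBlockSketch(byte[] array, long offset, long length) {
    if (offset < 0 || length < 0 || offset > array.length - length) {
      throw new IllegalArgumentException("block does not fit in the backing array");
    }
    this.array = array;
    this.offset = offset;
    this.length = length;
  }

  long offset() { return offset; }

  long size() { return length; }

  // Rejects any range that is not fully contained in this block, so a
  // sub-block can shrink its parent but never expand it.
  private void checkSubBlockRange(long offset, long size) {
    if (offset < 0 || size < 0) {
      throw new ArrayIndexOutOfBoundsException(
        "Size " + size + " and offset " + offset + " must be non-negative");
    }
    if (offset > length - size) { // same as offset + size > length, without overflow
      throw new ArrayIndexOutOfBoundsException(
        "The sum of size " + size + " and offset " + offset +
          " should not be larger than the length " + length);
    }
  }

  // `offset` is an increment on top of this block's own offset.
  SubBlockSketch subBlock(long offset, long size) {
    checkSubBlockRange(offset, size);
    return new SubBlockSketch(array, this.offset + offset, size);
  }
}
```

Under these assumptions, `block.subBlock(0, block.size())` describes the same region as `block`, nested calls accumulate their offsets, and a request that reaches past the end of the parent is rejected even when the backing array itself is large enough.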
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179385907

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
    @@ -45,38 +45,164 @@
        */
       public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
    -  private final long length;
    +  @Nullable
    +  protected Object obj;
    +
    +  protected long offset;
    +
    +  protected long length;
       /**
        * Optional page number; used when this MemoryBlock represents a page allocated by a
    -   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
    -   * which lives in a different package.
    +   * TaskMemoryManager. This field can be updated using setPageNumber method so that
    +   * this can be modified by the TaskMemoryManager, which lives in a different package.
        */
    -  public int pageNumber = NO_PAGE_NUMBER;
    +  private int pageNumber = NO_PAGE_NUMBER;
    -  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    -    super(obj, offset);
    +  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
    +    if (offset < 0 || length < 0) {
    +      throw new IllegalArgumentException(
    +        "Length " + length + " and offset " + offset + "must be non-negative");
    +    }
    +    this.obj = obj;
    +    this.offset = offset;
         this.length = length;
       }
    +  protected MemoryBlock() {
    +    this(null, 0, 0);
    +  }
    +
    +  public final Object getBaseObject() {
    +    return obj;
    +  }
    +
    +  public final long getBaseOffset() {
    +    return offset;
    +  }
    +
    +  public void resetObjAndOffset() {
    +    this.obj = null;
    +    this.offset = 0;
    +  }
    +
       /**
        * Returns the size of the memory block.
        */
    -  public long size() {
    +  public final long size() {
         return length;
       }
    -  /**
    -   * Creates a memory block pointing to the memory used by the long array.
    -   */
    -  public static MemoryBlock fromLongArray(final long[] array) {
    -    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
    +  public final void setPageNumber(int pageNum) {
    +    pageNumber = pageNum;
    +  }
    +
    +  public final int getPageNumber() {
    +    return pageNumber;
       }
       /**
        * Fills the memory block with the specified byte value.
        */
    -  public void fill(byte value) {
    +  public final void fill(byte value) {
         Platform.setMemory(obj, offset, length, value);
       }
    +
    +  /**
    +   * Instantiate MemoryBlock for given object type with new offset
    +   */
    +  public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) {
    +    MemoryBlock mb = null;
    +    if (obj instanceof byte[]) {
    +      byte[] array = (byte[])obj;
    +      mb = new ByteArrayMemoryBlock(array, offset, length);
    +    } else if (obj instanceof long[]) {
    +      long[] array = (long[])obj;
    +      mb = new OnHeapMemoryBlock(array, offset, length);
    +    } else if (obj == null) {
    +      // we assume that to pass null pointer means off-heap
    +      mb = new OffHeapMemoryBlock(offset, length);
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
    +    }
    +    return mb;
    +  }
    +
    +  /**
    +   * Just instantiate the same type of MemoryBlock with the new absolute offset and size. The data
    --- End diff --

    This seems like a dangerous API to me. We should not allow users to "expand" a memory block. Do we have a use case for it? I'd like to make the `offset` parameter an increment of the current offset, i.e. `this.subBlock(0, this.size)` returns the same block.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179335313

    --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java ---
    @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException {
         final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
         for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
    -      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
    +      new UTF8String(
    +        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
    --- End diff --

    Sure, I'm just saying it looks strange. I know you didn't change the unit test's behavior.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179335019

    --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java ---
    @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException {
         final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
         for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
    -      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
    +      new UTF8String(
    +        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
    --- End diff --

    This change keeps the original behavior for backward compatibility. If we want to change the behavior, update the test, or delete it, it would be better to address that in another PR. Would it be better to discuss this unit test in another PR?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179333954

    --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java ---
    @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException {
         final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
         for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
    -      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
    +      new UTF8String(
    +        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
    --- End diff --

    It depends on what we are testing in this unit test. Judging from the comment in this test, `// offset underflow is apparently supported?`, it looks like this behavior is uncertain.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179332522

    --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java ---
    @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException {
         final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
         for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
    -      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
    +      new UTF8String(
    +        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
    --- End diff --

    What would you suggest to make it less strange?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179332281

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.unsafe.memory;
    +
    +import com.google.common.primitives.Ints;
    +
    +import org.apache.spark.unsafe.Platform;
    +
    +/**
    + * A consecutive block of memory with a byte array on Java heap.
    + */
    +public final class ByteArrayMemoryBlock extends MemoryBlock {
    +
    +  private final byte[] array;
    +
    +  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
    +    super(obj, offset, size);
    +    this.array = obj;
    +    assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
    +      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
    +        "the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
    +  }
    +
    +  public ByteArrayMemoryBlock(long length) {
    +    this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
    +  }
    +
    +  @Override
    +  public MemoryBlock subBlock(long offset, long size) {
    +    checkSubBlockRange(offset, size);
    +    return new ByteArrayMemoryBlock(array, this.offset + offset, size);
    +  }
    +
    +  public byte[] getByteArray() { return array; }
    +
    +  /**
    +   * Creates a memory block pointing to the memory used by the byte array.
    +   */
    +  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
    +    return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
    +  }
    +
    +  @Override
    +  public final int getInt(long offset) {
    +    return Platform.getInt(array, this.offset + offset);
    +  }
    +
    +  @Override
    +  public final void putInt(long offset, int value) {
    +    Platform.putInt(array, this.offset + offset, value);
    +  }
    +
    +  @Override
    +  public final boolean getBoolean(long offset) {
    +    return Platform.getBoolean(array, this.offset + offset);
    +  }
    +
    +  @Override
    +  public final void putBoolean(long offset, boolean value) {
    +    Platform.putBoolean(array, this.offset + offset, value);
    +  }
    +
    +  @Override
    +  public final byte getByte(long offset) {
    +    return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
    +  }
    +
    +  @Override
    +  public final void putByte(long offset, byte value) {
    +    array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
    --- End diff --

    ditto
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179331924

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.unsafe.memory;
    +
    +import com.google.common.primitives.Ints;
    +
    +import org.apache.spark.unsafe.Platform;
    +
    +/**
    + * A consecutive block of memory with a byte array on Java heap.
    + */
    +public final class ByteArrayMemoryBlock extends MemoryBlock {
    +
    +  private final byte[] array;
    +
    +  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
    +    super(obj, offset, size);
    +    this.array = obj;
    +    assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
    +      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
    +        "the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
    +  }
    +
    +  public ByteArrayMemoryBlock(long length) {
    +    this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
    +  }
    +
    +  @Override
    +  public MemoryBlock subBlock(long offset, long size) {
    +    checkSubBlockRange(offset, size);
    +    return new ByteArrayMemoryBlock(array, this.offset + offset, size);
    +  }
    +
    +  public byte[] getByteArray() { return array; }
    +
    +  /**
    +   * Creates a memory block pointing to the memory used by the byte array.
    +   */
    +  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
    +    return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
    +  }
    +
    +  @Override
    +  public final int getInt(long offset) {
    +    return Platform.getInt(array, this.offset + offset);
    +  }
    +
    +  @Override
    +  public final void putInt(long offset, int value) {
    +    Platform.putInt(array, this.offset + offset, value);
    +  }
    +
    +  @Override
    +  public final boolean getBoolean(long offset) {
    +    return Platform.getBoolean(array, this.offset + offset);
    +  }
    +
    +  @Override
    +  public final void putBoolean(long offset, boolean value) {
    +    Platform.putBoolean(array, this.offset + offset, value);
    +  }
    +
    +  @Override
    +  public final byte getByte(long offset) {
    +    return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
    --- End diff --

    This was a suggestion from @cloud-fan. Java array access is faster than `Platform.getByte`.
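The index arithmetic in `getByte` and `putByte` may be easier to follow in a standalone form. The sketch below is only an illustration: `ByteIndexSketch` is a hypothetical class, and its `BYTE_ARRAY_OFFSET` constant is an assumed stand-in for `Platform.BYTE_ARRAY_OFFSET`, whose real value depends on the JVM. It shows how a Platform-style offset (array header size plus element index) is converted back to a plain array index so that ordinary array access can be used.

```java
// Sketch of translating a Platform-style offset into a plain array index.
final class ByteIndexSketch {
  // Assumption: stands in for Platform.BYTE_ARRAY_OFFSET; the real value is JVM-dependent.
  static final long BYTE_ARRAY_OFFSET = 16;

  private final byte[] array;
  // Platform-style offset of the block's first byte: BYTE_ARRAY_OFFSET + array index.
  private final long offset;

  ByteIndexSketch(byte[] array, long offset) {
    if (offset < BYTE_ARRAY_OFFSET || offset - BYTE_ARRAY_OFFSET > array.length) {
      throw new IllegalArgumentException("offset " + offset + " is outside the array");
    }
    this.array = array;
    this.offset = offset;
  }

  // Subtracting the header offset turns the Platform-style address into an index.
  private int index(long relativeOffset) {
    return (int) (this.offset + relativeOffset - BYTE_ARRAY_OFFSET);
  }

  byte getByte(long relativeOffset) {
    return array[index(relativeOffset)];
  }

  void putByte(long relativeOffset, byte value) {
    array[index(relativeOffset)] = value;
  }
}
```

A side effect of this form is that the JVM's normal bounds check applies to every access, so an out-of-range relative offset raises `ArrayIndexOutOfBoundsException` instead of touching unrelated memory.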
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179321572

    --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java ---
    @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException {
         final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
         for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
    -      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
    +      new UTF8String(
    +        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
    --- End diff --

    Looks a bit strange.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179321207

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java ---
    @@ -19,15 +19,21 @@
     import org.apache.spark.unsafe.Platform;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.nio.ByteBuffer;
    +
    +import sun.nio.ch.DirectBuffer;
    --- End diff --

    These seem to be unused imports?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19222#discussion_r179320913

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.unsafe.memory;
    +
    +import com.google.common.primitives.Ints;
    +
    +import org.apache.spark.unsafe.Platform;
    +
    +/**
    + * A consecutive block of memory with a long array on Java heap.
    + */
    +public final class OnHeapMemoryBlock extends MemoryBlock {
    +
    +  private final long[] array;
    +
    +  public OnHeapMemoryBlock(long[] obj, long offset, long size) {
    +    super(obj, offset, size);
    +    this.array = obj;
    +    assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
    +      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
    +        "the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
    --- End diff --

    Ditto. Maybe "the size of the given memory space"?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179320302 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. 
+ */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : + "The sum of size " + size + " and offset " + offset + " should not be larger than " + +"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET); + } + + public ByteArrayMemoryBlock(long length) { +this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +checkSubBlockRange(offset, size); +return new ByteArrayMemoryBlock(array, this.offset + offset, size); + } + + public byte[] getByteArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the byte array. + */ + public static ByteArrayMemoryBlock fromArray(final byte[] array) { +return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length); + } + + @Override + public final int getInt(long offset) { +return Platform.getInt(array, this.offset + offset); + } + + @Override + public final void putInt(long offset, int value) { +Platform.putInt(array, this.offset + offset, value); + } + + @Override + public final boolean getBoolean(long offset) { +return Platform.getBoolean(array, this.offset + offset); + } + + @Override + public final void putBoolean(long offset, boolean value) { +Platform.putBoolean(array, this.offset + offset, value); + } + + @Override + public final byte getByte(long offset) { +return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)]; + } + + @Override + public final void putByte(long offset, byte value) { +array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value; --- End diff -- ditto.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179320267 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. 
+ */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : + "The sum of size " + size + " and offset " + offset + " should not be larger than " + +"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET); + } + + public ByteArrayMemoryBlock(long length) { +this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +checkSubBlockRange(offset, size); +return new ByteArrayMemoryBlock(array, this.offset + offset, size); + } + + public byte[] getByteArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the byte array. + */ + public static ByteArrayMemoryBlock fromArray(final byte[] array) { +return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length); + } + + @Override + public final int getInt(long offset) { +return Platform.getInt(array, this.offset + offset); + } + + @Override + public final void putInt(long offset, int value) { +Platform.putInt(array, this.offset + offset, value); + } + + @Override + public final boolean getBoolean(long offset) { +return Platform.getBoolean(array, this.offset + offset); + } + + @Override + public final void putBoolean(long offset, boolean value) { +Platform.putBoolean(array, this.offset + offset, value); + } + + @Override + public final byte getByte(long offset) { +return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)]; --- End diff -- Why not use `Platform.getByte`?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179320092 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a long array on Java heap. + */ +public final class OnHeapMemoryBlock extends MemoryBlock { --- End diff -- Can we put @cloud-fan's comment https://github.com/apache/spark/pull/19222/files#r172356146 into `OnHeapMemoryBlock`'s doc?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179319738 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : --- End diff -- Hmm, from here and the accessors below, it looks like the given `offset` includes `Platform.BYTE_ARRAY_OFFSET`. What happens if a given offset is less than that?
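To make the question concrete, here is a minimal, self-contained sketch. It is not the code under review: `ByteBlockSketch` is a hypothetical stand-in, and its `BYTE_ARRAY_OFFSET` is an assumed constant (16) standing in for `Platform.BYTE_ARRAY_OFFSET`, whose real value depends on the JVM. It assumes, as the accessors in the diff suggest, that the constructor's `offset` is absolute and already includes the array base offset, and it shows one possible way to reject an offset below that base instead of letting the accessor compute a negative index later.

```java
// Hypothetical stand-in for the class under review; not Spark's implementation.
final class ByteBlockSketch {
  // Assumed value for illustration only. In Spark the real value is
  // Platform.BYTE_ARRAY_OFFSET and depends on the JVM.
  static final long BYTE_ARRAY_OFFSET = 16L;

  private final byte[] array;
  private final long offset; // absolute offset: already includes BYTE_ARRAY_OFFSET
  private final long size;

  ByteBlockSketch(byte[] array, long offset, long size) {
    // Reject offsets that point in front of the first array element.
    if (offset < BYTE_ARRAY_OFFSET) {
      throw new IllegalArgumentException(
        "offset " + offset + " is below the array base offset " + BYTE_ARRAY_OFFSET);
    }
    // Same upper-bound condition as the assert in the diff, plus a sign check.
    if (size < 0 || offset + size > BYTE_ARRAY_OFFSET + array.length) {
      throw new IllegalArgumentException(
        "offset " + offset + " and size " + size + " do not fit the given array");
    }
    this.array = array;
    this.offset = offset;
    this.size = size;
  }

  // The caller passes a 0-based logical offset inside the block.
  byte getByte(long logicalOffset) {
    return array[(int) (offset + logicalOffset - BYTE_ARRAY_OFFSET)];
  }

  long size() {
    return size;
  }

  public static void main(String[] args) {
    byte[] data = {10, 20, 30, 40};
    ByteBlockSketch block = new ByteBlockSketch(data, BYTE_ARRAY_OFFSET + 1, 3);
    System.out.println(block.getByte(0)); // first byte of the block, i.e. data[1]
    try {
      new ByteBlockSketch(data, 0, 4); // offset passed without the base offset
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Without the lower-bound check, an array-indexed accessor like the `getByte` in the diff would compute a negative index for such a block and fail only at access time.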
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179319478 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : + "The sum of size " + size + " and offset " + offset + " should not be larger than " + +"the array size " + (obj.length + Platform.BYTE_ARRAY_OFFSET); --- End diff -- Describing `obj.length + Platform.BYTE_ARRAY_OFFSET` as the array size might be a bit confusing in an error message.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179318575 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java --- @@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) { public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; private static final boolean unaligned = Platform.unaligned(); + /** + * MemoryBlock equality check for MemoryBlocks. + * @return true if the arrays are equal, false otherwise + */ + public static boolean arrayEqualsBlock( --- End diff -- I think this only works for `ByteArrayMemoryBlock`, doesn't it?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179315793 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); --- End diff -- The semantics of the `offset` parameter are not totally clear here. From the check (`offset + size <= length`) in `checkSubBlockRange`, it looks like a relative offset from the original offset. But an offset parameter here could also be read as the new absolute offset. I think it is better to state the semantics clearly here.
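The relative reading can be sketched in a few lines. This is a simplified illustration, not the PR's code: `BlockViewSketch` is a hypothetical class over a plain `byte[]` with 0-based indices (no JVM array base offset), and its `subBlock` treats `offset` as relative to the start of the current block, which is what the `offset + size <= length` check and the `this.offset + offset` expression in `ByteArrayMemoryBlock.subBlock` suggest.

```java
// Hypothetical, simplified view type; indices are plain array indices here.
final class BlockViewSketch {
  private final byte[] array;
  private final long offset; // absolute index of the first byte of this view
  private final long length; // number of bytes visible through this view

  BlockViewSketch(byte[] array, long offset, long length) {
    if (offset < 0 || length < 0 || offset + length > array.length) {
      throw new IllegalArgumentException(
        "offset " + offset + " and length " + length + " do not fit the array");
    }
    this.array = array;
    this.offset = offset;
    this.length = length;
  }

  /**
   * Returns a view of {@code size} bytes starting {@code offset} bytes after
   * the start of THIS view (relative semantics). No data is copied.
   */
  BlockViewSketch subBlock(long offset, long size) {
    if (offset < 0 || size < 0) {
      throw new IllegalArgumentException(
        "Size " + size + " and offset " + offset + " must be non-negative");
    }
    if (offset + size > length) {
      throw new IndexOutOfBoundsException(
        "offset " + offset + " + size " + size + " exceeds block length " + length);
    }
    // Translate the relative offset into an absolute one.
    return new BlockViewSketch(array, this.offset + offset, size);
  }

  byte getByte(long offset) {
    return array[(int) (this.offset + offset)];
  }

  long absoluteOffset() {
    return offset;
  }

  public static void main(String[] args) {
    byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    BlockViewSketch block = new BlockViewSketch(data, 2, 6); // covers data[2..7]
    BlockViewSketch sub = block.subBlock(1, 3);              // covers data[3..5]
    System.out.println(sub.absoluteOffset() + " " + sub.getByte(0));
  }
}
```

Under the absolute reading, `block.subBlock(1, 3)` would instead start at `data[1]`, which lies outside the parent block; stating the chosen semantics in the Javadoc removes that ambiguity.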
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179315071 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + offset + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g cause illegal + * memory access, throw an exception, or etc. 
+ * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as + * JVM object header. The offset is 0-based and is expected as an logical offset in the memory + * block. + */ + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void pu
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179314982 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + offset + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g cause illegal + * memory access, throw an exception, or etc. 
+ * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as + * JVM object header. The offset is 0-based and is expected as an logical offset in the memory + * block. + */ + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void pu
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179314879 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + offset + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g cause illegal + * memory access, throw an exception, or etc. 
+ * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as + * JVM object header. The offset is 0-based and is expected as an logical offset in the memory + * block. + */ + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void pu
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179314745 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + offset + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g cause illegal + * memory access, throw an exception, or etc. 
+ * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as + * JVM object header. The offset is 0-based and is expected as an logical offset in the memory + * block. + */ + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void pu
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r179313290 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( --- End diff -- nit: seems not really array index out of bounds? Maybe an `IllegalArgumentException`?
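A minimal sketch of what the suggestion above might look like, assuming the argument check is pulled into a standalone helper. `BlockArgs` and `checkNonNegative` are hypothetical names used only for illustration; they are not part of the PR.

```java
// Hypothetical helper, not taken from the PR: it only illustrates the
// exception type proposed in the review comment.
final class BlockArgs {
  static void checkNonNegative(long offset, long length) {
    if (offset < 0 || length < 0) {
      // The arguments themselves are invalid; no array is being indexed yet.
      throw new IllegalArgumentException(
        "Length " + length + " and offset " + offset + " must be non-negative");
    }
  }

  public static void main(String[] args) {
    checkNonNegative(16, 8); // valid arguments pass silently
    try {
      checkNonNegative(-1, 8);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The design point is only about signalling: `IllegalArgumentException` describes a bad constructor argument, while `ArrayIndexOutOfBoundsException` suggests an actual array access went wrong.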
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r178083854 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -250,6 +246,7 @@ public long getPrefix() { } } p &= ~mask; +System.out.println("P: "+Long.toHexString(p)+", obj="+base.getBaseObject()+", offset="+base.getBaseOffset()+", size="+base.size()+", numBytes="+numBytes); --- End diff -- Good catch, this is my mistake.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r178020801 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -250,6 +246,7 @@ public long getPrefix() { } } p &= ~mask; +System.out.println("P: "+Long.toHexString(p)+", obj="+base.getBaseObject()+", offset="+base.getBaseOffset()+", size="+base.size()+", numBytes="+numBytes); --- End diff -- mistake?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r178020710 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a long array on Java heap. 
+ */ +public final class OnHeapMemoryBlock extends MemoryBlock { + + private final long[] array; + + public OnHeapMemoryBlock(long[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) : + "The sum of size " + size + " and offset " + offset + " should not be larger than " + +"the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET); + } + + public OnHeapMemoryBlock(long size) { +this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +checkSubBlockRange(offset, size); +return new OnHeapMemoryBlock(array, this.offset + offset, size); + } + + public long[] getLongArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the long array. + */ + public static OnHeapMemoryBlock fromArray(final long[] array) { +return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + } + + public static OnHeapMemoryBlock fromArray(final long[] array, long size) { +return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); + } + + @Override + public final int getInt(long offset) { +return Platform.getInt(array, this.offset + offset); + } + + @Override + public final void putInt(long offset, int value) { +Platform.putInt(array, this.offset + offset, value); + } + + @Override + public final boolean getBoolean(long offset) { +return Platform.getBoolean(array, this.offset + offset); + } + + @Override + public final void putBoolean(long offset, boolean value) { +Platform.putBoolean(array, this.offset + offset, value); + } + + @Override + public final byte getByte(long offset) { +return Platform.getByte(array, this.offset + offset); + } + + @Override + public final void putByte(long offset, byte value) { +Platform.putByte(array, this.offset + offset, value); + } + + @Override + public final short getShort(long offset) 
{ +return Platform.getShort(array, this.offset + offset); + } + + @Override + public final void putShort(long offset, short value) { +Platform.putShort(array, this.offset + offset, value); + } + + @Override + public final long getLong(long offset) { +return Platform.getLong(array, this.offset + offset); --- End diff -- ah, endianness matters here.
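One way to read the endianness remark, sketched with `java.nio.ByteBuffer` rather than Spark's `Platform` class: when 8 bytes written as a `long` are read back at a narrower width, the result depends on the byte order. `EndianDemo` and `firstInt` are hypothetical names, and the thread does not confirm that this is the exact case the reviewer had in mind.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo class, not part of the PR.
final class EndianDemo {
  // Stores `value` as 8 bytes in the given byte order, then reads the
  // first 4 bytes back as an int using the same byte order.
  static int firstInt(long value, ByteOrder order) {
    ByteBuffer buf = ByteBuffer.allocate(8).order(order);
    buf.putLong(0, value);
    return buf.getInt(0);
  }

  public static void main(String[] args) {
    // Little-endian: the low-order bytes of the long come first.
    System.out.println(firstInt(1L, ByteOrder.LITTLE_ENDIAN));
    // Big-endian: the high-order bytes of the long come first.
    System.out.println(firstInt(1L, ByteOrder.BIG_ENDIAN));
  }
}
```

With the value `1L`, the little-endian read sees the low half of the word and the big-endian read sees the high half, so a block backed by `long[]` cannot be treated as byte-addressable without fixing a byte order.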
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177958142 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : --- End diff -- Adding this assertion causes a new failure at [`UTF8StringSuite.writeToOutputStreamUnderflow()`](https://github.com/apache/spark/pull/19222/files#diff-321a62638d3ef7bbc9c35842967c868bR515).
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177953620 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java --- @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException { final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8); for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) { - UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i) + new UTF8String( +new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)) --- End diff -- I thought this is what you said [here](https://github.com/apache/spark/pull/19222#discussion_r176986304).
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177694735 --- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java --- @@ -377,9 +378,10 @@ final UTF8String getUTF8String(int rowId) { if (stringResult.isSet == 0) { return null; } else { -return UTF8String.fromAddress(null, - stringResult.buffer.memoryAddress() + stringResult.start, - stringResult.end - stringResult.start); +int size = stringResult.end - stringResult.start; +OffHeapMemoryBlock mb = new OffHeapMemoryBlock( + stringResult.buffer.memoryAddress() + stringResult.start, size); +return new UTF8String(mb); --- End diff -- we can follow the previous code style: ``` new UTF8String(new OffHeapMemoryBlock( stringResult.buffer.memoryAddress() + stringResult.start, stringResult.end - stringResult.start )) ```
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177693101 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java --- @@ -27,6 +27,9 @@ import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.bitset.BitSetMethods; import org.apache.spark.unsafe.hash.Murmur3_x86_32; +import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock; +import org.apache.spark.unsafe.memory.OnHeapMemoryBlock; +import org.apache.spark.unsafe.memory.MemoryBlock; --- End diff -- we only need to import MemoryBlock
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177693045 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java --- @@ -37,6 +37,7 @@ import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.bitset.BitSetMethods; import org.apache.spark.unsafe.hash.Murmur3_x86_32; +import org.apache.spark.unsafe.memory.*; --- End diff -- we only need to import MemoryBlock
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177692709 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java --- @@ -348,10 +342,7 @@ public UnsafeSorterIterator getSortedIterator() { array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7, radixSortSupport.sortDescending(), radixSortSupport.sortSigned()); } else { -MemoryBlock unused = new MemoryBlock( - array.getBaseObject(), - array.getBaseOffset() + pos * 8L, - (array.size() - pos) * 8L); +MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L,(array.size() - pos) * 8L); --- End diff -- ditto
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177692646 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java --- @@ -60,13 +60,8 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) { @Override public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) { -Platform.copyMemory( - src.getBaseObject(), - src.getBaseOffset() + srcPos * 8L, - dst.getBaseObject(), - dst.getBaseOffset() + dstPos * 8L, - length * 8L -); +MemoryBlock.copyMemory(src.memoryBlock(),srcPos * 8L, --- End diff -- ditto
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177692472 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java --- @@ -105,13 +104,7 @@ public void reset() { public void expandPointerArray(LongArray newArray) { assert(newArray.size() > array.size()); -Platform.copyMemory( - array.getBaseObject(), - array.getBaseOffset(), - newArray.getBaseObject(), - newArray.getBaseOffset(), - pos * 8L -); +MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(),pos * 8L); --- End diff -- space after `,`
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177692581 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java --- @@ -180,10 +173,7 @@ public ShuffleSorterIterator getSortedIterator() { PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX, PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false); } else { - MemoryBlock unused = new MemoryBlock( -array.getBaseObject(), -array.getBaseOffset() + pos * 8L, -(array.size() - pos) * 8L); + MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L,(array.size() - pos) * 8L); --- End diff -- ditto
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177692142 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java --- @@ -515,7 +518,8 @@ public void writeToOutputStreamUnderflow() throws IOException { final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8); for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) { - UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i) + new UTF8String( +new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)) --- End diff -- I'm surprised this works...
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177687204 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -257,12 +258,13 @@ public long getPrefix() { */ public byte[] getBytes() { // avoid copy if `base` is `byte[]` -if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[] - && ((byte[]) base).length == numBytes) { - return (byte[]) base; +long offset = base.getBaseOffset(); +if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock + && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) { --- End diff -- Weird, do you know why? They should be the same logically.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177686923 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +118,16 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + public UTF8String(MemoryBlock base) { this.base = base; -this.offset = offset; -this.numBytes = numBytes; +if (base != null) { + this.numBytes = Ints.checkedCast(base.size()); +} } // for serialization public UTF8String() { -this(null, 0, 0); +this(null); --- End diff -- if this is the only place to pass in a null MemoryBlock, we can probably do nothing here instead of calling `this(null)`.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177686329 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -77,7 +81,8 @@ */ public static UTF8String fromBytes(byte[] bytes) { if (bytes != null) { - return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length); + return new UTF8String( +MemoryBlock.allocateFromObject(bytes, BYTE_ARRAY_OFFSET, bytes.length)); --- End diff -- we can create `ByteArrayMemoryBlock` directly here.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177686396 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -90,19 +95,13 @@ public static UTF8String fromBytes(byte[] bytes) { */ public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) { if (bytes != null) { - return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes); + return new UTF8String( +MemoryBlock.allocateFromObject(bytes, BYTE_ARRAY_OFFSET + offset, numBytes)); --- End diff -- ditto
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177684841 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,162 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (offset < 0 || size < 0 || this.offset + offset < 0) { --- End diff -- `this.offset + offset` seems unnecessary.
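A simplified sketch of the sub-block range check under discussion, assuming a view described only by an absolute offset and a length (no backing object). `RangeView` is a hypothetical stand-in for `MemoryBlock`, not the PR's class. Because the constructor already rejects a negative absolute offset and the relative offset is checked to be non-negative, the extra `this.offset + offset < 0` test would only matter on arithmetic overflow, which is presumably why the reviewer calls it unnecessary.

```java
// Hypothetical stand-in for MemoryBlock; it tracks only offsets and lengths.
final class RangeView {
  final long offset; // absolute start of this view
  final long length; // size of this view in bytes

  RangeView(long offset, long length) {
    if (offset < 0 || length < 0) {
      throw new ArrayIndexOutOfBoundsException(
        "Length " + length + " and offset " + offset + " must be non-negative");
    }
    this.offset = offset;
    this.length = length;
  }

  // Returns a narrower view over the same memory; nothing is copied.
  RangeView subBlock(long off, long size) {
    if (off < 0 || size < 0) {
      throw new ArrayIndexOutOfBoundsException(
        "Size " + size + " and offset " + off + " must be non-negative");
    }
    if (off + size > length) {
      throw new ArrayIndexOutOfBoundsException("The sum of size " + size +
        " and offset " + off + " should not be larger than the length " + length);
    }
    // `off` is relative to this view, so the new absolute start is offset + off.
    return new RangeView(this.offset + off, size);
  }
}
```

For example, `new RangeView(16, 8).subBlock(2, 4)` yields a view starting at absolute offset 18 with length 4, while `subBlock(4, 8)` on the same view is rejected because it would run past the end.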
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177682999 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : --- End diff -- shall we require `offset >= Platform.BYTE_ARRAY_OFFSET`?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177681282 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,49 +51,67 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); } - public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashBytesByIntBlock(base, seed); return fmix(h1, lengthInBytes); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; --- End diff -- nit: we don't need this, it's checked in `hashUnsafeWordsBlock`
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177511234 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -257,12 +258,13 @@ public long getPrefix() { */ public byte[] getBytes() { // avoid copy if `base` is `byte[]` -if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[] - && ((byte[]) base).length == numBytes) { - return (byte[]) base; +long offset = base.getBaseOffset(); +if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock + && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) { --- End diff -- This change caused some test failures, for example in `ColumnarBatchSuite`.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177409456 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteOrder; + +import static org.hamcrest.core.StringContains.containsString; + +public class MemoryBlockSuite { --- End diff -- Thank you for your suggestion. As you said, I used `junit` for testing Java classes instead of `scalatest` ...
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177404199 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java --- @@ -71,40 +74,47 @@ public static long hashLong(long input, long seed) { return fmix(hash); } - public long hashUnsafeWords(Object base, long offset, int length) { -return hashUnsafeWords(base, offset, length, seed); + public long hashUnsafeWordsBlock(MemoryBlock mb) { +return hashUnsafeWordsBlock(mb, seed); } - public static long hashUnsafeWords(Object base, long offset, int length, long seed) { -assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)"; -long hash = hashBytesByWords(base, offset, length, seed); + public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) { +assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)"; +long hash = hashBytesByWordsBlock(mb, seed); return fmix(hash); } public long hashUnsafeBytes(Object base, long offset, int length) { return hashUnsafeBytes(base, offset, length, seed); } - public static long hashUnsafeBytes(Object base, long offset, int length, long seed) { + public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) { +Object base = mb.getBaseObject(); --- End diff -- where do we use base?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177403002 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteOrder; + +import static org.hamcrest.core.StringContains.containsString; + +public class MemoryBlockSuite { --- End diff -- We don't have to write tests in Java to test Java classes. It's OK since you already wrote it, but please write tests in Scala next time.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177400664 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -257,12 +258,13 @@ public long getPrefix() { */ public byte[] getBytes() { // avoid copy if `base` is `byte[]` -if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[] - && ((byte[]) base).length == numBytes) { - return (byte[]) base; +long offset = base.getBaseOffset(); +if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock + && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) { --- End diff -- nit: `base.size == numBytes` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
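A minimal sketch of the copy-avoidance idea in the `getBytes` hunk quoted above, assuming a plain `byte[]` and a 0-based `start` rather than Spark's `MemoryBlock` and `BYTE_ARRAY_OFFSET`. `GetBytesSketch` and its parameters are hypothetical names used only for illustration:

```java
import java.util.Arrays;

// Hypothetical illustration; not the UTF8String implementation.
final class GetBytesSketch {
  // Returns the backing array itself when the requested range covers it
  // exactly, and a fresh copy of the range otherwise.
  static byte[] getBytes(byte[] backing, int start, int numBytes) {
    if (start == 0 && backing.length == numBytes) {
      return backing; // no copy needed
    }
    return Arrays.copyOfRange(backing, start, start + numBytes);
  }
}
```

Comparing the block size with `numBytes`, as the nit suggests, expresses the same condition without reaching into the underlying array.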
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177306568 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -69,8 +64,9 @@ public long size() { * Fill this all with 0L. */ public void zeroOut() { +long baseOffset = memory.getBaseOffset(); for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) { --- End diff -- Good catch. Leaving these three places unchanged seems to lead to failures.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177296492 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a long array on Java heap. 
+ */ +public final class OnHeapMemoryBlock extends MemoryBlock { + + private final long[] array; + + public OnHeapMemoryBlock(long[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) : + "The sum of size " + size + " and offset " + offset + " should not be larger than " + +"the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET); + } + + public OnHeapMemoryBlock(long size) { +this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +checkSubBlockRange(offset, size); +return new OnHeapMemoryBlock(array, this.offset + offset, size); + } + + public long[] getLongArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the long array. + */ + public static OnHeapMemoryBlock fromArray(final long[] array) { +return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + } + + public static OnHeapMemoryBlock fromArray(final long[] array, long size) { +return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); + } + + @Override + public final int getInt(long offset) { +return Platform.getInt(array, this.offset + offset); + } + + @Override + public final void putInt(long offset, int value) { +Platform.putInt(array, this.offset + offset, value); + } + + @Override + public final boolean getBoolean(long offset) { +return Platform.getBoolean(array, this.offset + offset); + } + + @Override + public final void putBoolean(long offset, boolean value) { +Platform.putBoolean(array, this.offset + offset, value); + } + + @Override + public final byte getByte(long offset) { +return Platform.getByte(array, this.offset + offset); + } + + @Override + public final void putByte(long offset, byte value) { +Platform.putByte(array, this.offset + offset, value); + } + + @Override + public final short getShort(long offset) 
{ +return Platform.getShort(array, this.offset + offset); + } + + @Override + public final void putShort(long offset, short value) { +Platform.putShort(array, this.offset + offset, value); + } + + @Override + public final long getLong(long offset) { +return Platform.getLong(array, this.offset + offset); --- End diff -- shall we also apply https://github.com/apache/spark/pull/19222/files#diff-b9576c68b154d5e554671ffa84bfa74eR80 here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177296342 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,161 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as + * JVM object header. Thus, the offset is expected as an logical offset in the memory block. --- End diff -- also mention that the offset is 0-based.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177296162 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,161 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { --- End diff -- the check should be `offset >= 0`, the sub-block must be real "sub". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
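One way to read the suggestion above, sketched under the assumption that the sub-block offset is relative to the parent block and that the parent length is non-negative. `SubBlockRangeSketch` and the `blockLength` parameter are hypothetical; the actual method in the PR is an instance method that reads `length` from the block:

```java
// Hypothetical illustration; not the MemoryBlock code from the PR.
final class SubBlockRangeSketch {
  // Validates that [offset, offset + size) lies inside a block of blockLength bytes.
  static void checkSubBlockRange(long blockLength, long offset, long size) {
    // The sub-block must be a real "sub": its relative offset cannot be negative.
    if (offset < 0 || size < 0) {
      throw new ArrayIndexOutOfBoundsException(
          "Size " + size + " and offset " + offset + " must be non-negative");
    }
    // Written as a subtraction so that offset + size cannot overflow a long.
    if (offset > blockLength - size) {
      throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
          offset + " should not be larger than the length " + blockLength);
    }
  }
}
```

Checking `offset` directly, instead of `this.offset + offset`, rejects a negative relative offset even when the parent's base offset is large enough to keep the sum positive.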
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177295079 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -89,6 +85,6 @@ public void set(int index, long value) { public long get(int index) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -return Platform.getLong(baseObj, baseOffset + index * WIDTH); +return memory.getLong(memory.getBaseOffset() + index * WIDTH); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177295038 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -80,7 +76,7 @@ public void zeroOut() { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -Platform.putLong(baseObj, baseOffset + index * WIDTH, value); --- End diff -- update it to use 0-based offset. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177294950 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -69,8 +64,9 @@ public long size() { * Fill this all with 0L. */ public void zeroOut() { +long baseOffset = memory.getBaseOffset(); for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) { --- End diff -- `off` should start at 0.
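The three `LongArray` comments above (`get`, `set`, `zeroOut`) all ask for offsets that are 0-based within the block. A rough sketch of what that could look like, assuming a simplified stand-in block backed by a `long[]`. `ZeroBasedLongBlock` and `LongArraySketch` are hypothetical names and do not reproduce Spark's `MemoryBlock` or `LongArray`:

```java
// Hypothetical stand-in for a long[]-backed memory block; not Spark's MemoryBlock.
final class ZeroBasedLongBlock {
  static final long WIDTH = 8L; // bytes per long

  private final long[] array;

  ZeroBasedLongBlock(int numLongs) {
    this.array = new long[numLongs];
  }

  long size() {
    return array.length * WIDTH;
  }

  // Maps a 0-based, word-aligned byte offset within the block to an array slot.
  private int slot(long offset) {
    if (offset < 0 || offset % WIDTH != 0 || offset >= size()) {
      throw new ArrayIndexOutOfBoundsException("Bad block offset " + offset);
    }
    return (int) (offset / WIDTH);
  }

  long getLong(long offset) { return array[slot(offset)]; }

  void putLong(long offset, long value) { array[slot(offset)] = value; }
}

// Hypothetical LongArray-like wrapper: callers never add the base offset themselves.
final class LongArraySketch {
  private final ZeroBasedLongBlock memory;
  private final long length; // number of long elements

  LongArraySketch(ZeroBasedLongBlock memory) {
    this.memory = memory;
    this.length = memory.size() / ZeroBasedLongBlock.WIDTH;
  }

  void set(int index, long value) {
    memory.putLong(index * ZeroBasedLongBlock.WIDTH, value);
  }

  long get(int index) {
    return memory.getLong(index * ZeroBasedLongBlock.WIDTH);
  }

  void zeroOut() {
    // off starts at 0, not at the block's base offset
    for (long off = 0; off < length * ZeroBasedLongBlock.WIDTH; off += ZeroBasedLongBlock.WIDTH) {
      memory.putLong(off, 0L);
    }
  }
}
```

With this shape the base offset is applied in exactly one place, inside the block, so a caller cannot add it twice.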
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177217305 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,159 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + */ + public abstract int getInt(long offset); --- End diff -- @cloud-fan After prototyping, I succeeded in getting `UTF8String` right in this PR.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176992334 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,49 +51,70 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); } - public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashBytesByIntBlock(base, seed); return fmix(h1, lengthInBytes); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) { // This is not compatible with original and another implementations. // But remain it for backward compatibility for the components existing before 2.3. +long offset = base.getBaseOffset(); +long lengthInBytes = base.size(); --- End diff -- Sure, I will do them --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176992008 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,159 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + */ + public abstract int getInt(long offset); --- End diff -- If we go with a 0-based offset, I think we may need to update many additional files, because many files include hard-coded offsets such as `Platform.BYTE_ARRAY_OFFSET`. In particular, the generated code uses the hard-coded offset. I think it would be very hard and error-prone work to make it right with intro
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176985708 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,159 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + */ + public abstract int getInt(long offset); --- End diff -- We should clearly document what the `offset` parameter is expected to be, i.e. an index based on `this.getBaseOffset`.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176985312 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,49 +51,70 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); } - public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashBytesByIntBlock(base, seed); return fmix(h1, lengthInBytes); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) { // This is not compatible with original and another implementations. // But remain it for backward compatibility for the components existing before 2.3. +long offset = base.getBaseOffset(); +long lengthInBytes = base.size(); --- End diff -- shall we cast to int here instead of at the end? 
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176985334 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,49 +51,70 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); } - public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashBytesByIntBlock(base, seed); return fmix(h1, lengthInBytes); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { + public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) { // This is not compatible with original and another implementations. // But remain it for backward compatibility for the components existing before 2.3. 
+long offset = base.getBaseOffset(); +long lengthInBytes = base.size(); assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; -int lengthAligned = lengthInBytes - lengthInBytes % 4; -int h1 = hashBytesByInt(base, offset, lengthAligned, seed); -for (int i = lengthAligned; i < lengthInBytes; i++) { - int halfWord = Platform.getByte(base, offset + i); +long lengthAligned = lengthInBytes - lengthInBytes % 4; +int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed); +for (long i = lengthAligned; i < lengthInBytes; i++) { + int halfWord = base.getByte(offset + i); int k1 = mixK1(halfWord); h1 = mixH1(h1, k1); } -return fmix(h1, lengthInBytes); +return fmix(h1, Ints.checkedCast(lengthInBytes)); + } + + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { +return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); } public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) { +return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) { // This is compatible with original and another implementations. // Use this method for new components after Spark 2.3. -assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; -int lengthAligned = lengthInBytes - lengthInBytes % 4; -int h1 = hashBytesByInt(base, offset, lengthAligned, seed); +long offset = base.getBaseOffset(); +long lengthInBytes = base.size(); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176986033 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,159 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + */ + public abstract int getInt(long offset); --- End diff -- BTW do you have a list of followup JIRAs? If you are the only one working on it, I'm fine to change the semantics of the offset later. Otherwise I'd like to make it right at the first version, i.e. 0-based offset. --- - To unsubscribe, e-mail: reviews-
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r176986304 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +116,20 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + protected UTF8String(byte[] bytes, long offset, int numBytes) { --- End diff -- why do we need this? I think the caller side should call `MemoryBlock.allocateFromObject` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175885446 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,55 +51,81 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; +int h1 = hashBytesByIntBlock(base, seed); +return fmix(h1, lengthInBytes); } public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); return fmix(h1, lengthInBytes); --- End diff -- Thanks for the good catch. I realized that the current unit tests do not cover `hashUnsafeWords` well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175880756 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,55 +51,81 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; +int h1 = hashBytesByIntBlock(base, seed); +return fmix(h1, lengthInBytes); } public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); return fmix(h1, lengthInBytes); --- End diff -- we don't need this line now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
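A toy sketch of why the trailing `fmix` line in the diff above is redundant once the legacy overload delegates to the block-based method: the block-based method already finalizes its result, so finalizing again changes the hash. Everything here is hypothetical (the class, the method names, and the deliberately simple `finish` formula standing in for `fmix`); it is not Spark's Murmur3 code.

```java
// Toy sketch: names and formulas are hypothetical stand-ins, not Spark's Murmur3_x86_32.
final class DelegatingHashSketch {
    // Stand-in for a finalization step such as fmix; deliberately simple.
    static int finish(int h, int length) {
        return h * 31 + length;
    }

    // Block-style entry point: computes the raw hash and finalizes it exactly once.
    static int hashBlock(byte[] block, int seed) {
        int h = seed;
        for (byte b : block) {
            h = h * 17 + b;
        }
        return finish(h, block.length);
    }

    // Legacy overload done right: delegate and return the already-finalized value.
    static int hashLegacy(byte[] data, int seed) {
        return hashBlock(data, seed);
    }

    // Buggy variant: finalizes a value that hashBlock has already finalized.
    static int hashLegacyDoubleFinish(byte[] data, int seed) {
        int h1 = hashBlock(data, seed);
        return finish(h1, data.length);
    }
}
```

A test that compares the legacy overload against the block-based method on the same bytes would likely catch this kind of regression.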
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175879336 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +115,24 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + protected UTF8String(byte[] bytes, long offset, int numBytes) { +this(new ByteArrayMemoryBlock(bytes, offset, numBytes)); + } + + public UTF8String(MemoryBlock base) { this.base = base; -this.offset = offset; -this.numBytes = numBytes; +if (base != null) { --- End diff -- ah i see --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175868832 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -80,7 +76,7 @@ public void zeroOut() { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -Platform.putLong(baseObj, baseOffset + index * WIDTH, value); +memory.putLong(memory.getBaseOffset() + index * WIDTH, value); --- End diff -- The problem is that the semantics of the offset argument are pretty important to the `MemoryBlock` component. If we don't get them right in the first version, we saddle `MemoryBlock` with legacy behavior that may confuse other people. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
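To make the two offset conventions under discussion concrete, here is a minimal sketch of a block whose accessors take a 0-based offset and add the base internally, so callers never write `getBaseOffset() + ...` themselves. The class `ZeroBasedBlock` and its methods are hypothetical and backed by a plain `byte[]`; this is not the `MemoryBlock` API from the PR, only an illustration of the 0-based alternative.

```java
// Hypothetical sketch of 0-based offset semantics; not Spark's MemoryBlock.
final class ZeroBasedBlock {
    private final byte[] data;
    private final int base;   // where this block starts inside data
    private final int length; // number of bytes visible through this block

    ZeroBasedBlock(byte[] data, int base, int length) {
        if (base < 0 || length < 0 || (long) base + length > data.length) {
            throw new ArrayIndexOutOfBoundsException(
                "base " + base + " and length " + length + " do not fit in " + data.length);
        }
        this.data = data;
        this.base = base;
        this.length = length;
    }

    // The offset is relative to the start of the block, not to the backing array.
    byte getByte(int offset) {
        checkRange(offset, 1);
        return data[base + offset];
    }

    void putByte(int offset, byte value) {
        checkRange(offset, 1);
        data[base + offset] = value;
    }

    // A sub-block shares the backing array; its own offsets start again at 0.
    ZeroBasedBlock subBlock(int offset, int size) {
        checkRange(offset, size);
        return new ZeroBasedBlock(data, base + offset, size);
    }

    private void checkRange(int offset, int size) {
        if (offset < 0 || size < 0 || (long) offset + size > length) {
            throw new ArrayIndexOutOfBoundsException(
                "offset " + offset + " and size " + size + " exceed length " + length);
        }
    }
}
```

With this convention a caller such as `LongArray.set` would pass only `index * WIDTH`; the trade-off is one extra addition inside every accessor.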
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175865685 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +115,24 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + protected UTF8String(byte[] bytes, long offset, int numBytes) { +this(new ByteArrayMemoryBlock(bytes, offset, numBytes)); + } + + public UTF8String(MemoryBlock base) { this.base = base; -this.offset = offset; -this.numBytes = numBytes; +if (base != null) { --- End diff -- `UTF8String(null)` is actually called for serialization. Thus, we cannot replace `if` with `assert`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175864633 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,159 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected final long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + */ + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void putBoolean(long offset, boolean value); + + public abstract byte getByte(long offset); + + public abstract void putByte(long offset, byte value); + + public abstra
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175862823 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java --- @@ -94,12 +95,12 @@ public void free(MemoryBlock memory) { } // Mark the page as freed (so we can detect double-frees). -memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER; +memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER); // As an additional layer of defense against use-after-free bugs, we mutate the // MemoryBlock to null out its reference to the long[] array. -long[] array = (long[]) memory.obj; -memory.setObjAndOffset(null, 0); +long[] array = ((OnHeapMemoryBlock)memory).getLongArray(); +memory.resetObjAndOffset(); long alignedSize = ((size + 7) / 8) * 8; if (shouldPool(alignedSize)) { --- End diff -- Yeah, in that case, we will move `allocate` and `free` methods into `MemoryAllocator` class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175859167 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java --- @@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) { public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; private static final boolean unaligned = Platform.unaligned(); + /** + * MemoryBlock equality check for MemoryBlocks. + * @return true if the arrays are equal, false otherwise + */ + public static boolean arrayEqualsBlock( --- End diff -- This method is called from `UTF8String` class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175689353 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java --- @@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) { public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; private static final boolean unaligned = Platform.unaligned(); + /** + * MemoryBlock equality check for MemoryBlocks. + * @return true if the arrays are equal, false otherwise + */ + public static boolean arrayEqualsBlock( --- End diff -- Thank you for your comments. I will address them on Wed or Thu. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175678610 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java --- @@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) { public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; private static final boolean unaligned = Platform.unaligned(); + /** + * MemoryBlock equality check for MemoryBlocks. + * @return true if the arrays are equal, false otherwise + */ + public static boolean arrayEqualsBlock( --- End diff -- Thank you for your review. I will address them on Wed or Thu. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175678287 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -80,7 +76,7 @@ public void zeroOut() { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -Platform.putLong(baseObj, baseOffset + index * WIDTH, value); +memory.putLong(memory.getBaseOffset() + index * WIDTH, value); --- End diff -- Yeah, we can encourage more people to use the new memory management and its API. On the other hand, it would be good to take a two-step approach: 1. Use the new `MemoryBlock` with the current explicit offset usage. 2. Hide the offset handling inside the implementation. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175605407 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +115,24 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + protected UTF8String(byte[] bytes, long offset, int numBytes) { +this(new ByteArrayMemoryBlock(bytes, offset, numBytes)); + } + + public UTF8String(MemoryBlock base) { this.base = base; -this.offset = offset; -this.numBytes = numBytes; +if (base != null) { + if ((long) Integer.MAX_VALUE < base.size()) { +throw new ArrayIndexOutOfBoundsException( + "MemoryBlock.size " + base.size() + " should be less than " + Integer.MAX_VALUE); + } + this.numBytes = (int) base.size(); --- End diff -- why not use `Ints.checkedCast`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
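For illustration, the manual range check followed by an `(int)` cast in the diff above can be collapsed into a single checked narrowing call. The sketch below uses the JDK's `Math.toIntExact` as a stand-in so that it runs without extra dependencies; Guava's `Ints.checkedCast`, which the comment suggests, plays the same role but signals overflow with `IllegalArgumentException` rather than `ArithmeticException`. The class and method names are hypothetical.

```java
// Hypothetical helper; Math.toIntExact is a JDK stand-in for Guava's Ints.checkedCast.
final class CheckedCastSketch {
    // Narrows a long size to int, throwing ArithmeticException if it does not fit.
    static int sizeAsInt(long size) {
        return Math.toIntExact(size);
    }
}
```

Either call replaces the explicit `if` plus cast with one expression, at the cost of a less specific error message than the hand-written exception.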
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175605220 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +115,24 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + protected UTF8String(byte[] bytes, long offset, int numBytes) { +this(new ByteArrayMemoryBlock(bytes, offset, numBytes)); + } + + public UTF8String(MemoryBlock base) { this.base = base; -this.offset = offset; -this.numBytes = numBytes; +if (base != null) { --- End diff -- I think we can actually add an assert to make sure `base` is not null. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175605033 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -119,15 +115,24 @@ public static UTF8String blankString(int length) { return fromBytes(spaces); } - protected UTF8String(Object base, long offset, int numBytes) { + protected UTF8String(byte[] bytes, long offset, int numBytes) { +this(new ByteArrayMemoryBlock(bytes, offset, numBytes)); + } + + public UTF8String(MemoryBlock base) { this.base = base; -this.offset = offset; -this.numBytes = numBytes; +if (base != null) { + if ((long) Integer.MAX_VALUE < base.size()) { --- End diff -- Do we need the cast? I think that when comparing an int with a long, Java widens the int to long automatically. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
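A small sketch of the point made in this comment: in a comparison between an `int` and a `long`, Java applies binary numeric promotion and widens the `int` operand to `long`, so the explicit `(long)` cast in the diff does not change the result. The class and method names are hypothetical.

```java
// Hypothetical demo of binary numeric promotion in an int-versus-long comparison.
final class WideningSketch {
    // No cast: Integer.MAX_VALUE is widened to long before the comparison.
    static boolean exceedsIntRange(long size) {
        return Integer.MAX_VALUE < size;
    }

    // Explicit cast, as written in the diff; behaves identically.
    static boolean exceedsIntRangeWithCast(long size) {
        return (long) Integer.MAX_VALUE < size;
    }
}
```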
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175604820 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java --- @@ -36,22 +42,34 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError { @Override public void free(MemoryBlock memory) { -assert (memory.obj == null) : - "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?"; -assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : +assert(memory instanceof OffHeapMemoryBlock) : + "UnsafeMemoryAllocator can only free OffHeapMemoryBlock."; +if (memory == OffHeapMemoryBlock.NULL) return; +assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : "page has already been freed"; -assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER) -|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : +assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER) +|| (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()"; if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE); } + Platform.freeMemory(memory.offset); + // As an additional layer of defense against use-after-free bugs, we mutate the // MemoryBlock to reset its pointer. -memory.offset = 0; +memory.resetObjAndOffset(); // Mark the page as freed (so we can detect double-frees). -memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER; +memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER); + } + + public OffHeapMemoryBlock reallocate(OffHeapMemoryBlock block, long oldSize, long newSize) { --- End diff -- no one is calling it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175604535 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,159 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected final long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + */ + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void putBoolean(long offset, boolean value); + + public abstract byte getByte(long offset); + + public abstract void putByte(long offset, byte value); + + public ab
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175604062 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java --- @@ -94,12 +95,12 @@ public void free(MemoryBlock memory) { } // Mark the page as freed (so we can detect double-frees). -memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER; +memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER); // As an additional layer of defense against use-after-free bugs, we mutate the // MemoryBlock to null out its reference to the long[] array. -long[] array = (long[]) memory.obj; -memory.setObjAndOffset(null, 0); +long[] array = ((OnHeapMemoryBlock)memory).getLongArray(); +memory.resetObjAndOffset(); long alignedSize = ((size + 7) / 8) * 8; if (shouldPool(alignedSize)) { --- End diff -- I think in the future we should cache `MemoryBlock` directly, so that we can have a unified pool for both the on-heap and off-heap memory managers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175592083 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,55 +51,81 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = Ints.checkedCast(base.size()); +assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; +int h1 = hashBytesByIntBlock(base, seed); +return fmix(h1, lengthInBytes); } public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; -int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); +int h1 = hashBytesByIntBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); --- End diff -- It's more consistent to call `hashUnsafeWordsBlock` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175581710 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java --- @@ -48,6 +49,16 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) { public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; private static final boolean unaligned = Platform.unaligned(); + /** + * MemoryBlock equality check for MemoryBlocks. + * @return true if the arrays are equal, false otherwise + */ + public static boolean arrayEqualsBlock( --- End diff -- no one is calling it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r175580545 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -80,7 +76,7 @@ public void zeroOut() { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -Platform.putLong(baseObj, baseOffset + index * WIDTH, value); +memory.putLong(memory.getBaseOffset() + index * WIDTH, value); --- End diff -- Thanks for the data! I think it shows that we should make the API clearer and hide the offset handling in the implementation. If we can fix this in this patch, I think it will encourage more people to use the new way of operating on memory. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r175548118

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java ---
@@ -80,7 +76,7 @@ public void zeroOut() {
   public void set(int index, long value) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+    memory.putLong(memory.getBaseOffset() + index * WIDTH, value);
--- End diff --

@cloud-fan sorry for the delay; I was busy with several other things. I ran a benchmark of `LongArray.zeroOut` with two versions. One is the current implementation. The other is the future implementation, which would call `memory.putLong(index * WIDTH, value)`. Both versions performed almost identically. WDYT?

```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
Platform MemoryAccess:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
zeroOutCurrent                  1066 / 1071       1259.4           0.8       1.0X
zeroOutFuture                   1061 / 1063       1265.6           0.8       1.0X
```

```
class MemoryBlockLoopAccessBenchmark extends SparkFunSuite {
  test("benchmark") {
    val N = 128 * 1024 * 1024
    val iters = 2
    val M = 5
    val benchmark = new Benchmark("Platform MemoryAccess", 1L * M * N * iters, minNumIters = 20)
    val array = new Array[Long](N)
    val mb = OnHeapMemoryBlock.fromArray(array)
    val la = new LongArray(mb)

    benchmark.addCase("zeroOutCurrent") { _: Int =>
      for (_ <- 0L until iters) {
        var i = 0
        while (i < M) {
          la.zeroOut()
          i += 1
        }
      }
    }

    benchmark.addCase("zeroOutFuture") { _: Int =>
      for (_ <- 0L until iters) {
        var i = 0
        while (i < M) {
          la.zeroOutFuture()
          i += 1
        }
      }
    }

    benchmark.run()
  }
}

public final class LongArray {
  ...
  public void zeroOut() {
    long baseOffset = memory.getBaseOffset();
    for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
      memory.putLong(off, 0);
    }
  }

  public void zeroOutFuture() {
    for (long off = 0; off < length * WIDTH; off += WIDTH) {
      memory.putLong2(off, 0);
    }
  }
  ...
}

public final class OnHeapMemoryBlock extends MemoryBlock {
  ...
  @Override
  public final void putLong(long offset, long value) {
    Platform.putLong(array, offset, value);
  }

  @Override
  public final void putLong2(long ofs, long value) {
    Platform.putLong(array, ofs + offset, value);
  }
  ...
}
```

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
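Editorial sketch (not part of the thread): the API shape discussed above, where callers pass offsets relative to the start of the block and the block adds its own base internally, might look roughly like the following. `RelativeOffsetBlock` and `RelativeLongArray` are hypothetical names, and a plain `long[]` with bounds checks stands in for `Platform`/Unsafe access, so this illustrates the interface only and says nothing about performance.

```java
// Hypothetical stand-in for an on-heap block backed by a long[]; not Spark's class.
final class RelativeOffsetBlock {
  private final long[] array;
  private final int baseIndex;   // hidden base position, in 8-byte words
  private final int lengthWords; // block length, in 8-byte words

  RelativeOffsetBlock(long[] array, int baseIndex, int lengthWords) {
    if (baseIndex < 0 || lengthWords < 0 || baseIndex > array.length - lengthWords) {
      throw new ArrayIndexOutOfBoundsException("block does not fit in the backing array");
    }
    this.array = array;
    this.baseIndex = baseIndex;
    this.lengthWords = lengthWords;
  }

  long sizeInBytes() {
    return lengthWords * 8L;
  }

  // byteOffset is relative to the start of the block; the base is added here.
  void putLong(long byteOffset, long value) {
    array[baseIndex + wordIndex(byteOffset)] = value;
  }

  long getLong(long byteOffset) {
    return array[baseIndex + wordIndex(byteOffset)];
  }

  private int wordIndex(long byteOffset) {
    if (byteOffset < 0 || byteOffset % 8 != 0 || byteOffset >= sizeInBytes()) {
      throw new ArrayIndexOutOfBoundsException("bad offset " + byteOffset);
    }
    return (int) (byteOffset / 8);
  }
}

// Hypothetical caller: it never sees the block's base offset.
final class RelativeLongArray {
  private static final long WIDTH = 8;
  private final RelativeOffsetBlock memory;
  private final long length; // number of long elements

  RelativeLongArray(RelativeOffsetBlock memory) {
    this.memory = memory;
    this.length = memory.sizeInBytes() / WIDTH;
  }

  void set(int index, long value) {
    memory.putLong(index * WIDTH, value);
  }

  long get(int index) {
    return memory.getLong(index * WIDTH);
  }

  void zeroOut() {
    for (long off = 0; off < length * WIDTH; off += WIDTH) {
      memory.putLong(off, 0);
    }
  }
}
```

With this shape, a caller such as `set` only computes `index * WIDTH`; the addition of the base happens in one place, inside the block.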
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174985078 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; + +public class OffHeapMemoryBlock extends MemoryBlock { + static public final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0); + + public OffHeapMemoryBlock(long address, long size) { +super(null, address, size); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +if (offset + size > this.offset + length) { --- End diff -- Sure, I will move this check into the parent class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
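Editorial sketch (not part of the thread): moving the sub-block range check into the parent class, as agreed above, could be structured roughly as follows. The class names `Block` and `OffHeapLikeBlock` are hypothetical, and the check is a simplified variant of the `checkSubBlockRange` quoted elsewhere in this thread: it compares `offset > length - size` instead of `offset + size > length` so that very large arguments cannot overflow.

```java
// Hypothetical parent class; the shared range check lives here once.
abstract class Block {
  protected final long offset; // base address or base offset of this block
  protected final long length; // size of this block in bytes

  protected Block(long offset, long length) {
    if (offset < 0 || length < 0) {
      throw new ArrayIndexOutOfBoundsException(
        "Length " + length + " and offset " + offset + " must be non-negative");
    }
    this.offset = offset;
    this.length = length;
  }

  long baseOffset() { return offset; }

  long size() { return length; }

  // offset and size are relative to this block.
  protected void checkSubBlockRange(long offset, long size) {
    if (offset < 0 || size < 0) {
      throw new ArrayIndexOutOfBoundsException(
        "Size " + size + " and offset " + offset + " must be non-negative");
    }
    // Equivalent to offset + size > length, written so it cannot overflow.
    if (offset > length - size) {
      throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
        offset + " should not be larger than the length " + length);
    }
  }

  abstract Block subBlock(long offset, long size);
}

// Hypothetical subclass: it only decides which concrete type to create.
final class OffHeapLikeBlock extends Block {
  OffHeapLikeBlock(long address, long size) {
    super(address, size);
  }

  @Override
  Block subBlock(long offset, long size) {
    checkSubBlockRange(offset, size);
    return new OffHeapLikeBlock(this.offset + offset, size);
  }
}
```

Each subclass then calls the inherited check before constructing its own type, so the bounds rule cannot drift between the on-heap and off-heap variants.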
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174361814 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -80,7 +76,7 @@ public void zeroOut() { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -Platform.putLong(baseObj, baseOffset + index * WIDTH, value); +memory.putLong(memory.getBaseOffset() + index * WIDTH, value); --- End diff -- @cloud-fan good point. We will want to use `memory.putLong(index * WIDTH, value)`. I expect that the JIT compiler can hoist the loop-invariant base offset out of the loop, or map the address computation to a scaled-index instruction such as `move targetreg, [basereg + scalereg * constant + offsetreg]`. I will investigate what happens in the native code using the example that you pointed out. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174334386 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,55 +50,81 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = (int)base.size(); --- End diff -- Since this requires `int`, I will add a cast check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
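Editorial sketch (not part of the thread): the reason a cast check matters here is that a plain `(int)` cast of a `long` size silently drops the high bits. A later revision quoted in this thread uses Guava's `Ints.checkedCast`, which throws `IllegalArgumentException` when the value is out of range. The dependency-free sketch below uses the JDK's `Math.toIntExact` instead, which throws `ArithmeticException`. The class name `CheckedLength` is hypothetical.

```java
// Hypothetical helper showing a checked long-to-int narrowing.
final class CheckedLength {
  private CheckedLength() {}

  // Returns size as an int, or throws ArithmeticException if it does not fit.
  static int toIntLength(long size) {
    return Math.toIntExact(size);
  }

  // The unchecked variant, shown only for contrast: high bits are discarded.
  static int truncatingLength(long size) {
    return (int) size;
  }
}
```

For a size of `(1L << 32) + 8`, the truncating cast yields 8, which would even pass a "multiple of 8" assertion, whereas the checked version fails loudly.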
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174333828 --- Diff: common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java --- @@ -38,12 +39,18 @@ public static int hashLong(long input) { return (int) ((input >>> 32) ^ input); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { + public static int hashUnsafeBytesBlock(MemoryBlock mb) { +long offset = mb.getBaseOffset(); +int lengthInBytes = (int)mb.size(); --- End diff -- Let us use `long`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174333675 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset - Platform.BYTE_ARRAY_OFFSET + size <= obj.length * 8L) : --- End diff -- Oh, it's my mistake. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174295581 --- Diff: common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java --- @@ -38,12 +39,18 @@ public static int hashLong(long input) { return (int) ((input >>> 32) ^ input); } - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { + public static int hashUnsafeBytesBlock(MemoryBlock mb) { +long offset = mb.getBaseOffset(); +int lengthInBytes = (int)mb.size(); --- End diff -- or we just use long here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174300387 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java --- @@ -49,55 +50,81 @@ public static int hashInt(int input, int seed) { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { -return hashUnsafeWords(base, offset, lengthInBytes, seed); +return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + } + + public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { +// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. +int lengthInBytes = (int)base.size(); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174300982 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; + +public class OffHeapMemoryBlock extends MemoryBlock { + static public final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0); + + public OffHeapMemoryBlock(long address, long size) { +super(null, address, size); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +if (offset + size > this.offset + length) { --- End diff -- shall we put this into a protected method in the parent class? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r174301203 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset - Platform.BYTE_ARRAY_OFFSET + size <= obj.length * 8L) : --- End diff -- why `* 8`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
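Editorial sketch (not part of the thread): the `* 8` question above comes down to element width. A `long[]` holds 8 bytes per element, so its capacity in bytes is `length * 8L`; a `byte[]` holds 1 byte per element, so the bound is just `length`. The sketch below contrasts the two bounds. `ASSUMED_BYTE_ARRAY_OFFSET` is a made-up constant standing in for `Platform.BYTE_ARRAY_OFFSET`, whose real value depends on the JVM, and the class and method names are hypothetical.

```java
// Hypothetical bounds helpers for a byte[]-backed block.
final class ByteArrayBounds {
  // Assumed value for illustration only; the real base offset is JVM-dependent.
  static final long ASSUMED_BYTE_ARRAY_OFFSET = 16L;

  private ByteArrayBounds() {}

  // Correct bound: a byte[] of n elements holds n bytes.
  static boolean fits(byte[] array, long offset, long size) {
    return offset - ASSUMED_BYTE_ARRAY_OFFSET + size <= array.length;
  }

  // The bound questioned in the review: it treats each byte as 8 bytes wide.
  static boolean fitsWithTimesEight(byte[] array, long offset, long size) {
    return offset - ASSUMED_BYTE_ARRAY_OFFSET + size <= array.length * 8L;
  }
}
```

With an 8-element `byte[]` and a requested size of 16 bytes starting at the array base, the correct bound rejects the block while the `* 8L` bound accepts it, so an out-of-range block would go undetected.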