This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 492d1d4  [FLINK-11724][core] Add copyToUnsafe, copyFromUnsafe and 
equalTo to MemorySegment.
492d1d4 is described below

commit 492d1d40100b1297281408bf0d83b6db5378b9cb
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Feb 27 19:37:52 2019 +0800

    [FLINK-11724][core] Add copyToUnsafe, copyFromUnsafe and equalTo to 
MemorySegment.
    
    This closes #7847
---
 .../apache/flink/core/memory/MemorySegment.java    | 77 ++++++++++++++++++++++
 .../flink/core/memory/CrossSegmentTypeTest.java    | 10 +++
 .../flink/core/memory/MemorySegmentTestBase.java   | 35 ++++++++++
 3 files changed, 122 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index b95ceb9..39b6d9c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -1270,6 +1270,50 @@ public abstract class MemorySegment {
                }
        }
 
+       /**
+        * Bulk copy method. Copies {@code numBytes} bytes to target unsafe 
object and pointer.
+        * NOTE: This is a unsafe method, no check here, please be carefully.
+        *
+        * @param offset The position where the bytes are started to be read 
from in this memory segment.
+        * @param target The unsafe memory to copy the bytes to.
+        * @param targetPointer The position in the target unsafe memory to 
copy the chunk to.
+        * @param numBytes The number of bytes to copy.
+        *
+        * @throws IndexOutOfBoundsException If the source segment does not 
contain the given number
+        *           of bytes (starting from offset).
+        */
+       public final void copyToUnsafe(int offset, Object target, int 
targetPointer, int numBytes) {
+               final long thisPointer = this.address + offset;
+               if (thisPointer + numBytes > addressLimit) {
+                       throw new IndexOutOfBoundsException(
+                                       String.format("offset=%d, numBytes=%d, 
address=%d",
+                                                       offset, numBytes, 
this.address));
+               }
+               UNSAFE.copyMemory(this.heapMemory, thisPointer, target, 
targetPointer, numBytes);
+       }
+
+       /**
+        * Bulk copy method. Copies {@code numBytes} bytes from source unsafe 
object and pointer.
+        * NOTE: This is a unsafe method, no check here, please be carefully.
+        *
+        * @param offset The position where the bytes are started to be write 
in this memory segment.
+        * @param source The unsafe memory to copy the bytes from.
+        * @param sourcePointer The position in the source unsafe memory to 
copy the chunk from.
+        * @param numBytes The number of bytes to copy.
+        *
+        * @throws IndexOutOfBoundsException If this segment can not contain 
the given number
+        *           of bytes (starting from offset).
+        */
+       public final void copyFromUnsafe(int offset, Object source, int 
sourcePointer, int numBytes) {
+               final long thisPointer = this.address + offset;
+               if (thisPointer + numBytes > addressLimit) {
+                       throw new IndexOutOfBoundsException(
+                                       String.format("offset=%d, numBytes=%d, 
address=%d",
+                                                       offset, numBytes, 
this.address));
+               }
+               UNSAFE.copyMemory(source, sourcePointer, this.heapMemory, 
thisPointer, numBytes);
+       }
+
        // 
-------------------------------------------------------------------------
        //                      Comparisons & Swapping
        // 
-------------------------------------------------------------------------
@@ -1349,4 +1393,37 @@ public abstract class MemorySegment {
                                        String.format("offset1=%d, offset2=%d, 
len=%d, bufferSize=%d, address1=%d, address2=%d",
                                                        offset1, offset2, len, 
tempBuffer.length, this.address, seg2.address));
        }
+
+       /**
+        * Equals two memory segment regions.
+        *
+        * @param seg2 Segment to equal this segment with
+        * @param offset1 Offset of this segment to start equaling
+        * @param offset2 Offset of seg2 to start equaling
+        * @param length Length of the equaled memory region
+        *
+        * @return true if equal, false otherwise
+        */
+       public final boolean equalTo(MemorySegment seg2, int offset1, int 
offset2, int length) {
+               int i = 0;
+
+               // we assume unaligned accesses are supported.
+               // Compare 8 bytes at a time.
+               while (i <= length - 8) {
+                       if (getLong(offset1 + i) != seg2.getLong(offset2 + i)) {
+                               return false;
+                       }
+                       i += 8;
+               }
+
+               // cover the last (length % 8) elements.
+               while (i < length) {
+                       if (get(offset1 + i) != seg2.get(offset2 + i)) {
+                               return false;
+                       }
+                       i += 1;
+               }
+
+               return true;
+       }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
index ea144c7..ccff2ba 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -35,6 +35,8 @@ import static org.junit.Assert.fail;
  */
 public class CrossSegmentTypeTest {
 
+       private static final long BYTE_ARRAY_BASE_OFFSET = 
MemoryUtils.UNSAFE.arrayBaseOffset(byte[].class);
+
        private final int pageSize = 32 * 1024;
 
        // 
------------------------------------------------------------------------
@@ -187,6 +189,8 @@ public class CrossSegmentTypeTest {
 
                byte[] expected = new byte[pageSize];
                byte[] actual = new byte[pageSize];
+               byte[] unsafeCopy = new byte[pageSize];
+               MemorySegment unsafeCopySeg = 
MemorySegmentFactory.allocateUnpooledSegment(pageSize);
 
                // zero out the memory
                seg1.put(0, expected);
@@ -205,6 +209,12 @@ public class CrossSegmentTypeTest {
 
                        seg1.put(thisPos, bytes);
                        seg1.copyTo(thisPos, seg2, otherPos, numBytes);
+                       seg1.copyToUnsafe(thisPos, unsafeCopy, (int) (otherPos 
+ BYTE_ARRAY_BASE_OFFSET), numBytes);
+
+                       int otherPos2 = random.nextInt(pageSize - numBytes);
+                       unsafeCopySeg.copyFromUnsafe(otherPos2, unsafeCopy,
+                                       (int) (otherPos + 
BYTE_ARRAY_BASE_OFFSET), numBytes);
+                       assertTrue(unsafeCopySeg.equalTo(seg2, otherPos2, 
otherPos, numBytes));
                }
 
                seg2.get(0, actual);
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
index fb28948..0b8f1d0 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java
@@ -304,6 +304,41 @@ public abstract class MemorySegmentTestBase {
        }
 
        @Test
+       public void testCopyUnsafeIndexOutOfBounds() {
+               byte[] bytes = new byte[pageSize];
+               MemorySegment segment = createSegment(pageSize);
+
+               try {
+                       segment.copyToUnsafe(1, bytes, 0, pageSize);
+                       fail("should fail with an IndexOutOfBoundsException");
+               }
+               catch (IndexOutOfBoundsException ignored) {}
+
+               try {
+                       segment.copyFromUnsafe(1, bytes, 0, pageSize);
+                       fail("should fail with an IndexOutOfBoundsException");
+               }
+               catch (IndexOutOfBoundsException ignored) {}
+       }
+
+       @Test
+       public void testEqualTo() {
+               MemorySegment seg1 = createSegment(pageSize);
+               MemorySegment seg2 = createSegment(pageSize);
+
+               int i = new Random().nextInt(pageSize - 8);
+
+               seg1.put(i, (byte) 10);
+               assertFalse(seg1.equalTo(seg2, i, i, 9));
+
+               seg1.put(i, (byte) 0);
+               assertTrue(seg1.equalTo(seg2, i, i, 9));
+
+               seg1.put(i + 8, (byte) 10);
+               assertFalse(seg1.equalTo(seg2, i, i, 9));
+       }
+
+       @Test
        public void testCharAccess() {
                final MemorySegment segment = createSegment(pageSize);
 

Reply via email to