[GitHub] [flink] xintongsong commented on a change in pull request #9747: [FLINK-13985] Use unsafe memory for managed memory

2019-10-16 Thread GitBox
xintongsong commented on a change in pull request #9747: [FLINK-13985] Use 
unsafe memory for managed memory
URL: https://github.com/apache/flink/pull/9747#discussion_r335355116
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
 ##
 @@ -57,4 +64,79 @@
 
     /** Should not be instantiated. */
     private MemoryUtils() {}
+
+    private static Constructor getDirectBufferPrivateConstructor() {
+        //noinspection OverlyBroadCatchBlock
+        try {
+            Constructor constructor =
+                ByteBuffer.allocateDirect(1).getClass().getDeclaredConstructor(long.class, int.class);
+            constructor.setAccessible(true);
+            return constructor;
+        } catch (NoSuchMethodException e) {
+            ExceptionUtils.rethrow(
+                e,
+                "The private constructor java.nio.DirectByteBuffer.<init>(long, int) is not available.");
+        } catch (SecurityException e) {
+            ExceptionUtils.rethrow(
+                e,
+                "The private constructor java.nio.DirectByteBuffer.<init>(long, int) is not available, " +
+                    "permission denied by security manager");
+        } catch (Throwable t) {
+            ExceptionUtils.rethrow(
+                t,
+                "Unclassified error while trying to access private constructor " +
+                    "java.nio.DirectByteBuffer.<init>(long, int).");
+        }
+        throw new RuntimeException("unexpected to avoid returning null");
+    }
+
+    /**
+     * Allocates unsafe native memory.
+     *
+     * @param size size of the unsafe memory to allocate.
+     * @return address of the allocated unsafe memory
+     */
+    static long allocateUnsafe(long size) {
+        return UNSAFE.allocateMemory(Math.max(1L, size));
+    }
+
+    /**
+     * Creates a cleaner to release the unsafe memory by VM GC.
+     *
+     * When memory owner becomes phantom reachable,
+     * GC will release the underlying unsafe memory if not released yet.
+     *
+     * @param owner memory owner which phantom reaching is to monitor by GC and release the unsafe memory
+     * @param address address of the unsafe memory to release
+     * @return action to run to release the unsafe memory manually
+     */
+    static Runnable createMemoryGcCleaner(@Nullable Object owner, long address) {
+        return createGcCleaner(owner, () -> releaseUnsafe(address));
+    }
+
+    @SuppressWarnings("UseOfSunClasses")
+    private static Runnable createGcCleaner(@Nullable Object owner, Runnable toClean) {
+        return owner == null ? toClean : sun.misc.Cleaner.create(owner, toClean)::clean;
 
 Review comment:
   My mistake, it should not cause any duplicate calls to
   `Unsafe#freeMemory`, even if we remove the `Cleaner` wrapper.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9747: [FLINK-13985] Use unsafe memory for managed memory

2019-10-15 Thread GitBox
xintongsong commented on a change in pull request #9747: [FLINK-13985] Use 
unsafe memory for managed memory
URL: https://github.com/apache/flink/pull/9747#discussion_r335273631
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
 ##
 @@ -57,4 +64,79 @@
 
     /** Should not be instantiated. */
     private MemoryUtils() {}
+
+    private static Constructor getDirectBufferPrivateConstructor() {
+        //noinspection OverlyBroadCatchBlock
+        try {
+            Constructor constructor =
+                ByteBuffer.allocateDirect(1).getClass().getDeclaredConstructor(long.class, int.class);
+            constructor.setAccessible(true);
+            return constructor;
+        } catch (NoSuchMethodException e) {
+            ExceptionUtils.rethrow(
+                e,
+                "The private constructor java.nio.DirectByteBuffer.<init>(long, int) is not available.");
+        } catch (SecurityException e) {
+            ExceptionUtils.rethrow(
+                e,
+                "The private constructor java.nio.DirectByteBuffer.<init>(long, int) is not available, " +
+                    "permission denied by security manager");
+        } catch (Throwable t) {
+            ExceptionUtils.rethrow(
+                t,
+                "Unclassified error while trying to access private constructor " +
+                    "java.nio.DirectByteBuffer.<init>(long, int).");
+        }
+        throw new RuntimeException("unexpected to avoid returning null");
+    }
+
+    /**
+     * Allocates unsafe native memory.
+     *
+     * @param size size of the unsafe memory to allocate.
+     * @return address of the allocated unsafe memory
+     */
+    static long allocateUnsafe(long size) {
+        return UNSAFE.allocateMemory(Math.max(1L, size));
+    }
+
+    /**
+     * Creates a cleaner to release the unsafe memory by VM GC.
+     *
+     * When memory owner becomes phantom reachable,
+     * GC will release the underlying unsafe memory if not released yet.
+     *
+     * @param owner memory owner which phantom reaching is to monitor by GC and release the unsafe memory
+     * @param address address of the unsafe memory to release
+     * @return action to run to release the unsafe memory manually
+     */
+    static Runnable createMemoryGcCleaner(@Nullable Object owner, long address) {
+        return createGcCleaner(owner, () -> releaseUnsafe(address));
+    }
+
+    @SuppressWarnings("UseOfSunClasses")
+    private static Runnable createGcCleaner(@Nullable Object owner, Runnable toClean) {
+        return owner == null ? toClean : sun.misc.Cleaner.create(owner, toClean)::clean;
 
 Review comment:
   Agreed on removing the null case.
   
   I get the idea that we use `Cleaner#clean` to guarantee that
   `Unsafe#freeMemory` is called only once, and I think it's an elegant
   design. My point is that we should add test cases to protect this. E.g.,
   first call `HybridMemorySegment#free`, then explicitly trigger a GC, and
   verify that there are no failures due to multiple calls of
   `Unsafe#freeMemory`.
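
   A rough sketch of the kind of test described here, assuming Java 9+ and
   using the public `java.lang.ref.Cleaner` as a stand-in for
   `sun.misc.Cleaner`. `FreeThenGcSketch`, `Segment`, and `freeThenGc` are
   hypothetical names, and a counter stands in for `Unsafe#freeMemory`; it is
   not the actual Flink test.

```java
import java.lang.ref.Cleaner;
import java.lang.ref.Reference;
import java.util.concurrent.atomic.AtomicInteger;

public final class FreeThenGcSketch {
    private static final Cleaner CLEANER = Cleaner.create();

    /** Hypothetical stand-in for a segment that owns a release action. */
    static final class Segment {
        private final Cleaner.Cleanable cleanable;

        Segment(Object owner, Runnable release) {
            // The release action must not capture the owner, or the owner stays reachable.
            this.cleanable = CLEANER.register(owner, release);
        }

        /** Explicit release, analogous to HybridMemorySegment#free. */
        void free() {
            cleanable.clean();
        }
    }

    /** Frees explicitly, then requests a GC, and returns how often the release action ran. */
    static int freeThenGc() {
        AtomicInteger releases = new AtomicInteger(); // stands in for Unsafe#freeMemory calls
        Object owner = new Object();
        Segment segment = new Segment(owner, releases::incrementAndGet);

        segment.free();
        Reference.reachabilityFence(owner); // keep the owner alive until the explicit free is done

        // Drop all references and ask for a GC; the cleanable is already unregistered.
        owner = null;
        segment = null;
        System.gc();
        try {
            Thread.sleep(200); // give the cleaner thread a chance to run, if it had anything to do
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return releases.get();
    }

    public static void main(String[] args) {
        System.out.println("release count = " + freeThenGc());
    }
}
```

   A test along these lines would assert that the count is exactly 1. Since
   `System.gc()` is only a hint, the check guards against a second release but
   cannot prove that a GC actually happened.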




[GitHub] [flink] xintongsong commented on a change in pull request #9747: [FLINK-13985] Use unsafe memory for managed memory

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9747: [FLINK-13985] Use 
unsafe memory for managed memory
URL: https://github.com/apache/flink/pull/9747#discussion_r328422193
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
 ##
 @@ -86,16 +86,16 @@ public static MemorySegment allocateUnpooledOffHeapMemory(int size) {
     }
 
     /**
-     * Allocates some unpooled off-heap memory and creates a new memory segment that
-     * represents that memory.
+     * Allocates an unpooled off-heap memory and creates a new memory segment to represent that memory.
      *
      * @param size The size of the off-heap memory segment to allocate.
      * @param owner The owner to associate with the off-heap memory segment.
      * @return A new memory segment, backed by unpooled off-heap memory.
      */
     public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
-        ByteBuffer memory = ByteBuffer.allocateDirect(size);
-        return new HybridMemorySegment(memory, owner);
+        long address = MemoryUtils.allocateUnsafe(size);
+        ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
+        return new HybridMemorySegment(offHeapBuffer, owner, MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address));
 
 Review comment:
   We intend to use unsafe memory only for the memory manager (off-heap
   managed memory). This interface is also used by other components
   (`FileChannelBoundedData` and `NetworkBufferPool`), which expect direct
   memory limited by `-XX:MaxDirectMemorySize`.
   
   I think we should not change this interface, but instead add another
   method, `allocateUnpooledUnsafeOffHeapMemory`, and use it only in the
   memory manager.
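
   To illustrate the distinction, here is a minimal sketch of the two
   allocation paths kept side by side. `OffHeapAllocationSketch` and its
   method names are hypothetical, not the Flink API. It assumes
   `sun.misc.Unsafe` is reachable through reflection, which holds on the JDKs
   this PR targets but is deprecated on recent ones.

```java
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import sun.misc.Unsafe;

public final class OffHeapAllocationSketch {
    private static final Unsafe UNSAFE = loadUnsafe();

    private static Unsafe loadUnsafe() {
        try {
            // Common reflective access to the Unsafe singleton.
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not accessible", e);
        }
    }

    /** Direct memory: counted against -XX:MaxDirectMemorySize and reclaimed by the JVM. */
    static ByteBuffer allocateDirect(int size) {
        return ByteBuffer.allocateDirect(size);
    }

    /** Unsafe memory: bypasses the direct memory limit; the caller must release it. */
    static long allocateUnsafe(long size) {
        return UNSAFE.allocateMemory(Math.max(1L, size));
    }

    /** Releases memory obtained from allocateUnsafe; must be called exactly once per address. */
    static void releaseUnsafe(long address) {
        UNSAFE.freeMemory(address);
    }

    public static void main(String[] args) {
        ByteBuffer direct = allocateDirect(64);
        long address = allocateUnsafe(64);
        System.out.println("direct buffer: " + direct.isDirect());
        releaseUnsafe(address);
    }
}
```

   Keeping the two paths as separate entry points lets callers such as the
   network buffer pool stay on the limited direct-memory path, while only the
   memory manager opts into the unsafe one.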




[GitHub] [flink] xintongsong commented on a change in pull request #9747: [FLINK-13985] Use unsafe memory for managed memory

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9747: [FLINK-13985] Use 
unsafe memory for managed memory
URL: https://github.com/apache/flink/pull/9747#discussion_r328425366
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java
 ##
 @@ -57,4 +64,79 @@
 
     /** Should not be instantiated. */
     private MemoryUtils() {}
+
+    private static Constructor getDirectBufferPrivateConstructor() {
+        //noinspection OverlyBroadCatchBlock
+        try {
+            Constructor constructor =
+                ByteBuffer.allocateDirect(1).getClass().getDeclaredConstructor(long.class, int.class);
+            constructor.setAccessible(true);
+            return constructor;
+        } catch (NoSuchMethodException e) {
+            ExceptionUtils.rethrow(
+                e,
+                "The private constructor java.nio.DirectByteBuffer.<init>(long, int) is not available.");
+        } catch (SecurityException e) {
+            ExceptionUtils.rethrow(
+                e,
+                "The private constructor java.nio.DirectByteBuffer.<init>(long, int) is not available, " +
+                    "permission denied by security manager");
+        } catch (Throwable t) {
+            ExceptionUtils.rethrow(
+                t,
+                "Unclassified error while trying to access private constructor " +
+                    "java.nio.DirectByteBuffer.<init>(long, int).");
+        }
+        throw new RuntimeException("unexpected to avoid returning null");
+    }
+
+    /**
+     * Allocates unsafe native memory.
+     *
+     * @param size size of the unsafe memory to allocate.
+     * @return address of the allocated unsafe memory
+     */
+    static long allocateUnsafe(long size) {
+        return UNSAFE.allocateMemory(Math.max(1L, size));
+    }
+
+    /**
+     * Creates a cleaner to release the unsafe memory by VM GC.
+     *
+     * When memory owner becomes phantom reachable,
+     * GC will release the underlying unsafe memory if not released yet.
+     *
+     * @param owner memory owner which phantom reaching is to monitor by GC and release the unsafe memory
+     * @param address address of the unsafe memory to release
+     * @return action to run to release the unsafe memory manually
+     */
+    static Runnable createMemoryGcCleaner(@Nullable Object owner, long address) {
+        return createGcCleaner(owner, () -> releaseUnsafe(address));
+    }
+
+    @SuppressWarnings("UseOfSunClasses")
+    private static Runnable createGcCleaner(@Nullable Object owner, Runnable toClean) {
+        return owner == null ? toClean : sun.misc.Cleaner.create(owner, toClean)::clean;
 
 Review comment:
   If I understand correctly, the reason we use `Cleaner#clean` instead of
   calling `toClean` directly is to make sure the cleaner is removed when it's
   invoked in `HybridMemorySegment#free`, so there won't be duplicate
   `Unsafe#freeMemory` calls in later GCs?
   
   I think we should add a test case to make sure nobody breaks it in the
   future.
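
   The behavior in question can be illustrated with the public
   `java.lang.ref.Cleaner` (Java 9+), whose `Cleanable#clean` is documented to
   invoke the cleaning action at most once. This is only a sketch: it uses
   that class as a stand-in for `sun.misc.Cleaner`, and `CleanOnceSketch` and
   `runCleanTwice` are hypothetical names.

```java
import java.lang.ref.Cleaner;
import java.lang.ref.Reference;
import java.util.concurrent.atomic.AtomicInteger;

public final class CleanOnceSketch {
    private static final Cleaner CLEANER = Cleaner.create();

    /** Calls clean() twice on the same Cleanable and returns how often the action ran. */
    static int runCleanTwice() {
        AtomicInteger releases = new AtomicInteger(); // stands in for Unsafe#freeMemory calls
        Object owner = new Object();
        // The action must not capture the owner, or the owner could never become unreachable.
        Cleaner.Cleanable cleanable = CLEANER.register(owner, releases::incrementAndGet);

        cleanable.clean(); // runs the action and unregisters the cleanable
        cleanable.clean(); // no-op: the action has already run
        Reference.reachabilityFence(owner); // keep the owner alive across both calls
        return releases.get();
    }

    public static void main(String[] args) {
        System.out.println("release count = " + runCleanTwice()); // prints "release count = 1"
    }
}
```

   Calling the raw `Runnable` twice would instead run the release action
   twice, which is the failure mode the wrapper is meant to rule out.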




[GitHub] [flink] xintongsong commented on a change in pull request #9747: [FLINK-13985] Use unsafe memory for managed memory

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9747: [FLINK-13985] Use 
unsafe memory for managed memory
URL: https://github.com/apache/flink/pull/9747#discussion_r328413824
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
 ##
 @@ -44,15 +43,15 @@
     @Test
     public void testCompareBytesMixedSegments() {
         MemorySegment[] segs1 = {
-            new HeapMemorySegment(new byte[pageSize]),
-            new HybridMemorySegment(new byte[pageSize]),
-            new HybridMemorySegment(ByteBuffer.allocateDirect(pageSize))
+            MemorySegmentFactory.allocateUnpooledSegment(pageSize),
 
 Review comment:
   We should not replace `new HeapMemorySegment()` with
   `MemorySegmentFactory.allocateUnpooledSegment`. The latter creates a
   `HybridMemorySegment` instead of a `HeapMemorySegment`, while this test
   case is meant to also validate the behavior of `HeapMemorySegment`.
   
   The same applies to the other occurrences in this class and in
   `OperationsOnFreedSegmentTest`.




[GitHub] [flink] xintongsong commented on a change in pull request #9747: [FLINK-13985] Use unsafe memory for managed memory

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9747: [FLINK-13985] Use 
unsafe memory for managed memory
URL: https://github.com/apache/flink/pull/9747#discussion_r328422844
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
 ##
 @@ -86,16 +86,16 @@ public static MemorySegment allocateUnpooledOffHeapMemory(int size) {
     }
 
     /**
-     * Allocates some unpooled off-heap memory and creates a new memory segment that
-     * represents that memory.
+     * Allocates an unpooled off-heap memory and creates a new memory segment to represent that memory.
      *
      * @param size The size of the off-heap memory segment to allocate.
      * @param owner The owner to associate with the off-heap memory segment.
      * @return A new memory segment, backed by unpooled off-heap memory.
      */
     public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
-        ByteBuffer memory = ByteBuffer.allocateDirect(size);
-        return new HybridMemorySegment(memory, owner);
+        long address = MemoryUtils.allocateUnsafe(size);
+        ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
+        return new HybridMemorySegment(offHeapBuffer, owner, MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address));
 
 Review comment:
   Both safe and unsafe off-heap memory can be wrapped in 
`HybridMemorySegment`. The only difference is that the unsafe ones have 
non-null cleaners.
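
   As a simplified sketch of that idea: one segment type, with an optional
   release action. `NullableCleanerSketch`, `Segment`, `direct`, and
   `withCleaner` are hypothetical names, the guard in `free()` is an
   assumption of this sketch rather than the actual `HybridMemorySegment`
   logic, and a direct buffer stands in for unsafe memory so the example
   stays self-contained.

```java
import java.nio.ByteBuffer;

public final class NullableCleanerSketch {

    /** Hypothetical, simplified stand-in for an off-heap memory segment. */
    static final class Segment {
        private ByteBuffer buffer;
        private final Runnable cleaner; // null for JVM-managed direct memory

        Segment(ByteBuffer buffer, Runnable cleaner) {
            this.buffer = buffer;
            this.cleaner = cleaner;
        }

        boolean isFreed() {
            return buffer == null;
        }

        /** Drops the buffer; runs the cleaner only on the first call and only if present. */
        void free() {
            if (buffer == null) {
                return; // already freed
            }
            buffer = null;
            if (cleaner != null) {
                cleaner.run();
            }
        }
    }

    /** "Safe" direct memory: no cleaner, the JVM reclaims the buffer. */
    static Segment direct(int size) {
        return new Segment(ByteBuffer.allocateDirect(size), null);
    }

    /** Memory with an explicit release action, as in the unsafe case. */
    static Segment withCleaner(int size, Runnable release) {
        // A direct buffer stands in for unsafe memory here.
        return new Segment(ByteBuffer.allocateDirect(size), release);
    }
}
```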

