hive git commit: HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere)

2018-12-10 Thread prasanthj
Repository: hive
Updated Branches:
  refs/heads/branch-3 db8e9b0ef -> 0a1bc3583


HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric 
Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a1bc358
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a1bc358
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a1bc358

Branch: refs/heads/branch-3
Commit: 0a1bc358399f9b14999f27bfcb965318fe5ece11
Parents: db8e9b0
Author: Eric Wohlstadter 
Authored: Thu Nov 29 12:35:01 2018 -0800
Committer: Prasanth Jayachandran 
Committed: Mon Dec 10 01:28:46 2018 -0800

--
 .../hadoop/hive/common/HeapMemoryMonitor.java   | 22 +---
 .../hive/streaming/AbstractRecordWriter.java|  1 +
 2 files changed, 20 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
--
diff --git 
a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java 
b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
index 42286be..56ec2fd 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.ListenerNotFoundException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ public class HeapMemoryMonitor {
 
   private final double threshold;
   private List listeners = new ArrayList<>();
+  private NotificationListener notificationListener;
 
   public interface Listener {
 void memoryUsageAboveThreshold(long usedMemory, long maxMemory);
@@ -140,7 +143,7 @@ public class HeapMemoryMonitor {
 }
 MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
 NotificationEmitter emitter = (NotificationEmitter) mxBean;
-emitter.addNotificationListener((n, hb) -> {
+notificationListener = (n, hb) -> {
   if (n.getType().equals(
 MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
 long maxMemory = tenuredGenPool.getUsage().getMax();
@@ -149,6 +152,19 @@ public class HeapMemoryMonitor {
   listener.memoryUsageAboveThreshold(usedMemory, maxMemory);
 }
   }
-}, null, null);
+};
+emitter.addNotificationListener(notificationListener, null, null);
   }
-}
\ No newline at end of file
+
+  public void close() {
+if(notificationListener != null) {
+  MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+  NotificationEmitter emitter = (NotificationEmitter) mxBean;
+  try {
+emitter.removeNotificationListener(notificationListener);
+  } catch(ListenerNotFoundException e) {
+LOG.warn("Failed to remove HeapMemoryMonitor notification listener 
from MemoryMXBean", e);
+  }
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
--
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 9e90d36..0408599 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -355,6 +355,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 
   @Override
   public void close() throws StreamingIOFailure {
+heapMemoryMonitor.close();
 boolean haveError = false;
 String partition = null;
 if (LOG.isDebugEnabled()) {



hive git commit: HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere)

2018-11-29 Thread jdere
Repository: hive
Updated Branches:
  refs/heads/master 3e9814377 -> 75c6ee417


HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric 
Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/75c6ee41
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/75c6ee41
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/75c6ee41

Branch: refs/heads/master
Commit: 75c6ee417faca1f5d938f3928fde742738e5d62a
Parents: 3e98143
Author: Eric Wohlstadter 
Authored: Thu Nov 29 12:35:01 2018 -0800
Committer: Jason Dere 
Committed: Thu Nov 29 12:35:01 2018 -0800

--
 .../hadoop/hive/common/HeapMemoryMonitor.java   | 22 +---
 .../hive/streaming/AbstractRecordWriter.java|  1 +
 2 files changed, 20 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hive/blob/75c6ee41/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
--
diff --git 
a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java 
b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
index 42286be..56ec2fd 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.ListenerNotFoundException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ public class HeapMemoryMonitor {
 
   private final double threshold;
   private List listeners = new ArrayList<>();
+  private NotificationListener notificationListener;
 
   public interface Listener {
 void memoryUsageAboveThreshold(long usedMemory, long maxMemory);
@@ -140,7 +143,7 @@ public class HeapMemoryMonitor {
 }
 MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
 NotificationEmitter emitter = (NotificationEmitter) mxBean;
-emitter.addNotificationListener((n, hb) -> {
+notificationListener = (n, hb) -> {
   if (n.getType().equals(
 MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
 long maxMemory = tenuredGenPool.getUsage().getMax();
@@ -149,6 +152,19 @@ public class HeapMemoryMonitor {
   listener.memoryUsageAboveThreshold(usedMemory, maxMemory);
 }
   }
-}, null, null);
+};
+emitter.addNotificationListener(notificationListener, null, null);
   }
-}
\ No newline at end of file
+
+  public void close() {
+if(notificationListener != null) {
+  MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+  NotificationEmitter emitter = (NotificationEmitter) mxBean;
+  try {
+emitter.removeNotificationListener(notificationListener);
+  } catch(ListenerNotFoundException e) {
+LOG.warn("Failed to remove HeapMemoryMonitor notification listener 
from MemoryMXBean", e);
+  }
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/75c6ee41/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
--
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 88a7d82..e7588e8 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -366,6 +366,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 
   @Override
   public void close() throws StreamingIOFailure {
+heapMemoryMonitor.close();
 boolean haveError = false;
 String partition = null;
 if (LOG.isDebugEnabled()) {