hive git commit: HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere)
Repository: hive Updated Branches: refs/heads/branch-3 db8e9b0ef -> 0a1bc3583 HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a1bc358 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a1bc358 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a1bc358 Branch: refs/heads/branch-3 Commit: 0a1bc358399f9b14999f27bfcb965318fe5ece11 Parents: db8e9b0 Author: Eric Wohlstadter Authored: Thu Nov 29 12:35:01 2018 -0800 Committer: Prasanth Jayachandran Committed: Mon Dec 10 01:28:46 2018 -0800 -- .../hadoop/hive/common/HeapMemoryMonitor.java | 22 +--- .../hive/streaming/AbstractRecordWriter.java| 1 + 2 files changed, 20 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java -- diff --git a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java index 42286be..56ec2fd 100644 --- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.List; import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.ListenerNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ public class HeapMemoryMonitor { private final double threshold; private List listeners = new ArrayList<>(); + private NotificationListener notificationListener; public interface Listener { void memoryUsageAboveThreshold(long usedMemory, long maxMemory); @@ -140,7 +143,7 @@ public class HeapMemoryMonitor { } MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); NotificationEmitter emitter = (NotificationEmitter) mxBean; -emitter.addNotificationListener((n, hb) -> { +notificationListener = (n, hb) -> { if (n.getType().equals( MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) { long maxMemory = tenuredGenPool.getUsage().getMax(); @@ -149,6 +152,19 @@ public class HeapMemoryMonitor { listener.memoryUsageAboveThreshold(usedMemory, maxMemory); } } -}, null, null); +}; +emitter.addNotificationListener(notificationListener, null, null); } -} \ No newline at end of file + + public void close() { +if(notificationListener != null) { + MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + NotificationEmitter emitter = (NotificationEmitter) mxBean; + try { +emitter.removeNotificationListener(notificationListener); + } catch(ListenerNotFoundException e) { +LOG.warn("Failed to remove HeapMemoryMonitor notification listener from MemoryMXBean", e); + } +} + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java -- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 9e90d36..0408599 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -355,6 +355,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { @Override public void close() throws StreamingIOFailure { +heapMemoryMonitor.close(); boolean haveError = false; String partition = null; if (LOG.isDebugEnabled()) {
hive git commit: HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere)
Repository: hive Updated Branches: refs/heads/master 3e9814377 -> 75c6ee417 HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/75c6ee41 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/75c6ee41 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/75c6ee41 Branch: refs/heads/master Commit: 75c6ee417faca1f5d938f3928fde742738e5d62a Parents: 3e98143 Author: Eric Wohlstadter Authored: Thu Nov 29 12:35:01 2018 -0800 Committer: Jason Dere Committed: Thu Nov 29 12:35:01 2018 -0800 -- .../hadoop/hive/common/HeapMemoryMonitor.java | 22 +--- .../hive/streaming/AbstractRecordWriter.java| 1 + 2 files changed, 20 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/75c6ee41/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java -- diff --git a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java index 42286be..56ec2fd 100644 --- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.List; import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.ListenerNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ public class HeapMemoryMonitor { private final double threshold; private List listeners = new ArrayList<>(); + private NotificationListener notificationListener; public interface Listener { void memoryUsageAboveThreshold(long usedMemory, long maxMemory); @@ -140,7 +143,7 @@ public class HeapMemoryMonitor { } MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); NotificationEmitter emitter = (NotificationEmitter) mxBean; -emitter.addNotificationListener((n, hb) -> { +notificationListener = (n, hb) -> { if (n.getType().equals( MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) { long maxMemory = tenuredGenPool.getUsage().getMax(); @@ -149,6 +152,19 @@ public class HeapMemoryMonitor { listener.memoryUsageAboveThreshold(usedMemory, maxMemory); } } -}, null, null); +}; +emitter.addNotificationListener(notificationListener, null, null); } -} \ No newline at end of file + + public void close() { +if(notificationListener != null) { + MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + NotificationEmitter emitter = (NotificationEmitter) mxBean; + try { +emitter.removeNotificationListener(notificationListener); + } catch(ListenerNotFoundException e) { +LOG.warn("Failed to remove HeapMemoryMonitor notification listener from MemoryMXBean", e); + } +} + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/75c6ee41/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java -- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 88a7d82..e7588e8 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -366,6 +366,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { @Override public void close() throws StreamingIOFailure { +heapMemoryMonitor.close(); boolean haveError = false; String partition = null; if (LOG.isDebugEnabled()) {