hive git commit: HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh)
Repository: hive Updated Branches: refs/heads/branch-3 0a1bc3583 -> a7b3cf4bd HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7b3cf4b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7b3cf4b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7b3cf4b Branch: refs/heads/branch-3 Commit: a7b3cf4bd2239876b323ab544e12d547d875b6c4 Parents: 0a1bc35 Author: Shubham Chaurasia Authored: Mon Dec 10 01:33:00 2018 -0800 Committer: Prasanth Jayachandran Committed: Mon Dec 10 01:39:44 2018 -0800 -- .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 + .../org/apache/hive/streaming/HiveStreamingConnection.java | 8 +++- 2 files changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java -- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 0408599..0653a5d 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -383,6 +383,11 @@ public abstract class AbstractRecordWriter implements RecordWriter { if (LOG.isDebugEnabled()) { logStats("Stats after close:"); } +try { + this.fs.close(); +} catch (IOException e) { + throw new StreamingIOFailure("Error while closing FileSystem", e); +} if (haveError) { throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition)); } http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java -- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 6cf14b0..8ca8fe2 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -158,6 +158,7 @@ public class HiveStreamingConnection implements StreamingConnection { private Table tableObject = null; private String metastoreUri; private ConnectionStats connectionStats; + private Runnable onShutdownRunner; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -330,9 +331,10 @@ public class HiveStreamingConnection implements StreamingConnection { throw new StreamingException("Record writer cannot be null for streaming connection"); } HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); + streamingConnection.onShutdownRunner = streamingConnection::close; // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) - ShutdownHookManager.addShutdownHook(streamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); + ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close()); return streamingConnection; } @@ -551,6 +553,10 @@ public class HiveStreamingConnection implements StreamingConnection { } finally { getMSC().close(); getHeatbeatMSC().close(); + //remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect() + if (!ShutdownHookManager.isShutdownInProgress()) { +ShutdownHookManager.removeShutdownHook(this.onShutdownRunner); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
hive git commit: HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh)
Repository: hive Updated Branches: refs/heads/master 706bf724e -> f5618d922 HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5618d92 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5618d92 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5618d92 Branch: refs/heads/master Commit: f5618d9227e5f6e643aaf9d8d625dc1fc42180dc Parents: 706bf72 Author: Prasanth Jayachandran Authored: Mon Dec 10 01:33:00 2018 -0800 Committer: Prasanth Jayachandran Committed: Mon Dec 10 01:33:00 2018 -0800 -- .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 + .../org/apache/hive/streaming/HiveStreamingConnection.java | 8 +++- 2 files changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java -- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index e7588e8..14d34d4 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -394,6 +394,11 @@ public abstract class AbstractRecordWriter implements RecordWriter { if (LOG.isDebugEnabled()) { logStats("Stats after close:"); } +try { + this.fs.close(); +} catch (IOException e) { + throw new StreamingIOFailure("Error while closing FileSystem", e); +} if (haveError) { throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition)); } http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java -- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 74fc531..a32aa62 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -147,6 +147,7 @@ public class HiveStreamingConnection implements StreamingConnection { private int countTransactions = 0; private Set<String> partitions; private Long tableId; + private Runnable onShutdownRunner; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -389,9 +390,10 @@ public class HiveStreamingConnection implements StreamingConnection { } HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); + streamingConnection.onShutdownRunner = streamingConnection::close; // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) - ShutdownHookManager.addShutdownHook(streamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); + ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close()); return streamingConnection; } @@ -651,6 +653,10 @@ public class HiveStreamingConnection implements StreamingConnection { getMSC().close(); getHeatbeatMSC().close(); } + //remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect() + if (!ShutdownHookManager.isShutdownInProgress()) { +ShutdownHookManager.removeShutdownHook(this.onShutdownRunner); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());