hive git commit: HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh)

2018-12-10 Thread prasanthj
Repository: hive
Updated Branches:
  refs/heads/branch-3 0a1bc3583 -> a7b3cf4bd


HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by 
Prasanth, Eric, Ashutosh)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7b3cf4b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7b3cf4b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7b3cf4b

Branch: refs/heads/branch-3
Commit: a7b3cf4bd2239876b323ab544e12d547d875b6c4
Parents: 0a1bc35
Author: Shubham Chaurasia 
Authored: Mon Dec 10 01:33:00 2018 -0800
Committer: Prasanth Jayachandran 
Committed: Mon Dec 10 01:39:44 2018 -0800

--
 .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 +++++
 .../org/apache/hive/streaming/HiveStreamingConnection.java   | 8 +++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
--
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 0408599..0653a5d 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -383,6 +383,11 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 if (LOG.isDebugEnabled()) {
   logStats("Stats after close:");
 }
+try {
+  this.fs.close();
+} catch (IOException e) {
+  throw new StreamingIOFailure("Error while closing FileSystem", e);
+}
 if (haveError) {
   throw new StreamingIOFailure("Encountered errors while closing (see 
logs) " + getWatermark(partition));
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
--
diff --git 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 6cf14b0..8ca8fe2 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -158,6 +158,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   private Table tableObject = null;
   private String metastoreUri;
   private ConnectionStats connectionStats;
+  private Runnable onShutdownRunner;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
 this.database = builder.database.toLowerCase();
@@ -330,9 +331,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
 throw new StreamingException("Record writer cannot be null for 
streaming connection");
   }
   HiveStreamingConnection streamingConnection = new 
HiveStreamingConnection(this);
+  streamingConnection.onShutdownRunner = streamingConnection::close;
   // assigning higher priority than FileSystem shutdown hook so that 
streaming connection gets closed first before
   // filesystem close (to avoid ClosedChannelException)
-  ShutdownHookManager.addShutdownHook(streamingConnection::close,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+  
ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
   Thread.setDefaultUncaughtExceptionHandler((t, e) -> 
streamingConnection.close());
   return streamingConnection;
 }
@@ -551,6 +553,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
 } finally {
   getMSC().close();
   getHeatbeatMSC().close();
+  //remove shutdown hook entry added while creating this connection via 
HiveStreamingConnection.Builder#connect()
+  if (!ShutdownHookManager.isShutdownInProgress()) {
+ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
+  }
 }
 if (LOG.isInfoEnabled()) {
   LOG.info("Closed streaming connection. Agent: {} Stats: {}", 
getAgentInfo(), getConnectionStats());



hive git commit: HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh)

2018-12-10 Thread prasanthj
Repository: hive
Updated Branches:
  refs/heads/master 706bf724e -> f5618d922


HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by 
Prasanth, Eric, Ashutosh)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5618d92
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5618d92
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5618d92

Branch: refs/heads/master
Commit: f5618d9227e5f6e643aaf9d8d625dc1fc42180dc
Parents: 706bf72
Author: Prasanth Jayachandran 
Authored: Mon Dec 10 01:33:00 2018 -0800
Committer: Prasanth Jayachandran 
Committed: Mon Dec 10 01:33:00 2018 -0800

--
 .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 +++++
 .../org/apache/hive/streaming/HiveStreamingConnection.java   | 8 +++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
--
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index e7588e8..14d34d4 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -394,6 +394,11 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 if (LOG.isDebugEnabled()) {
   logStats("Stats after close:");
 }
+try {
+  this.fs.close();
+} catch (IOException e) {
+  throw new StreamingIOFailure("Error while closing FileSystem", e);
+}
 if (haveError) {
   throw new StreamingIOFailure("Encountered errors while closing (see 
logs) " + getWatermark(partition));
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
--
diff --git 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 74fc531..a32aa62 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -147,6 +147,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   private int countTransactions = 0;
   private Set partitions;
   private Long tableId;
+  private Runnable onShutdownRunner;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
 this.database = builder.database.toLowerCase();
@@ -389,9 +390,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   }
 
   HiveStreamingConnection streamingConnection = new 
HiveStreamingConnection(this);
+  streamingConnection.onShutdownRunner = streamingConnection::close;
   // assigning higher priority than FileSystem shutdown hook so that 
streaming connection gets closed first before
   // filesystem close (to avoid ClosedChannelException)
-  ShutdownHookManager.addShutdownHook(streamingConnection::close,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+  
ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
   Thread.setDefaultUncaughtExceptionHandler((t, e) -> 
streamingConnection.close());
   return streamingConnection;
 }
@@ -651,6 +653,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
 getMSC().close();
 getHeatbeatMSC().close();
   }
+  //remove shutdown hook entry added while creating this connection via 
HiveStreamingConnection.Builder#connect()
+  if (!ShutdownHookManager.isShutdownInProgress()) {
+ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
+  }
 }
 if (LOG.isInfoEnabled()) {
   LOG.info("Closed streaming connection. Agent: {} Stats: {}", 
getAgentInfo(), getConnectionStats());