HBASE-15291 FileSystem not closed in secure bulkLoad

Signed-off-by: Ashish Singhi <ashishsin...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/828a1c76
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/828a1c76
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/828a1c76

Branch: refs/heads/HBASE-19064
Commit: 828a1c76c71b0179bd9709e3da5d988b18fea631
Parents: 95ca38a
Author: Ashish Singhi <ashishsin...@apache.org>
Authored: Wed Apr 11 12:01:28 2018 +0530
Committer: Ashish Singhi <ashishsin...@apache.org>
Committed: Wed Apr 11 12:01:28 2018 +0530

----------------------------------------------------------------------
 .../regionserver/SecureBulkLoadManager.java     | 82 +++++++++++++-------
 1 file changed, 54 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
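Background on the fix: FileSystem.get() returns an instance from Hadoop's
process-wide cache, shared with every other caller using the same URI, conf
and UGI, so per-bulk-load code can neither safely close it nor count on it
being closed; instances created under the client's UGI therefore leaked.
FileSystem.newInstance() bypasses the cache and hands the caller a private
instance it owns and must close. A minimal sketch of the difference (the
class name and the file:/// URI are illustrative, not from the patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FsCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI uri = URI.create("file:///");

        // FileSystem.get() serves instances from a shared cache:
        // both calls below return the very same object.
        FileSystem cachedA = FileSystem.get(uri, conf);
        FileSystem cachedB = FileSystem.get(uri, conf);
        System.out.println(cachedA == cachedB); // true

        // FileSystem.newInstance() returns an uncached instance that
        // the caller owns, so close() affects no other user.
        FileSystem fresh = FileSystem.newInstance(uri, conf);
        System.out.println(fresh == cachedA); // false
        fresh.close(); // safe: closes only this private instance
      }
    }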


http://git-wip-us.apache.org/repos/asf/hbase/blob/828a1c76/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 264d985..a4ee517 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -145,15 +145,26 @@ public class SecureBulkLoadManager {
 
   public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
       throws IOException {
-    region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
+    try {
+      region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
 
-    Path path = new Path(request.getBulkToken());
-    if (!fs.delete(path, true)) {
-      if (fs.exists(path)) {
-        throw new IOException("Failed to clean up " + path);
+      Path path = new Path(request.getBulkToken());
+      if (!fs.delete(path, true)) {
+        if (fs.exists(path)) {
+          throw new IOException("Failed to clean up " + path);
+        }
+      }
+      LOG.info("Cleaned up " + path + " successfully.");
+    } finally {
+      UserGroupInformation ugi = getActiveUser().getUGI();
+      try {
+        if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+          FileSystem.closeAllForUGI(ugi);
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to close FileSystem for: " + ugi, e);
       }
     }
-    LOG.info("Cleaned up " + path + " successfully.");
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
@@ -304,7 +315,7 @@ public class SecureBulkLoadManager {
       }
 
       if (srcFs == null) {
-        srcFs = FileSystem.get(p.toUri(), conf);
+        srcFs = FileSystem.newInstance(p.toUri(), conf);
       }
 
       if(!isFile(p)) {
@@ -334,34 +345,49 @@ public class SecureBulkLoadManager {
     @Override
     public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
       LOG.debug("Bulk Load done for: " + srcPath);
+      closeSrcFs();
+    }
+
+    private void closeSrcFs() throws IOException {
+      if (srcFs != null) {
+        srcFs.close();
+        srcFs = null;
+      }
     }
 
     @Override
     public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
-      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
-        // files are copied so no need to move them back
-        return;
-      }
-      Path p = new Path(srcPath);
-      Path stageP = new Path(stagingDir,
-          new Path(Bytes.toString(family), p.getName()));
+      try {
+        Path p = new Path(srcPath);
+        if (srcFs == null) {
+          srcFs = FileSystem.newInstance(p.toUri(), conf);
+        }
+        if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+          // files are copied so no need to move them back
+          return;
+        }
+        Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
 
-      // In case of Replication for bulk load files, hfiles are not renamed by end point during
-      // prepare stage, so no need of rename here again
-      if (p.equals(stageP)) {
-        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
-        return;
-      }
+        // In case of Replication for bulk load files, hfiles are not renamed by end point during
+        // prepare stage, so no need of rename here again
+        if (p.equals(stageP)) {
+          LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+          return;
+        }
 
-      LOG.debug("Moving " + stageP + " back to " + p);
-      if(!fs.rename(stageP, p))
-        throw new IOException("Failed to move HFile: " + stageP + " to " + p);
+        LOG.debug("Moving " + stageP + " back to " + p);
+        if (!fs.rename(stageP, p)) {
+          throw new IOException("Failed to move HFile: " + stageP + " to " + p);
+        }
 
-      // restore original permission
-      if (origPermissions.containsKey(srcPath)) {
-        fs.setPermission(p, origPermissions.get(srcPath));
-      } else {
-        LOG.warn("Can't find previous permission for path=" + srcPath);
+        // restore original permission
+        if (origPermissions.containsKey(srcPath)) {
+          fs.setPermission(p, origPermissions.get(srcPath));
+        } else {
+          LOG.warn("Can't find previous permission for path=" + srcPath);
+        }
+      } finally {
+        closeSrcFs();
       }
     }
 

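In cleanupBulkLoad() above, file systems opened under the client's proxy UGI
are released in the finally block via FileSystem.closeAllForUGI(), with a
guard that skips the login user, whose cached instances are shared by the
whole process. A standalone sketch of that guard (the helper name
closeFileSystemsFor is illustrative, not from the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiFsCleanupSketch {
      /** Best-effort close of all cached FileSystem instances owned by ugi. */
      static void closeFileSystemsFor(UserGroupInformation ugi) {
        try {
          // Never close the login user's file systems: those instances
          // are shared by every other thread in the process.
          if (!UserGroupInformation.getLoginUser().equals(ugi)) {
            FileSystem.closeAllForUGI(ugi);
          }
        } catch (IOException e) {
          // Mirror the patch: cleanup failure is logged, never rethrown.
          System.err.println("Failed to close FileSystem for: " + ugi + ": " + e);
        }
      }
    }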