Repository: incubator-hawq
Updated Branches:
  refs/heads/master c0cd47c3d -> 31aeb4a11


HAWQ-1498. Segments keep open file descriptors for deleted files


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/31aeb4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/31aeb4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/31aeb4a1

Branch: refs/heads/master
Commit: 31aeb4a11c14571051a39c2144a6eeb8d43a1606
Parents: c0cd47c
Author: Yi <y...@apache.org>
Authored: Fri Aug 11 22:03:33 2017 +1000
Committer: Yi <y...@apache.org>
Committed: Fri Aug 11 22:03:33 2017 +1000

----------------------------------------------------------------------
 src/backend/cdb/cdbpersistentfilesysobj.c |   3 +
 src/backend/storage/file/fd.c             | 171 +++++++++++++++++++------
 src/include/storage/fd.h                  |   1 +
 3 files changed, 139 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/backend/cdb/cdbpersistentfilesysobj.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpersistentfilesysobj.c b/src/backend/cdb/cdbpersistentfilesysobj.c
index 77ae62c..8667fd6 100644
--- a/src/backend/cdb/cdbpersistentfilesysobj.c
+++ b/src/backend/cdb/cdbpersistentfilesysobj.c
@@ -2129,6 +2129,9 @@ void PersistentFileSysObj_EndXactDrop(
                                                        ignoreNonExistence,
                                                        Debug_persistent_print,
                                                        
Persistent_DebugPrintLevel());
+
+       // clean up alive connections that are used for deleting hdfs objects
+       cleanup_hdfs_handlers_for_dropping();
 }
 
 void PersistentFileSysObj_UpdateRelationBufpoolKind(

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/backend/storage/file/fd.c
----------------------------------------------------------------------
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 4ec458e..2366318 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -228,6 +228,7 @@ typedef struct
  * hash table of hdfs file systems, key = hdfs:/<host>:<port>, value = hdfsFS
  */
 static HTAB * HdfsFsTable = NULL;
+static HTAB * HdfsFsTable4Drop = NULL;
 static MemoryContext HdfsGlobalContext = NULL;
 #define EXPECTED_MAX_HDFS_CONNECTIONS 10
 
@@ -298,7 +299,7 @@ static void CleanupTempFiles(bool isProcExit);
 static void RemovePgTempFilesInDir(const char *tmpdirname);
 static bool HasTempFilePrefix(char * fileName);
 
-static hdfsFS HdfsGetConnection(const char * path);
+static hdfsFS HdfsGetConnection(const char * path, bool isForDrop);
 static bool HdfsBasicOpenFile(FileName fileName, int fileFlags, int fileMode,
                                                          char **hProtocol, 
hdfsFS *fs, hdfsFile *hFile);
 static const char * ConvertToUnixPath(const char * fileName, char * buffer,
@@ -1793,7 +1794,7 @@ AllocateDir(const char *dirname)
                        return NULL;
                if (ConvertToUnixPath(dirname, unixpath, sizeof(unixpath)) == 
NULL)
                        return NULL;
-               if ((fs = HdfsGetConnection(dirname)) == NULL)
+               if ((fs = HdfsGetConnection(dirname, false)) == NULL)
                        return NULL;
                /* TODO: add to filesystem! */
                if ((info = hdfsListDirectory(fs, unixpath, &num)) == NULL)
@@ -2005,15 +2006,66 @@ void
 cleanup_filesystem_handler(void)
 {
        HASH_SEQ_STATUS status;
+       HASH_SEQ_STATUS status4drop;
        struct FsEntry *entry;
        char *protocol;
 
-       if (NULL == HdfsFsTable)
+       if (NULL == HdfsFsTable && NULL == HdfsFsTable4Drop)
                return;
 
-       hash_seq_init(&status, HdfsFsTable);
+       if (NULL != HdfsFsTable) {
+               hash_seq_init(&status, HdfsFsTable);
 
-       while (NULL != (entry = hash_seq_search(&status)))
+               while (NULL != (entry = hash_seq_search(&status)))
+               {
+                       if (HdfsParsePath(entry->host, &protocol, NULL, NULL, 
NULL) || NULL == protocol)
+                       {
+                               elog(WARNING, "cannot get protocol for host: 
%s", entry->host);
+                               continue;
+                       }
+
+                       if (entry->fs)
+                               HdfsDisconnect(protocol, entry->fs);
+                       pfree(protocol);
+               }
+               hash_destroy(HdfsFsTable);
+               HdfsFsTable = NULL;
+       }
+
+       if (NULL != HdfsFsTable4Drop) {
+               hash_seq_init(&status4drop, HdfsFsTable4Drop);
+
+               while (NULL != (entry = hash_seq_search(&status4drop)))
+               {
+                       if (HdfsParsePath(entry->host, &protocol, NULL, NULL, 
NULL) || NULL == protocol)
+                       {
+                               elog(WARNING, "cannot get protocol for host: 
%s", entry->host);
+                               continue;
+                       }
+
+                       if (entry->fs)
+                               HdfsDisconnect(protocol, entry->fs);
+                       pfree(protocol);
+               }
+               hash_destroy(HdfsFsTable4Drop);
+               HdfsFsTable4Drop = NULL;
+       }
+       MemoryContextResetAndDeleteChildren(HdfsGlobalContext);
+}
+
+void
+cleanup_hdfs_handlers_for_dropping()
+{
+       HASH_SEQ_STATUS status4drop;
+       struct FsEntry *entry;
+       char *protocol;
+
+       if (NULL == HdfsFsTable4Drop)
+               return;
+
+       hash_seq_init(&status4drop, HdfsFsTable4Drop);
+
+       while (NULL != (entry = hash_seq_search(&status4drop)))
        {
                if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || 
NULL == protocol)
                {
@@ -2026,13 +2078,10 @@ cleanup_filesystem_handler(void)
                pfree(protocol);
        }
 
-       hash_destroy(HdfsFsTable);
-       HdfsFsTable = NULL;
-
-       MemoryContextResetAndDeleteChildren(HdfsGlobalContext);
+       hash_destroy(HdfsFsTable4Drop);
+       HdfsFsTable4Drop = NULL;
 }
 
-
 /*
  * closeAllVfds
  *
@@ -2339,10 +2388,11 @@ HasTempFilePrefix(char * fileName)
  *             hdfs:/<host>:<port>/...
  */
 static hdfsFS
-HdfsGetConnection(const char * path)
+HdfsGetConnection(const char * path, bool isForDrop)
 {
        struct FsEntry * entry;
        HASHCTL hash_ctl;
+       HASHCTL hash_ctl_4drop;
        bool found;
 
        char *host = NULL, *location = NULL, *protocol;
@@ -2363,15 +2413,16 @@ HdfsGetConnection(const char * path)
                else
                        sprintf(location, "%s://%s/", protocol, host);
 
+               if (NULL == HdfsGlobalContext)
+               {
+                       Assert(NULL != TopMemoryContext);
+                       HdfsGlobalContext = 
AllocSetContextCreate(TopMemoryContext,
+                                       "HDFS Global Context", 
ALLOCSET_DEFAULT_MINSIZE,
+                                       ALLOCSET_DEFAULT_INITSIZE, 
ALLOCSET_DEFAULT_MAXSIZE);
+               }
+
                if (NULL == HdfsFsTable)
                {
-                       if (NULL == HdfsGlobalContext)
-                       {
-                               Assert(NULL != TopMemoryContext);
-                               HdfsGlobalContext = 
AllocSetContextCreate(TopMemoryContext,
-                                               "HDFS Global Context", 
ALLOCSET_DEFAULT_MINSIZE,
-                                               ALLOCSET_DEFAULT_INITSIZE, 
ALLOCSET_DEFAULT_MAXSIZE);
-                       }
 
                        MemSet(&hash_ctl, 0, sizeof(hash_ctl));
                        hash_ctl.keysize = MAXPGPATH;
@@ -2380,8 +2431,9 @@ HdfsGetConnection(const char * path)
                        hash_ctl.hcxt = HdfsGlobalContext;
 
                        HdfsFsTable = hash_create("hdfs connections hash table",
-                                       EXPECTED_MAX_HDFS_CONNECTIONS, 
&hash_ctl,
-                                       HASH_ELEM | HASH_FUNCTION | 
HASH_CONTEXT);
+                                                                         
EXPECTED_MAX_HDFS_CONNECTIONS,
+                                                                         
&hash_ctl,
+                                                                         
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
 
                        if (HdfsFsTable == NULL )
                        {
@@ -2389,6 +2441,30 @@ HdfsGetConnection(const char * path)
                                errno = EIO;
                                break;
                        }
+
+                       elog(LOG, "created hash table for hdfs access.");
+               }
+
+               if (NULL == HdfsFsTable4Drop)
+               {
+                       MemSet(&hash_ctl_4drop, 0, sizeof(hash_ctl_4drop));
+                       hash_ctl_4drop.keysize = MAXPGPATH;
+                       hash_ctl_4drop.entrysize = sizeof(struct FsEntry);
+                       hash_ctl_4drop.hash = string_hash;
+                       hash_ctl_4drop.hcxt = HdfsGlobalContext;
+
+                       HdfsFsTable4Drop = hash_create("hash connections hash 
table for drop",
+                                                                               
   EXPECTED_MAX_HDFS_CONNECTIONS,
+                                                                               
   &hash_ctl_4drop,
+                                                                               
   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+                       if (HdfsFsTable4Drop == NULL )
+                       {
+                               elog(WARNING, "failed to create hash table for 
dropping table.");
+                               errno = EIO;
+                               break;
+                       }
+
+                       elog(LOG, "created hash table (4drop) for hdfs 
access.");
                }
 
                if (enable_secure_filesystem && Gp_role != GP_ROLE_EXECUTE)
@@ -2403,8 +2479,21 @@ HdfsGetConnection(const char * path)
                    }
                }
 
-               entry = (struct FsEntry *) hash_search(HdfsFsTable, location,
-                               HASH_ENTER, &found);
+               /* If this is for normal connection, check from normal table, 
otherwise,
+                * check the table for dropping. */
+               if (!isForDrop) {
+                       entry = (struct FsEntry *) hash_search(HdfsFsTable,
+                                                                               
                   location,
+                                                                               
                   HASH_ENTER,
+                                                                               
                   &found);
+               }
+               else
+               {
+                       entry = (struct FsEntry *) hash_search(HdfsFsTable4Drop,
+                                                                               
                   location,
+                                                                               
                   HASH_ENTER,
+                                                                               
                   &found);
+               }
 
                if (!found)
                {
@@ -2423,7 +2512,10 @@ HdfsGetConnection(const char * path)
                                                                                
errmsg("failed to get filesystem credential."),
                                                                                
errdetail("%s", HdfsGetLastError())));
 
-                                               hash_search(HdfsFsTable, 
location, HASH_REMOVE, &found);
+                                               if (!isForDrop)
+                                                   hash_search(HdfsFsTable, 
location, HASH_REMOVE, &found);
+                                               else
+                                                       
hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found);
                                                errno = EACCES;
                                                break;
                                        }
@@ -2432,7 +2524,11 @@ HdfsGetConnection(const char * path)
                                {
                                        if (!login())
                                        {
-                                               hash_search(HdfsFsTable, 
location, HASH_REMOVE, &found);
+                                               if (!isForDrop)
+                                                       
hash_search(HdfsFsTable, location, HASH_REMOVE, &found);
+                                               else {
+                                                       
hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found);
+                                               }
                                                errno = EACCES;
                                                break;
                                        }
@@ -2448,7 +2544,10 @@ HdfsGetConnection(const char * path)
 
                        if (NULL == entry->fs)
                        {
-                               hash_search(HdfsFsTable, location, HASH_REMOVE, 
&found);
+                               if (!isForDrop)
+                                       hash_search(HdfsFsTable, location, 
HASH_REMOVE, &found);
+                               else
+                                       hash_search(HdfsFsTable4Drop, location, 
HASH_REMOVE, &found);
                                ereport(LOG,
                                                (errcode(ERRCODE_IO_ERROR),
                                                                errmsg("fail to 
connect hdfs at %s, errno = %d", location, errno),
@@ -2606,7 +2705,7 @@ HdfsBasicOpenFile(FileName fileName, int fileFlags, int fileMode,
        if (NULL == ConvertToUnixPath(fileName, path, sizeof(path)))
                return FALSE;
 
-       tempfs = HdfsGetConnection(fileName);
+       tempfs = HdfsGetConnection(fileName, false);
        if (tempfs == NULL)
                return FALSE;
 
@@ -2976,7 +3075,7 @@ HdfsRemovePath(FileName fileName, int recursive)
        if (HdfsParsePath(fileName, &protocol, NULL, NULL, NULL) || NULL == 
protocol)
                return -1;
 
-       hdfsFS fs = HdfsGetConnection(fileName);
+       hdfsFS fs = HdfsGetConnection(fileName, true);
        if (NULL == fs)
                return -1;
        if (NULL == ConvertToUnixPath(fileName, path, sizeof(path)))
@@ -3001,7 +3100,7 @@ HdfsMakeDirectory(const char * path, mode_t mode)
        if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || protocol == 
NULL)
                return -1;
 
-       hdfsFS fs = HdfsGetConnection(path);
+       hdfsFS fs = HdfsGetConnection(path, false);
        if (NULL == fs)
                return -1;
        if (NULL == ConvertToUnixPath(path, p, sizeof(p)))
@@ -3128,7 +3227,7 @@ HdfsGetDelegationToken(const char *uri, void **fs)
        char *token;
        char *retval;
 
-       *fs = HdfsGetConnection(uri);
+       *fs = HdfsGetConnection(uri, false);
        if (*fs == NULL)
                return NULL;
 
@@ -3285,7 +3384,7 @@ HdfsPathFileTruncate(FileName fileName) {
                return -1;
        }
 
-       fs = HdfsGetConnection(fileName);
+       fs = HdfsGetConnection(fileName, true);
        if (fs == NULL)
                return -1;
 
@@ -3338,7 +3437,7 @@ HdfsPathExist(char *path)
        if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == 
protocol)
                elog(ERROR, "cannot get protocol for path: %s", path);
 
-       fs = HdfsGetConnection(path);
+       fs = HdfsGetConnection(path, false);
 
        if (fs == NULL)
                return false;
@@ -3363,7 +3462,7 @@ HdfsPathExistAndNonEmpty(char *path, bool *existed)
   if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol)
     elog(ERROR, "cannot get protocol for path: %s", path);
 
-  fs = HdfsGetConnection(path);
+  fs = HdfsGetConnection(path, false);
 
   if (fs == NULL)
     return false;
@@ -3446,7 +3545,7 @@ HdfsPathSize(const char *path)
     return 0;
   if (ConvertToUnixPath(path, unixpath, sizeof(unixpath)) == NULL)
     return 0;
-  if ((fs = HdfsGetConnection(path)) == NULL)
+  if ((fs = HdfsGetConnection(path, false)) == NULL)
     return 0;
 
   total_size = HdfsPathSizeRecursive(fs, protocol, unixpath);
@@ -3469,7 +3568,7 @@ HdfsGetFileLength(char * path)
        if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == 
protocol)
                elog(ERROR, "cannot get protocol for path: %s", path);
 
-       fs = HdfsGetConnection(path);
+       fs = HdfsGetConnection(path, false);
 
        if (fs == NULL)
                return -1;
@@ -3506,7 +3605,7 @@ int HdfsIsDirOrFile(char * path)
        if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == 
protocol)
                elog(ERROR, "cannot get protocol for path: %s", path);
 
-       fs = HdfsGetConnection(path);
+       fs = HdfsGetConnection(path, false);
 
        if (fs == NULL)
                return -1;
@@ -3552,7 +3651,7 @@ HdfsGetFileBlockLocations2(const char *path, int64 offset, int64 length, int *bl
                elog(ERROR, "cannot get protocol for path: %s", path);
        }
 
-       fs = HdfsGetConnection(path);
+       fs = HdfsGetConnection(path, false);
 
        if (NULL == fs)
        {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/include/storage/fd.h
----------------------------------------------------------------------
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 0c25264..dde0c69 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -113,6 +113,7 @@ extern char *DeserializeDelegationToken(void *binary, int size);
 
 extern void cleanup_lru_opened_files(void);
 extern void cleanup_filesystem_handler(void);
+extern void cleanup_hdfs_handlers_for_dropping(void);
 
 /* abstract file system */
 extern File FileNameOpenFile(FileName fileName, const char *temp_dir, int 
fileFlags, int fileMode);

Reply via email to