Repository: incubator-hawq
Updated Branches:
  refs/heads/master c0cd47c3d -> 31aeb4a11
HAWQ-1498. Segments keep open file descriptors for deleted files Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/31aeb4a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/31aeb4a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/31aeb4a1 Branch: refs/heads/master Commit: 31aeb4a11c14571051a39c2144a6eeb8d43a1606 Parents: c0cd47c Author: Yi <y...@apache.org> Authored: Fri Aug 11 22:03:33 2017 +1000 Committer: Yi <y...@apache.org> Committed: Fri Aug 11 22:03:33 2017 +1000 ---------------------------------------------------------------------- src/backend/cdb/cdbpersistentfilesysobj.c | 3 + src/backend/storage/file/fd.c | 171 +++++++++++++++++++------ src/include/storage/fd.h | 1 + 3 files changed, 139 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/backend/cdb/cdbpersistentfilesysobj.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/cdbpersistentfilesysobj.c b/src/backend/cdb/cdbpersistentfilesysobj.c index 77ae62c..8667fd6 100644 --- a/src/backend/cdb/cdbpersistentfilesysobj.c +++ b/src/backend/cdb/cdbpersistentfilesysobj.c @@ -2129,6 +2129,9 @@ void PersistentFileSysObj_EndXactDrop( ignoreNonExistence, Debug_persistent_print, Persistent_DebugPrintLevel()); + + // clean up alive connections that are used for deleting hdfs objects + cleanup_hdfs_handlers_for_dropping(); } void PersistentFileSysObj_UpdateRelationBufpoolKind( http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/backend/storage/file/fd.c ---------------------------------------------------------------------- diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 4ec458e..2366318 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -228,6 
+228,7 @@ typedef struct * hash table of hdfs file systems, key = hdfs:/<host>:<port>, value = hdfsFS */ static HTAB * HdfsFsTable = NULL; +static HTAB * HdfsFsTable4Drop = NULL; static MemoryContext HdfsGlobalContext = NULL; #define EXPECTED_MAX_HDFS_CONNECTIONS 10 @@ -298,7 +299,7 @@ static void CleanupTempFiles(bool isProcExit); static void RemovePgTempFilesInDir(const char *tmpdirname); static bool HasTempFilePrefix(char * fileName); -static hdfsFS HdfsGetConnection(const char * path); +static hdfsFS HdfsGetConnection(const char * path, bool isForDrop); static bool HdfsBasicOpenFile(FileName fileName, int fileFlags, int fileMode, char **hProtocol, hdfsFS *fs, hdfsFile *hFile); static const char * ConvertToUnixPath(const char * fileName, char * buffer, @@ -1793,7 +1794,7 @@ AllocateDir(const char *dirname) return NULL; if (ConvertToUnixPath(dirname, unixpath, sizeof(unixpath)) == NULL) return NULL; - if ((fs = HdfsGetConnection(dirname)) == NULL) + if ((fs = HdfsGetConnection(dirname, false)) == NULL) return NULL; /* TODO: add to filesystem! 
*/ if ((info = hdfsListDirectory(fs, unixpath, &num)) == NULL) @@ -2005,15 +2006,66 @@ void cleanup_filesystem_handler(void) { HASH_SEQ_STATUS status; + HASH_SEQ_STATUS status4drop; struct FsEntry *entry; char *protocol; - if (NULL == HdfsFsTable) + if (NULL == HdfsFsTable && NULL == HdfsFsTable4Drop) return; - hash_seq_init(&status, HdfsFsTable); + if (NULL != HdfsFsTable) { + hash_seq_init(&status, HdfsFsTable); - while (NULL != (entry = hash_seq_search(&status))) + while (NULL != (entry = hash_seq_search(&status))) + { + if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || NULL == protocol) + { + elog(WARNING, "cannot get protocol for host: %s", entry->host); + continue; + } + + if (entry->fs) + HdfsDisconnect(protocol, entry->fs); + pfree(protocol); + } + hash_destroy(HdfsFsTable); + HdfsFsTable = NULL; + } + + if (NULL != HdfsFsTable4Drop) { + hash_seq_init(&status4drop, HdfsFsTable4Drop); + + while (NULL != (entry = hash_seq_search(&status4drop))) + { + if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || NULL == protocol) + { + elog(WARNING, "cannot get protocol for host: %s", entry->host); + continue; + } + + if (entry->fs) + HdfsDisconnect(protocol, entry->fs); + pfree(protocol); + } + hash_destroy(HdfsFsTable4Drop); + HdfsFsTable4Drop = NULL; + } + MemoryContextResetAndDeleteChildren(HdfsGlobalContext); +} + +void +cleanup_hdfs_handlers_for_dropping() +{ + HASH_SEQ_STATUS status4drop; + struct FsEntry *entry; + char *protocol; + + if (NULL == HdfsFsTable4Drop) + return; + + hash_seq_init(&status4drop, HdfsFsTable4Drop); + + while (NULL != (entry = hash_seq_search(&status4drop))) { if (HdfsParsePath(entry->host, &protocol, NULL, NULL, NULL) || NULL == protocol) { @@ -2026,13 +2078,10 @@ cleanup_filesystem_handler(void) pfree(protocol); } - hash_destroy(HdfsFsTable); - HdfsFsTable = NULL; - - MemoryContextResetAndDeleteChildren(HdfsGlobalContext); + hash_destroy(HdfsFsTable4Drop); + HdfsFsTable4Drop = NULL; } - /* * closeAllVfds * @@ 
-2339,10 +2388,11 @@ HasTempFilePrefix(char * fileName) * hdfs:/<host>:<port>/... */ static hdfsFS -HdfsGetConnection(const char * path) +HdfsGetConnection(const char * path, bool isForDrop) { struct FsEntry * entry; HASHCTL hash_ctl; + HASHCTL hash_ctl_4drop; bool found; char *host = NULL, *location = NULL, *protocol; @@ -2363,15 +2413,16 @@ HdfsGetConnection(const char * path) else sprintf(location, "%s://%s/", protocol, host); + if (NULL == HdfsGlobalContext) + { + Assert(NULL != TopMemoryContext); + HdfsGlobalContext = AllocSetContextCreate(TopMemoryContext, + "HDFS Global Context", ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + } + if (NULL == HdfsFsTable) { - if (NULL == HdfsGlobalContext) - { - Assert(NULL != TopMemoryContext); - HdfsGlobalContext = AllocSetContextCreate(TopMemoryContext, - "HDFS Global Context", ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - } MemSet(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = MAXPGPATH; @@ -2380,8 +2431,9 @@ HdfsGetConnection(const char * path) hash_ctl.hcxt = HdfsGlobalContext; HdfsFsTable = hash_create("hdfs connections hash table", - EXPECTED_MAX_HDFS_CONNECTIONS, &hash_ctl, - HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + EXPECTED_MAX_HDFS_CONNECTIONS, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); if (HdfsFsTable == NULL ) { @@ -2389,6 +2441,30 @@ HdfsGetConnection(const char * path) errno = EIO; break; } + + elog(LOG, "created hash table for hdfs access."); + } + + if (NULL == HdfsFsTable4Drop) + { + MemSet(&hash_ctl_4drop, 0, sizeof(hash_ctl_4drop)); + hash_ctl_4drop.keysize = MAXPGPATH; + hash_ctl_4drop.entrysize = sizeof(struct FsEntry); + hash_ctl_4drop.hash = string_hash; + hash_ctl_4drop.hcxt = HdfsGlobalContext; + + HdfsFsTable4Drop = hash_create("hash connections hash table for drop", + EXPECTED_MAX_HDFS_CONNECTIONS, + &hash_ctl_4drop, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + if (HdfsFsTable4Drop == NULL ) + { + 
elog(WARNING, "failed to create hash table for dropping table."); + errno = EIO; + break; + } + + elog(LOG, "created hash table (4drop) for hdfs access."); } if (enable_secure_filesystem && Gp_role != GP_ROLE_EXECUTE) @@ -2403,8 +2479,21 @@ HdfsGetConnection(const char * path) } } - entry = (struct FsEntry *) hash_search(HdfsFsTable, location, - HASH_ENTER, &found); + /* If this is for normal connection, check from normal table, otherwise, + * check the table for dropping. */ + if (!isForDrop) { + entry = (struct FsEntry *) hash_search(HdfsFsTable, + location, + HASH_ENTER, + &found); + } + else + { + entry = (struct FsEntry *) hash_search(HdfsFsTable4Drop, + location, + HASH_ENTER, + &found); + } if (!found) { @@ -2423,7 +2512,10 @@ HdfsGetConnection(const char * path) errmsg("failed to get filesystem credential."), errdetail("%s", HdfsGetLastError()))); - hash_search(HdfsFsTable, location, HASH_REMOVE, &found); + if (!isForDrop) + hash_search(HdfsFsTable, location, HASH_REMOVE, &found); + else + hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found); errno = EACCES; break; } @@ -2432,7 +2524,11 @@ HdfsGetConnection(const char * path) { if (!login()) { - hash_search(HdfsFsTable, location, HASH_REMOVE, &found); + if (!isForDrop) + hash_search(HdfsFsTable, location, HASH_REMOVE, &found); + else { + hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found); + } errno = EACCES; break; } @@ -2448,7 +2544,10 @@ HdfsGetConnection(const char * path) if (NULL == entry->fs) { - hash_search(HdfsFsTable, location, HASH_REMOVE, &found); + if (!isForDrop) + hash_search(HdfsFsTable, location, HASH_REMOVE, &found); + else + hash_search(HdfsFsTable4Drop, location, HASH_REMOVE, &found); ereport(LOG, (errcode(ERRCODE_IO_ERROR), errmsg("fail to connect hdfs at %s, errno = %d", location, errno), @@ -2606,7 +2705,7 @@ HdfsBasicOpenFile(FileName fileName, int fileFlags, int fileMode, if (NULL == ConvertToUnixPath(fileName, path, sizeof(path))) return FALSE; - tempfs = 
HdfsGetConnection(fileName); + tempfs = HdfsGetConnection(fileName, false); if (tempfs == NULL) return FALSE; @@ -2976,7 +3075,7 @@ HdfsRemovePath(FileName fileName, int recursive) if (HdfsParsePath(fileName, &protocol, NULL, NULL, NULL) || NULL == protocol) return -1; - hdfsFS fs = HdfsGetConnection(fileName); + hdfsFS fs = HdfsGetConnection(fileName, true); if (NULL == fs) return -1; if (NULL == ConvertToUnixPath(fileName, path, sizeof(path))) @@ -3001,7 +3100,7 @@ HdfsMakeDirectory(const char * path, mode_t mode) if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || protocol == NULL) return -1; - hdfsFS fs = HdfsGetConnection(path); + hdfsFS fs = HdfsGetConnection(path, false); if (NULL == fs) return -1; if (NULL == ConvertToUnixPath(path, p, sizeof(p))) @@ -3128,7 +3227,7 @@ HdfsGetDelegationToken(const char *uri, void **fs) char *token; char *retval; - *fs = HdfsGetConnection(uri); + *fs = HdfsGetConnection(uri, false); if (*fs == NULL) return NULL; @@ -3285,7 +3384,7 @@ HdfsPathFileTruncate(FileName fileName) { return -1; } - fs = HdfsGetConnection(fileName); + fs = HdfsGetConnection(fileName, true); if (fs == NULL) return -1; @@ -3338,7 +3437,7 @@ HdfsPathExist(char *path) if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol) elog(ERROR, "cannot get protocol for path: %s", path); - fs = HdfsGetConnection(path); + fs = HdfsGetConnection(path, false); if (fs == NULL) return false; @@ -3363,7 +3462,7 @@ HdfsPathExistAndNonEmpty(char *path, bool *existed) if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol) elog(ERROR, "cannot get protocol for path: %s", path); - fs = HdfsGetConnection(path); + fs = HdfsGetConnection(path, false); if (fs == NULL) return false; @@ -3446,7 +3545,7 @@ HdfsPathSize(const char *path) return 0; if (ConvertToUnixPath(path, unixpath, sizeof(unixpath)) == NULL) return 0; - if ((fs = HdfsGetConnection(path)) == NULL) + if ((fs = HdfsGetConnection(path, false)) == NULL) return 0; total_size = 
HdfsPathSizeRecursive(fs, protocol, unixpath); @@ -3469,7 +3568,7 @@ HdfsGetFileLength(char * path) if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol) elog(ERROR, "cannot get protocol for path: %s", path); - fs = HdfsGetConnection(path); + fs = HdfsGetConnection(path, false); if (fs == NULL) return -1; @@ -3506,7 +3605,7 @@ int HdfsIsDirOrFile(char * path) if (HdfsParsePath(path, &protocol, NULL, NULL, NULL) || NULL == protocol) elog(ERROR, "cannot get protocol for path: %s", path); - fs = HdfsGetConnection(path); + fs = HdfsGetConnection(path, false); if (fs == NULL) return -1; @@ -3552,7 +3651,7 @@ HdfsGetFileBlockLocations2(const char *path, int64 offset, int64 length, int *bl elog(ERROR, "cannot get protocol for path: %s", path); } - fs = HdfsGetConnection(path); + fs = HdfsGetConnection(path, false); if (NULL == fs) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31aeb4a1/src/include/storage/fd.h ---------------------------------------------------------------------- diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index 0c25264..dde0c69 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -113,6 +113,7 @@ extern char *DeserializeDelegationToken(void *binary, int size); extern void cleanup_lru_opened_files(void); extern void cleanup_filesystem_handler(void); +extern void cleanup_hdfs_handlers_for_dropping(void); /* abstract file system */ extern File FileNameOpenFile(FileName fileName, const char *temp_dir, int fileFlags, int fileMode);