This is an automated email from the ASF dual-hosted git repository. jxie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push: new bea5fd1 unlink memory shared file immediately on linux (#10992) bea5fd1 is described below commit bea5fd13c5445647a1aeedddd3c5be4406d8fb9c Author: Eric Junyuan Xie <piiswr...@users.noreply.github.com> AuthorDate: Fri May 18 14:07:32 2018 -0700 unlink memory shared file immediately on linux (#10992) --- python/mxnet/gluon/data/dataloader.py | 48 +++++++++++++++++++++++--------- src/storage/cpu_shared_storage_manager.h | 34 ++++++++++++++++++---- 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py index d80a6bf..151b49d 100644 --- a/python/mxnet/gluon/data/dataloader.py +++ b/python/mxnet/gluon/data/dataloader.py @@ -20,27 +20,49 @@ """Dataset generator.""" __all__ = ['DataLoader'] -import multiprocessing -import multiprocessing.queues -from multiprocessing.reduction import ForkingPickler import pickle import io import sys +import multiprocessing +import multiprocessing.queues +from multiprocessing.reduction import ForkingPickler import numpy as np +try: + import multiprocessing.resource_sharer +except ImportError: + pass + from . import sampler as _sampler from ... import nd, context - -def rebuild_ndarray(*args): - """Rebuild ndarray from pickled shared memory""" - # pylint: disable=no-value-for-parameter - return nd.NDArray(nd.ndarray._new_from_shared_mem(*args)) - - -def reduce_ndarray(data): - """Reduce ndarray to shared memory handle""" - return rebuild_ndarray, data._to_shared_mem() +if sys.platform == 'darwin' or sys.platform == 'win32': + def rebuild_ndarray(*args): + """Rebuild ndarray from pickled shared memory""" + # pylint: disable=no-value-for-parameter + return nd.NDArray(nd.ndarray._new_from_shared_mem(*args)) + + def reduce_ndarray(data): + """Reduce ndarray to shared memory handle""" + return rebuild_ndarray, data._to_shared_mem() +else: + def rebuild_ndarray(pid, fd, shape, dtype): + """Rebuild ndarray from pickled shared memory""" + # pylint: disable=no-value-for-parameter + if sys.version_info[0] == 2: + fd = multiprocessing.reduction.rebuild_handle(fd) + else: + fd = fd.detach() + return nd.NDArray(nd.ndarray._new_from_shared_mem(pid, fd, shape, dtype)) + + def reduce_ndarray(data): + """Reduce ndarray to shared memory handle""" + pid, fd, shape, dtype = data._to_shared_mem() + if sys.version_info[0] == 2: + fd = multiprocessing.reduction.reduce_handle(fd) + else: + fd = multiprocessing.reduction.DupFd(fd) + return rebuild_ndarray, (pid, fd, shape, dtype) ForkingPickler.register(nd.NDArray, reduce_ndarray) diff --git a/src/storage/cpu_shared_storage_manager.h b/src/storage/cpu_shared_storage_manager.h index e2de30d..85c6a35 100644 --- a/src/storage/cpu_shared_storage_manager.h +++ b/src/storage/cpu_shared_storage_manager.h @@ -118,10 +118,11 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) { std::lock_guard<std::recursive_mutex> lock(mutex_); std::uniform_int_distribution<> dis(0, std::numeric_limits<int>::max()); int fid = -1; + std::string filename; bool is_new = false; size_t size = handle->size + alignment_; void *ptr = nullptr; - #ifdef _WIN32 +#ifdef _WIN32 CheckAndRealFree(); HANDLE map_handle = nullptr; uint32_t error = 0; @@ -130,7 +131,7 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) { handle->shared_pid = _getpid(); for (int i = 0; i < 10; ++i) { handle->shared_id = dis(rand_gen_); - auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id); + filename = SharedHandleToString(handle->shared_pid, handle->shared_id); map_handle = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0, size, filename.c_str()); if ((error = GetLastError()) == ERROR_SUCCESS) { @@ -138,7 +139,7 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) { } } } else { - auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id); + filename = SharedHandleToString(handle->shared_pid, handle->shared_id); map_handle = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE, FALSE, filename.c_str()); error = GetLastError(); @@ -159,13 +160,17 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) { handle->shared_pid = getpid(); for (int i = 0; i < 10; ++i) { handle->shared_id = dis(rand_gen_); - auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id); + filename = SharedHandleToString(handle->shared_pid, handle->shared_id); fid = shm_open(filename.c_str(), O_EXCL|O_CREAT|O_RDWR, 0666); if (fid != -1) break; } } else { - auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id); +#ifdef __linux__ + fid = handle->shared_id; +#else + filename = SharedHandleToString(handle->shared_pid, handle->shared_id); fid = shm_open(filename.c_str(), O_RDWR, 0666); +#endif // __linux__ } if (fid == -1) { @@ -178,7 +183,18 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) { ptr = mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fid, 0); CHECK_NE(ptr, MAP_FAILED) << "Failed to map shared memory. mmap failed with error " << strerror(errno); - close(fid); +#ifdef __linux__ + handle->shared_id = fid; + if (is_new) { + CHECK_EQ(shm_unlink(filename.c_str()), 0) + << "Failed to unlink shared memory. shm_unlink failed with error " + << strerror(errno); + } +#else + CHECK_EQ(close(fid), 0) + << "Failed to close shared memory. close failed with error " + << strerror(errno); +#endif // __linux__ #endif // _WIN32 if (is_new) { @@ -199,12 +215,18 @@ void CPUSharedStorageManager::FreeImpl(const Storage::Handle& handle) { << "Failed to unmap shared memory. munmap failed with error " << strerror(errno); +#ifdef __linux__ + CHECK_EQ(close(handle.shared_id), 0) + << "Failed to close shared memory. close failed with error " + << strerror(errno); +#else if (count == 0) { auto filename = SharedHandleToString(handle.shared_pid, handle.shared_id); CHECK_EQ(shm_unlink(filename.c_str()), 0) << "Failed to unlink shared memory. shm_unlink failed with error " << strerror(errno); } +#endif // __linux__ #endif // _WIN32 } -- To stop receiving notification emails like this one, please contact j...@apache.org.