This is an automated email from the ASF dual-hosted git repository.

jxie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new bea5fd1  unlink memory shared file immediately on linux (#10992)
bea5fd1 is described below

commit bea5fd13c5445647a1aeedddd3c5be4406d8fb9c
Author: Eric Junyuan Xie <piiswr...@users.noreply.github.com>
AuthorDate: Fri May 18 14:07:32 2018 -0700

    unlink memory shared file immediately on linux (#10992)
---
 python/mxnet/gluon/data/dataloader.py    | 48 +++++++++++++++++++++++---------
 src/storage/cpu_shared_storage_manager.h | 34 ++++++++++++++++++----
 2 files changed, 63 insertions(+), 19 deletions(-)

diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py
index d80a6bf..151b49d 100644
--- a/python/mxnet/gluon/data/dataloader.py
+++ b/python/mxnet/gluon/data/dataloader.py
@@ -20,27 +20,49 @@
 """Dataset generator."""
 __all__ = ['DataLoader']
 
-import multiprocessing
-import multiprocessing.queues
-from multiprocessing.reduction import ForkingPickler
 import pickle
 import io
 import sys
+import multiprocessing
+import multiprocessing.queues
+from multiprocessing.reduction import ForkingPickler
 import numpy as np
 
+try:
+    import multiprocessing.resource_sharer
+except ImportError:
+    pass
+
 from . import sampler as _sampler
 from ... import nd, context
 
-
-def rebuild_ndarray(*args):
-    """Rebuild ndarray from pickled shared memory"""
-    # pylint: disable=no-value-for-parameter
-    return nd.NDArray(nd.ndarray._new_from_shared_mem(*args))
-
-
-def reduce_ndarray(data):
-    """Reduce ndarray to shared memory handle"""
-    return rebuild_ndarray, data._to_shared_mem()
+if sys.platform == 'darwin' or sys.platform == 'win32':
+    def rebuild_ndarray(*args):
+        """Rebuild ndarray from pickled shared memory"""
+        # pylint: disable=no-value-for-parameter
+        return nd.NDArray(nd.ndarray._new_from_shared_mem(*args))
+
+    def reduce_ndarray(data):
+        """Reduce ndarray to shared memory handle"""
+        return rebuild_ndarray, data._to_shared_mem()
+else:
+    def rebuild_ndarray(pid, fd, shape, dtype):
+        """Rebuild ndarray from pickled shared memory"""
+        # pylint: disable=no-value-for-parameter
+        if sys.version_info[0] == 2:
+            fd = multiprocessing.reduction.rebuild_handle(fd)
+        else:
+            fd = fd.detach()
+        return nd.NDArray(nd.ndarray._new_from_shared_mem(pid, fd, shape, dtype))
+
+    def reduce_ndarray(data):
+        """Reduce ndarray to shared memory handle"""
+        pid, fd, shape, dtype = data._to_shared_mem()
+        if sys.version_info[0] == 2:
+            fd = multiprocessing.reduction.reduce_handle(fd)
+        else:
+            fd = multiprocessing.reduction.DupFd(fd)
+        return rebuild_ndarray, (pid, fd, shape, dtype)
 
 ForkingPickler.register(nd.NDArray, reduce_ndarray)
 
diff --git a/src/storage/cpu_shared_storage_manager.h b/src/storage/cpu_shared_storage_manager.h
index e2de30d..85c6a35 100644
--- a/src/storage/cpu_shared_storage_manager.h
+++ b/src/storage/cpu_shared_storage_manager.h
@@ -118,10 +118,11 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::uniform_int_distribution<> dis(0, std::numeric_limits<int>::max());
   int fid = -1;
+  std::string filename;
   bool is_new = false;
   size_t size = handle->size + alignment_;
   void *ptr = nullptr;
-  #ifdef _WIN32
+#ifdef _WIN32
   CheckAndRealFree();
   HANDLE map_handle = nullptr;
   uint32_t error = 0;
@@ -130,7 +131,7 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) {
     handle->shared_pid = _getpid();
     for (int i = 0; i < 10; ++i) {
       handle->shared_id = dis(rand_gen_);
-      auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
+      filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
       map_handle = CreateFileMapping(INVALID_HANDLE_VALUE,
                                      NULL, PAGE_READWRITE, 0, size, filename.c_str());
       if ((error = GetLastError()) == ERROR_SUCCESS) {
@@ -138,7 +139,7 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) {
       }
     }
   } else {
-    auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
+    filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
     map_handle = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE,
                                  FALSE, filename.c_str());
     error = GetLastError();
@@ -159,13 +160,17 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) {
     handle->shared_pid = getpid();
     for (int i = 0; i < 10; ++i) {
       handle->shared_id = dis(rand_gen_);
-      auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
+      filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
       fid = shm_open(filename.c_str(), O_EXCL|O_CREAT|O_RDWR, 0666);
       if (fid != -1) break;
     }
   } else {
-    auto filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
+#ifdef __linux__
+    fid = handle->shared_id;
+#else
+    filename = SharedHandleToString(handle->shared_pid, handle->shared_id);
     fid = shm_open(filename.c_str(), O_RDWR, 0666);
+#endif  // __linux__
   }
 
   if (fid == -1) {
@@ -178,7 +183,18 @@ void CPUSharedStorageManager::Alloc(Storage::Handle* handle) {
   ptr = mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fid, 0);
   CHECK_NE(ptr, MAP_FAILED)
       << "Failed to map shared memory. mmap failed with error " << strerror(errno);
-  close(fid);
+#ifdef __linux__
+  handle->shared_id = fid;
+  if (is_new) {
+    CHECK_EQ(shm_unlink(filename.c_str()), 0)
+      << "Failed to unlink shared memory. shm_unlink failed with error "
+      << strerror(errno);
+  }
+#else
+  CHECK_EQ(close(fid), 0)
+      << "Failed to close shared memory. close failed with error "
+      << strerror(errno);
+#endif  // __linux__
 #endif  // _WIN32
 
   if (is_new) {
@@ -199,12 +215,18 @@ void CPUSharedStorageManager::FreeImpl(const Storage::Handle& handle) {
       << "Failed to unmap shared memory. munmap failed with error "
       << strerror(errno);
 
+#ifdef __linux__
+  CHECK_EQ(close(handle.shared_id), 0)
+      << "Failed to close shared memory. close failed with error "
+      << strerror(errno);
+#else
   if (count == 0) {
     auto filename = SharedHandleToString(handle.shared_pid, handle.shared_id);
     CHECK_EQ(shm_unlink(filename.c_str()), 0)
         << "Failed to unlink shared memory. shm_unlink failed with error "
         << strerror(errno);
   }
+#endif  // __linux__
 #endif  // _WIN32
 }
 

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to