This is an automated email from the ASF dual-hosted git repository.

tomaz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/libcloud.git

commit b0921294333a8c44da5c575b825a308f0fd724a6
Author: Tomaz Muraus <[email protected]>
AuthorDate: Tue Nov 3 16:26:05 2020 +0100

    Add tests for local storage lock context manager.
---
 libcloud/storage/drivers/local.py   | 16 +++++++-----
 libcloud/test/storage/test_local.py | 49 +++++++++++++++++++++++++++++++++++--
 2 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/libcloud/storage/drivers/local.py 
b/libcloud/storage/drivers/local.py
index 603cee4..aa4ddeb 100644
--- a/libcloud/storage/drivers/local.py
+++ b/libcloud/storage/drivers/local.py
@@ -55,8 +55,9 @@ class LockLocalStorage(object):
     A class to help in locking a local path before being updated
     """
 
-    def __init__(self, path):
+    def __init__(self, path, timeout=5):
         self.path = path
+        self.lock_acquire_timeout = timeout
 
         self.ipc_lock_path = os.path.join(tempfile.gettempdir(), "%s.lock" % (
             sha256(path.encode("utf-8")).hexdigest()))
@@ -68,7 +69,7 @@ class LockLocalStorage(object):
         self.ipc_lock = fasteners.InterProcessLock(self.ipc_lock_path)
 
     def __enter__(self):
-        lock_acquire_timeout = 5
+        lock_acquire_timeout = self.lock_acquire_timeout
         start_time = int(time.time())
         end_time = start_time + lock_acquire_timeout
 
@@ -82,14 +83,17 @@ class LockLocalStorage(object):
 
         if not success:
             raise LibcloudError("Failed to acquire thread lock for path %s "
-                                "in 5 seconds" % (self.path))
+                                "in %s seconds" % (self.path,
+                                                   lock_acquire_timeout))
 
-        success = self.ipc_lock.acquire(blocking=True, timeout=5)
+        success = self.ipc_lock.acquire(blocking=True,
+                                        timeout=lock_acquire_timeout)
 
         if not success:
             raise LibcloudError("Failed to acquire IPC lock (%s) for path %s "
-                                "in 5 seconds" %
-                                (self.ipc_lock_path, self.path))
+                                "in %s seconds" %
+                                (self.ipc_lock_path, self.path,
+                                 lock_acquire_timeout))
 
     def __exit__(self, type, value, traceback):
         if self.thread_lock.locked():
diff --git a/libcloud/test/storage/test_local.py 
b/libcloud/test/storage/test_local.py
index e01e91c..8fa5b41 100644
--- a/libcloud/test/storage/test_local.py
+++ b/libcloud/test/storage/test_local.py
@@ -20,9 +20,9 @@ import sys
 import platform
 import shutil
 import unittest
+import time
 import tempfile
-
-import mock
+import multiprocessing
 
 from libcloud.common.types import LibcloudError
 from libcloud.storage.base import Container
@@ -74,6 +74,51 @@ class LocalTests(unittest.TestCase):
                 return
             raise e
 
+    def test_lock_local_storage(self):
+        # 1. Acquire succeeds
+        lock = LockLocalStorage("/tmp/a")
+        with lock:
+            self.assertTrue(True)
+
+        # 2. Acquire fails because lock is already acquired
+        lock = LockLocalStorage("/tmp/b", timeout=0.5)
+        with lock:
+            expected_msg = "Failed to acquire thread lock"
+            self.assertRaisesRegex(LibcloudError, expected_msg, lock.__enter__)
+
+        # 3. Multiprocessing scenario where IPC lock is involved
+        def acquire_lock_in_subprocess(pid, success):
+            # For first process acquire should succeed and for the second it 
should fail
+
+            lock = LockLocalStorage("/tmp/c", timeout=0.5)
+
+            if pid == 1:
+                with lock:
+                    time.sleep(1)
+
+                success.value = 1
+            elif pid == 2:
+                expected_msg = "Failed to acquire IPC lock"
+                self.assertRaisesRegex(LibcloudError, expected_msg, 
lock.__enter__)
+                success.value = 1
+            else:
+                raise ValueError("Invalid pid")
+
+        success_1  = multiprocessing.Value('i', 0)
+        success_2  = multiprocessing.Value('i', 0)
+
+        p1 = multiprocessing.Process(target=acquire_lock_in_subprocess, 
args=(1, success_1,))
+        p1.start()
+
+        p2 = multiprocessing.Process(target=acquire_lock_in_subprocess, 
args=(2, success_2,))
+        p2.start()
+
+        p1.join()
+        p2.join()
+
+        self.assertEqual(bool(success_1.value), True, "Check didn't pass")
+        self.assertEqual(bool(success_2.value), True, "Second check didn't 
pass")
+
     def test_list_containers_empty(self):
         containers = self.driver.list_containers()
         self.assertEqual(len(containers), 0)

Reply via email to