Francesco Romani has uploaded a new change for review.

Change subject: virt: sampling: replace flag with Stage
......................................................................

virt: sampling: replace flag with Stage

In order to detect stuck sampling calls, the SampleVMs operation used
a simple boolean flag to mark whether it was busy or not.

However, this may lead to races: if a stuck operation unblocks while
another one is in progress, it clears the flag, and the next call
wrongly assumes that no sampling is running.
This race is expected to do little harm, but could and should be improved
anyway.

We could consider threading.Semaphore (or BoundedSemaphore), but we
don't want to rate-limit the number of workers doing sampling, because
stuck calls can hold the semaphore indefinitely.
Moreover, we definitely don't want fresh threads to wait for the
stuck ones.

We just need a thread-safe object which can tell us whether a critical
section is busy or not, because the calling code needs to react in a
different way than just blocking.

So, we introduce the simple 'Stage' utility class, which wraps a
critical section (called the 'stage') and keeps track of the
performers on the stage at any given time.

Client code can query the stage for emptiness and react accordingly.
'Stage' is somewhat reminiscent of a Semaphore, but it only does
accounting: it never makes performers wait for each other, nor does it
limit their number.

Change-Id: I09b4fec4d609fe22e890de911342f80505de88c9
Signed-off-by: Francesco Romani <[email protected]>
---
M tests/vmUtilsTests.py
M vdsm/virt/sampling.py
M vdsm/virt/utils.py
3 files changed, 81 insertions(+), 19 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/22/40322/1

diff --git a/tests/vmUtilsTests.py b/tests/vmUtilsTests.py
index 74bf378..699b74e 100644
--- a/tests/vmUtilsTests.py
+++ b/tests/vmUtilsTests.py
@@ -18,6 +18,9 @@
 # Refer to the README and COPYING files for full details of the license
 #
 
+import threading
+import time
+
 from virt import utils
 
 from testlib import VdsmTestCase as TestCaseBase
@@ -125,3 +128,48 @@

         clock.now = 3.0
         self.assertFalse(cache)
+
+
+class StageTests(TestCaseBase):
+    def setUp(self):
+        self.stage = utils.Stage()
+
+    def test_default_empty(self):
+        self.assertTrue(self.stage.empty)
+
+    def test_busy(self):
+        with self.stage.performer():
+            self.assertFalse(self.stage.empty)
+        self.assertTrue(self.stage.empty)
+
+    def test_busy_different_thread(self):
+        # the worker signals 'entered' once it is on the stage, so the
+        # checks below do not race with the thread startup.
+        entered = threading.Event()
+        run = True
+
+        def _fun():
+            with self.stage.performer():
+                entered.set()
+                while run:
+                    time.sleep(0.1)
+
+        t = threading.Thread(target=_fun)
+        t.daemon = True
+
+        self.assertTrue(self.stage.empty)
+
+        t.start()
+        try:
+            entered.wait(5)
+            self.assertTrue(entered.is_set())
+            self.assertFalse(self.stage.empty)
+
+            with self.stage.performer():
+                self.assertFalse(self.stage.empty)
+        finally:
+            # always release the worker, even if an assertion failed
+            run = False
+            t.join()
+
+        self.assertTrue(self.stage.empty)
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 746f059..09ce2b3 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -37,7 +37,7 @@
 from vdsm.config import config
 import v2v
 
-from .utils import ExpiringCache
+from .utils import ExpiringCache, Stage
 
 import caps
 
@@ -499,36 +499,37 @@
         self._stats_cache = stats_cache
         self._stats_flags = stats_flags
         self._skip_doms = ExpiringCache(timeout)
-        self._sampling = False
+        self._sampling = Stage()
         self._log = logging.getLogger("sampling.SampleVMs")

     def __call__(self):
         timestamp = self._stats_cache.clock()
         # we are deep in the hot path. bool(ExpiringCache)
         # *is* costly so we should avoid it if we can.
-        fast_path = (not self._sampling and not self._skip_doms)
-        self._sampling = True
+        # The stage is checked before this call joins it: if it is not
+        # empty, a previous call is still running (possibly stuck).
+        fast_path = (self._sampling.empty and not self._skip_doms)
         try:
-            if fast_path:
-                # This is expected to be the common case.
-                # If everything's ok, we can skip all the costly checks.
-                bulk_stats = self._conn.getAllDomainStats(self._stats_flags)
-            else:
-                # A previous call got stuck, or not every domain
-                # has properly recovered. Thus we must whitelist domains.
-                doms = self._get_responsive_doms()
-                self._log.debug('sampling %d domains', len(doms))
-                if doms:
-                    bulk_stats = self._conn.domainListGetStats(
-                        doms, self._stats_flags)
+            with self._sampling.performer():
+                if fast_path:
+                    # This is expected to be the common case.
+                    # If everything's ok, we can skip all the costly checks.
+                    bulk_stats = self._conn.getAllDomainStats(
+                        self._stats_flags)
                 else:
-                    bulk_stats = []
+                    # A previous call got stuck, or not every domain
+                    # has properly recovered. Thus we must whitelist domains.
+                    doms = self._get_responsive_doms()
+                    self._log.debug('sampling %d domains', len(doms))
+                    if doms:
+                        bulk_stats = self._conn.domainListGetStats(
+                            doms, self._stats_flags)
+                    else:
+                        bulk_stats = []
         except Exception:
             self._log.exception("vm sampling failed")
         else:
             self._stats_cache.put(_translate(bulk_stats), timestamp)
-        finally:
-            self._sampling = False

     def _get_responsive_doms(self):
         vms = self._get_vms()
diff --git a/vdsm/virt/utils.py b/vdsm/virt/utils.py
index 60becfa..08ef8e7 100644
--- a/vdsm/virt/utils.py
+++ b/vdsm/virt/utils.py
@@ -22,6 +22,7 @@
 shared utilities and common code for the virt package
 """
 
+from contextlib import contextmanager
 import threading
 
 from vdsm.utils import monotonic_time
@@ -110,3 +111,43 @@
                 raise ItemExpired

             return value
+
+
+class Stage(object):
+    """
+    Track how many callers are currently inside a critical section
+    (the 'stage').
+
+    Unlike a semaphore, a Stage never makes a caller wait for other
+    performers and does not limit how many of them can be on the stage:
+    it only counts them, so client code can check whether the stage is
+    busy and react accordingly instead of blocking.
+    """
+
+    def __init__(self):
+        self._lock = threading.Lock()
+        self._count = 0
+
+    @property
+    def empty(self):
+        """
+        True if no performer is currently on the stage.
+        """
+        with self._lock:
+            return self._count == 0
+
+    @contextmanager
+    def performer(self):
+        """
+        Context manager that keeps the caller on the stage for the
+        duration of the 'with' block; the caller always leaves the
+        stage on exit, even if the block raises. Yields the Stage.
+        """
+        with self._lock:
+            self._count += 1
+
+        try:
+            yield self
+        finally:
+            with self._lock:
+                self._count -= 1


-- 
To view, visit https://gerrit.ovirt.org/40322
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I09b4fec4d609fe22e890de911342f80505de88c9
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to