Adam Litke has uploaded a new change for review.

Change subject: Refactor v2v jobs for reusability
......................................................................

Refactor v2v jobs for reusability

Change-Id: Ida6b1c460c5030c820c540e836e423d4632410df
Signed-off-by: Adam Litke <[email protected]>
---
M lib/vdsm/define.py
M tests/Makefile.am
A tests/jobsTests.py
M vdsm.spec.in
M vdsm/Makefile.am
A vdsm/jobs.py
M vdsm/v2v.py
7 files changed, 379 insertions(+), 212 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/57/44857/1

diff --git a/lib/vdsm/define.py b/lib/vdsm/define.py
index 53f530d..b4e3981 100644
--- a/lib/vdsm/define.py
+++ b/lib/vdsm/define.py
@@ -151,16 +151,16 @@
     'V2VConnection': {'status': {
         'code': 65,
         'message': 'error connecting to hypervisor'}},
-    'V2VNoSuchJob': {'status': {
+    'NoSuchJob': {'status': {
         'code': 66,
         'message': 'Job Id does not exists'}},
     'V2VNoSuchOvf': {'status': {
         'code': 67,
         'message': 'OVF file does not exists'}},
-    'V2VJobNotDone': {'status': {
+    'JobNotDone': {'status': {
         'code': 68,
         'message': 'Job status is not done'}},
-    'V2VJobExists': {'status': {
+    'JobExists': {'status': {
         'code': 69,
         'message': 'Job id already exists'}},
     'hotplugMem': {'status': {
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 174982c..888a866 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -57,6 +57,7 @@
        iproute2Tests.py \
        ipwrapperTests.py \
        iscsiTests.py \
+       jobsTests.py \
        libvirtconnectionTests.py \
        lvmTests.py \
        main.py \
diff --git a/tests/jobsTests.py b/tests/jobsTests.py
new file mode 100644
index 0000000..01a28f8
--- /dev/null
+++ b/tests/jobsTests.py
@@ -0,0 +1,81 @@
+# Copyright 2015 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import uuid
+
+import jobs
+
+from testlib import VdsmTestCase as TestCaseBase
+from testValidation import slowtest
+
+
+class TestingJob(jobs.Job):
+    PROC_WAIT_TIMEOUT = 1
+
+    def __init__(self, job_id):
+        super(TestingJob, self).__init__(job_id)
+        self._progress = 0
+
+    @property
+    def progress(self):
+        return self._progress
+
+    @classmethod
+    def from_shell(cls, job_id, shell_script):
+        """
+        Return a job whose command runs shell_script through bash -c
+        """
+        job = cls(job_id)
+        job._create_command = lambda: ['bash', '-c', shell_script]
+        return job
+
+
+class JobsTests(TestCaseBase):
+    TIMEOUT = 1  # seconds to wait for a job event to be set
+
+    def setUp(self):
+        self.job_id = str(uuid.uuid4())
+
+    def test_simple(self):
+        job = TestingJob.from_shell(self.job_id, 'true')
+        self.assertEqual(jobs.STATUS.STARTING, job.status)
+        job.start()
+        self.assertTrue(job.proc_finished.wait(self.TIMEOUT))
+        self.assertEqual(jobs.STATUS.DONE, job.status)
+
+    def test_cmd_fail(self):
+        job = TestingJob.from_shell(self.job_id, 'false')  # non-zero exit
+        job.start()
+        self.assertTrue(job.proc_finished.wait(self.TIMEOUT))
+        self.assertEqual(jobs.STATUS.FAILED, job.status)
+
+    def test_abort(self):
+        job = TestingJob.from_shell(self.job_id, 'sleep 5')
+        job.start()
+        self.assertTrue(job.proc_started.wait(self.TIMEOUT))
+        job.abort()  # the sleep is still running at this point
+        self.assertEqual(jobs.STATUS.ABORTED, job.status)
+
+    @slowtest
+    def test_timeout(self):
+        job = TestingJob.from_shell(self.job_id, 'sleep 5')
+        job.PROC_WAIT_TIMEOUT = 1  # shorter than the 5 second sleep
+        job.start()
+        self.assertTrue(job.proc_finished.wait(2))
+        self.assertEqual(jobs.STATUS.FAILED, job.status)
diff --git a/vdsm.spec.in b/vdsm.spec.in
index d17338d..2acf32a 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -791,6 +791,7 @@
 %{_datadir}/%{vdsm_name}/hooking.py*
 %{_datadir}/%{vdsm_name}/hooks.py*
 %{_datadir}/%{vdsm_name}/hostdev.py*
+%{_datadir}/%{vdsm_name}/jobs.py*
 %{_datadir}/%{vdsm_name}/mk_sysprep_floppy
 %{_datadir}/%{vdsm_name}/parted_utils.py*
 %{_datadir}/%{vdsm_name}/mkimage.py*
diff --git a/vdsm/Makefile.am b/vdsm/Makefile.am
index 4c0578e..be72875 100644
--- a/vdsm/Makefile.am
+++ b/vdsm/Makefile.am
@@ -33,6 +33,7 @@
        hooking.py \
        hooks.py \
        hostdev.py \
+       jobs.py \
        kaxmlrpclib.py \
        logUtils.py \
        mkimage.py \
diff --git a/vdsm/jobs.py b/vdsm/jobs.py
new file mode 100644
index 0000000..8b7d9fb
--- /dev/null
+++ b/vdsm/jobs.py
@@ -0,0 +1,276 @@
+# Copyright 2015 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import errno
+import logging
+import signal
+import threading
+
+from vdsm.define import errCode, doneCode
+from vdsm.infra import zombiereaper
+from vdsm.utils import traceback, execCmd
+
+
+_lock = threading.Lock()
+_jobs = {}
+
+
+class STATUS:
+    '''
+    STARTING: request granted and starting the process
+    ABORTED: aborted at the user's request
+    FAILED: error during process
+    DONE: process successfully finished
+    '''
+    STARTING = 'starting'
+    ABORTED = 'aborted'
+    FAILED = 'error'
+    DONE = 'done'
+
+
+class ClientError(Exception):
+    ''' Base class for client errors '''
+
+
+class JobExistsError(ClientError):
+    ''' Job already exists in _jobs collection '''
+    err_name = 'JobExists'  # must be a key of vdsm.define.errCode
+
+
+class NoSuchJob(ClientError):
+    ''' Job does not exist in _jobs collection '''
+    err_name = 'NoSuchJob'
+
+
+class JobNotDone(ClientError):
+    ''' Job has not reached the status required for the operation '''
+    err_name = 'JobNotDone'
+
+
+class JobError(Exception):
+    ''' Base class for all internal job errors '''
+
+
+class ProcessError(JobError):
+    ''' The job process exited with an error or timed out '''
+
+
+def delete_job(job_id):
+    # Only a finished (done, failed or aborted) job may be removed.
+    try:
+        _validate_job_finished(_get_job(job_id))
+        _remove_job(job_id)
+    except ClientError as err:
+        logging.info('Cannot delete job, error: %s', err)
+        return errCode[err.err_name]
+    return {'status': doneCode}
+
+
+def abort_job(job_id):
+    # Client errors are reported through the errCode table, not raised.
+    try:
+        _get_job(job_id).abort()
+    except ClientError as err:
+        logging.info('Cannot abort job, error: %s', err)
+        return errCode[err.err_name]
+    return {'status': doneCode}
+
+
+def get_jobs_status():
+    """Return a dict mapping each job id to its status report."""
+    # Snapshot the registry under the lock; build the report outside it.
+    with _lock:
+        snapshot = list(_jobs.items())
+    report = {}
+    for job_id, job in snapshot:
+        report[job_id] = dict(status=job.status,
+                              description=job.description,
+                              progress=job.progress)
+    return report
+
+
+def add_job(job_id, job):
+    with _lock:  # membership test and insert share one critical section
+        if job_id in _jobs:
+            raise JobExistsError("Job %r exists" % job_id)
+        _jobs[job_id] = job
+
+
+def _get_job(job_id):  # raises NoSuchJob for an unknown id
+    with _lock:
+        if job_id not in _jobs:
+            raise NoSuchJob("No such job %r" % job_id)
+        return _jobs[job_id]
+
+
+def _remove_job(job_id):  # raises NoSuchJob for an unknown id
+    with _lock:
+        if job_id not in _jobs:
+            raise NoSuchJob("No such job %r" % job_id)
+        del _jobs[job_id]
+
+
+def _validate_job_done(job):  # raises JobNotDone unless status is DONE
+    if job.status != STATUS.DONE:
+        raise JobNotDone("Job %r is %s" % (job.id, job.status))
+
+
+def _validate_job_finished(job):  # DONE, FAILED and ABORTED are final
+    if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED):
+        raise JobNotDone("Job %r is %s" % (job.id, job.status))
+
+
+class Job(object):
+    TERM_DELAY = 30
+    PROC_WAIT_TIMEOUT = 30  # passed as timeout= to self._proc.wait()
+
+    def __init__(self, job_id):
+        self._id = job_id
+        self._status = STATUS.STARTING
+        self._aborted = False
+        self._description = ''
+        self._proc = None  # assigned by _start() once the process exists
+        self._create_command = None
+
+        # Override this if you need to perform some setup when starting thread
+        self._run_command = self._run
+
+        self.proc_started = threading.Event()
+        self.proc_finished = threading.Event()
+
+    def start(self):
+        t = threading.Thread(target=self._run_command)
+        t.daemon = True
+        t.start()
+
+    @property
+    def id(self):
+        return self._id
+
+    @property
+    def status(self):
+        return self._status
+
+    @property
+    def description(self):
+        return self._description
+
+    @property
+    def progress(self):
+        """
+        Must be overridden by child class
+        """
+        raise NotImplementedError()
+
+    def abort(self):
+        self._status = STATUS.ABORTED
+        logging.info('Job %r aborting...', self._id)
+        self._abort()
+
+    def _abort(self):
+        self._aborted = True
+        if self._proc is not None and self._proc.returncode is None:
+            logging.debug('Job %r killing process', self._id)
+            try:
+                self._proc.kill()
+            except OSError as e:
+                if e.errno != errno.ESRCH:
+                    raise
+                logging.debug('Job %r process not running',
+                              self._id)
+            else:
+                logging.debug('Job %r process was killed',
+                              self._id)
+            finally:
+                zombiereaper.autoReapPID(self._proc.pid)
+
+    def _execution_environments(self):
+        """
+        May be overridden by child class to set process environment variables
+        """
+        return {}
+
+    def _prepare(self):
+        """
+        Hook run by _start() before the command is built; override as needed
+        """
+        pass
+
+    def _watch_process_output(self):
+        """
+        May be overridden by child class to monitor process output and update
+        progress information.
+        """
+        pass
+
+    def _cleanup(self):
+        """
+        May be overridden by child class to perform any postprocessing
+        """
+        pass
+
+    @traceback(msg="Job failed")
+    def _run(self):
+        try:
+            self._start()
+        except Exception as ex:
+            if self._aborted:
+                logging.debug("Job %r was aborted", self._id)
+            else:
+                logging.exception("Job %r failed", self._id)
+                self._status = STATUS.FAILED
+                self._description = ex.message
+                try:
+                    self._abort()
+                except Exception as e:
+                    logging.exception('Job %r, error trying to abort: %r',
+                                      self._id, e)
+        finally:
+            self.proc_finished.set()
+            self._cleanup()
+
+    def _start(self):
+        # TODO: use the process handling http://gerrit.ovirt.org/#/c/33909/
+        self._prepare()
+        cmd = self._create_command()
+        logging.info('Job %r starting process', self._id)
+        self._proc = execCmd(cmd, sync=False, deathSignal=signal.SIGTERM,
+                             env=self._execution_environments())
+        self.proc_started.set()
+        self._proc.blocking = True
+        self._watch_process_output()
+        self._wait_for_process()
+
+        if self._proc.returncode != 0:
+            raise ProcessError('Job %r process failed exit-code: %r'
+                               ', stderr: %s' %
+                               (self._id, self._proc.returncode,
+                                self._proc.stderr.read(1024)))
+
+        if self._status != STATUS.ABORTED:
+            self._status = STATUS.DONE
+            logging.info('Job %r finished import successfully', self._id)
+
+    def _wait_for_process(self):
+        if self._proc.returncode is not None:
+            return
+        logging.debug("Job %r waiting for process", self._id)
+        if not self._proc.wait(timeout=self.PROC_WAIT_TIMEOUT):
+            raise ProcessError("Job %r timeout waiting for process pid=%s" %
+                               (self._id, self._proc.pid))
diff --git a/vdsm/v2v.py b/vdsm/v2v.py
index d0bcc3a..431d264 100644
--- a/vdsm/v2v.py
+++ b/vdsm/v2v.py
@@ -30,8 +30,6 @@
 import logging
 import os
 import re
-import signal
-import threading
 import xml.etree.ElementTree as ET
 
 import libvirt
@@ -39,14 +37,10 @@
 from vdsm.constants import P_VDSM_RUN
 from vdsm.define import errCode, doneCode
 from vdsm import libvirtconnection, response
-from vdsm.infra import zombiereaper
-from vdsm.utils import traceback, CommandPath, execCmd
+from vdsm.utils import CommandPath, execCmd
 
 import caps
-
-
-_lock = threading.Lock()
-_jobs = {}
+import jobs
 
 _V2V_DIR = os.path.join(P_VDSM_RUN, 'v2v')
 _VIRT_V2V = CommandPath('virt-v2v', '/usr/bin/virt-v2v')
@@ -65,27 +59,15 @@
 DiskProgress = namedtuple('DiskProgress', ['progress'])
 
 
-class STATUS:
+class V2VSTATUS(jobs.STATUS):
     '''
-    STARTING: request granted and starting the import process
     COPYING_DISK: copying disk in progress
-    ABORTED: user initiated aborted
-    FAILED: error during import process
-    DONE: convert process successfully finished
     '''
-    STARTING = 'starting'
     COPYING_DISK = 'copying_disk'
-    ABORTED = 'aborted'
-    FAILED = 'error'
-    DONE = 'done'
 
 
 class V2VError(Exception):
     ''' Base class for v2v errors '''
-
-
-class ClientError(Exception):
-    ''' Base class for client error '''
 
 
 class InvalidVMConfiguration(ValueError):
@@ -96,23 +78,8 @@
     ''' Error while parsing virt-v2v output '''
 
 
-class JobExistsError(ClientError):
-    ''' Job already exists in _jobs collection '''
-    err_name = 'V2VJobExistsError'
-
-
-class VolumeError(ClientError):
+class VolumeError(jobs.ClientError):
     ''' Error preparing volume '''
-
-
-class NoSuchJob(ClientError):
-    ''' Job not exists in _jobs collection '''
-    err_name = 'V2VNoSuchJob'
-
-
-class JobNotDone(ClientError):
-    ''' Import process still in progress '''
-    err_name = 'V2VJobNotDone'
 
 
 class NoSuchOvf(V2VError):
@@ -120,11 +87,7 @@
     err_name = 'V2VNoSuchOvf'
 
 
-class V2VProcessError(V2VError):
-    ''' virt-v2v process had error in execution '''
-
-
-class InvalidInputError(ClientError):
+class InvalidInputError(jobs.ClientError):
     ''' Invalid input received '''
 
 
@@ -169,14 +132,14 @@
 def convert_external_vm(uri, username, password, vminfo, job_id, irs):
     job = ImportVm.from_libvirt(uri, username, password, vminfo, job_id, irs)
     job.start()
-    _add_job(job_id, job)
+    jobs.add_job(job_id, job)
     return {'status': doneCode}
 
 
 def convert_ova(ova_path, vminfo, job_id, irs):
     job = ImportVm.from_ova(ova_path, vminfo, job_id, irs)
     job.start()
-    _add_job(job_id, job)
+    jobs.add_job(job_id, job)
     return response.success()
 
 
@@ -198,81 +161,16 @@
 
 def get_converted_vm(job_id):
     try:
-        job = _get_job(job_id)
-        _validate_job_done(job)
+        job = jobs._get_job(job_id)
+        jobs._validate_job_done(job)
         ovf = _read_ovf(job_id)
-    except ClientError as e:
+    except jobs.ClientError as e:
         logging.info('Converted VM error %s', e)
         return errCode[e.err_name]
     except V2VError as e:
         logging.error('Converted VM error %s', e)
         return errCode[e.err_name]
     return {'status': doneCode, 'ovf': ovf}
-
-
-def delete_job(job_id):
-    try:
-        job = _get_job(job_id)
-        _validate_job_finished(job)
-        _remove_job(job_id)
-    except ClientError as e:
-        logging.info('Cannot delete job, error: %s', e)
-        return errCode[e.err_name]
-    return {'status': doneCode}
-
-
-def abort_job(job_id):
-    try:
-        job = _get_job(job_id)
-        job.abort()
-    except ClientError as e:
-        logging.info('Cannot abort job, error: %s', e)
-        return errCode[e.err_name]
-    return {'status': doneCode}
-
-
-def get_jobs_status():
-    ret = {}
-    with _lock:
-        items = tuple(_jobs.items())
-    for job_id, job in items:
-        ret[job_id] = {
-            'status': job.status,
-            'description': job.description,
-            'progress': job.progress
-        }
-    return ret
-
-
-def _add_job(job_id, job):
-    with _lock:
-        if job_id in _jobs:
-            raise JobExistsError("Job %r exists" % job_id)
-        _jobs[job_id] = job
-
-
-def _get_job(job_id):
-    with _lock:
-        if job_id not in _jobs:
-            raise NoSuchJob("No such job %r" % job_id)
-        return _jobs[job_id]
-
-
-def _remove_job(job_id):
-    with _lock:
-        if job_id not in _jobs:
-            raise NoSuchJob("No such job %r" % job_id)
-        del _jobs[job_id]
-
-
-def _validate_job_done(job):
-    if job.status != STATUS.DONE:
-        raise JobNotDone("Job %r is %s" % (job.id, job.status))
-
-
-def _validate_job_finished(job):
-    if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED):
-        raise JobNotDone("Job %r is %s" % (job.id, job.status))
 
 
 def _read_ovf(job_id):
@@ -311,32 +209,24 @@
                               job_id, file_name)
 
 
-class ImportVm(object):
-    TERM_DELAY = 30
-    PROC_WAIT_TIMEOUT = 30
-
+class ImportVm(jobs.Job):
     def __init__(self, vminfo, job_id, irs):
         '''
         do not use directly, use a factory method instead!
         '''
+        jobs.Job.__init__(self, job_id)
         self._vminfo = vminfo
-        self._id = job_id
         self._irs = irs
 
-        self._status = STATUS.STARTING
-        self._description = ''
         self._disk_progress = 0
         self._disk_count = 1
         self._current_disk = 1
-        self._aborted = False
         self._prepared_volumes = []
 
         self._uri = None
         self._username = None
         self._password = None
         self._passwd_file = None
-        self._create_command = None
-        self._run_command = None
 
         self._ova_path = None
 
@@ -361,23 +251,6 @@
         obj._run_command = obj._run
         return obj
 
-    def start(self):
-        t = threading.Thread(target=self._run_command)
-        t.daemon = True
-        t.start()
-
-    @property
-    def id(self):
-        return self._id
-
-    @property
-    def status(self):
-        return self._status
-
-    @property
-    def description(self):
-        return self._description
-
     @property
     def progress(self):
         '''
@@ -393,47 +266,11 @@
         with password_file(self._id, self._passwd_file, self._password):
             self._run()
 
-    @traceback(msg="Error importing vm")
-    def _run(self):
-        try:
-            self._import()
-        except Exception as ex:
-            if self._aborted:
-                logging.debug("Job %r was aborted", self._id)
-            else:
-                logging.exception("Job %r failed", self._id)
-                self._status = STATUS.FAILED
-                self._description = ex.message
-                try:
-                    self._abort()
-                except Exception as e:
-                    logging.exception('Job %r, error trying to abort: %r',
-                                      self._id, e)
-        finally:
-            self._teardown_volumes()
-
-    def _import(self):
-        # TODO: use the process handling http://gerrit.ovirt.org/#/c/33909/
+    def _prepare(self):
         self._prepare_volumes()
-        cmd = self._create_command()
-        logging.info('Job %r starting import', self._id)
 
-        self._proc = execCmd(cmd, sync=False, deathSignal=signal.SIGTERM,
-                             env=self._execution_environments())
-
-        self._proc.blocking = True
-        self._watch_process_output()
-        self._wait_for_process()
-
-        if self._proc.returncode != 0:
-            raise V2VProcessError('Job %r process failed exit-code: %r'
-                                  ', stderr: %s' %
-                                  (self._id, self._proc.returncode,
-                                   self._proc.stderr.read(1024)))
-
-        if self._status != STATUS.ABORTED:
-            self._status = STATUS.DONE
-            logging.info('Job %r finished import successfully', self._id)
+    def _cleanup(self):
+        self._teardown_volumes()
 
     def _execution_environments(self):
         env = {'LIBGUESTFS_BACKEND': 'direct'}
@@ -441,19 +278,11 @@
             env['VIRTIO_WIN'] = self._vminfo['virtio_iso_path']
         return env
 
-    def _wait_for_process(self):
-        if self._proc.returncode is not None:
-            return
-        logging.debug("Job %r waiting for virt-v2v process", self._id)
-        if not self._proc.wait(timeout=self.PROC_WAIT_TIMEOUT):
-            raise V2VProcessError("Job %r timeout waiting for process pid=%s",
-                                  self._id, self._proc.pid)
-
     def _watch_process_output(self):
         parser = OutputParser()
         for event in parser.parse(self._proc.stdout):
             if isinstance(event, ImportProgress):
-                self._status = STATUS.COPYING_DISK
+                self._status = V2VSTATUS.COPYING_DISK
                 logging.info("Job %r copying disk %d/%d",
                              self._id, event.current_disk, event.disk_count)
                 self._disk_progress = 0
@@ -503,28 +332,6 @@
                get_storage_domain_path(self._prepared_volumes[0]['path'])]
         cmd.extend(self._generate_disk_parameters())
         return cmd
-
-    def abort(self):
-        self._status = STATUS.ABORTED
-        logging.info('Job %r aborting...', self._id)
-        self._abort()
-
-    def _abort(self):
-        self._aborted = True
-        if self._proc.returncode is None:
-            logging.debug('Job %r killing virt-v2v process', self._id)
-            try:
-                self._proc.kill()
-            except OSError as e:
-                if e.errno != errno.ESRCH:
-                    raise
-                logging.debug('Job %r virt-v2v process not running',
-                              self._id)
-            else:
-                logging.debug('Job %r virt-v2v process was killed',
-                              self._id)
-            finally:
-                zombiereaper.autoReapPID(self._proc.pid)
 
     def _get_disk_format(self):
         fmt = self._vminfo.get('format', 'raw').lower()


-- 
To view, visit https://gerrit.ovirt.org/44857
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ida6b1c460c5030c820c540e836e423d4632410df
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to