Adam Litke has uploaded a new change for review.

Change subject: Refactor v2v jobs for reusability
......................................................................
Refactor v2v jobs for reusability Change-Id: Ida6b1c460c5030c820c540e836e423d4632410df Signed-off-by: Adam Litke <[email protected]> --- M lib/vdsm/define.py M tests/Makefile.am A tests/jobsTests.py M vdsm.spec.in M vdsm/Makefile.am A vdsm/jobs.py M vdsm/v2v.py 7 files changed, 379 insertions(+), 212 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/57/44857/1 diff --git a/lib/vdsm/define.py b/lib/vdsm/define.py index 53f530d..b4e3981 100644 --- a/lib/vdsm/define.py +++ b/lib/vdsm/define.py @@ -151,16 +151,16 @@ 'V2VConnection': {'status': { 'code': 65, 'message': 'error connecting to hypervisor'}}, - 'V2VNoSuchJob': {'status': { + 'NoSuchJob': {'status': { 'code': 66, 'message': 'Job Id does not exists'}}, 'V2VNoSuchOvf': {'status': { 'code': 67, 'message': 'OVF file does not exists'}}, - 'V2VJobNotDone': {'status': { + 'JobNotDone': {'status': { 'code': 68, 'message': 'Job status is not done'}}, - 'V2VJobExists': {'status': { + 'JobExists': {'status': { 'code': 69, 'message': 'Job id already exists'}}, 'hotplugMem': {'status': { diff --git a/tests/Makefile.am b/tests/Makefile.am index 174982c..888a866 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -57,6 +57,7 @@ iproute2Tests.py \ ipwrapperTests.py \ iscsiTests.py \ + jobsTests.py \ libvirtconnectionTests.py \ lvmTests.py \ main.py \ diff --git a/tests/jobsTests.py b/tests/jobsTests.py new file mode 100644 index 0000000..01a28f8 --- /dev/null +++ b/tests/jobsTests.py @@ -0,0 +1,81 @@ +# Copyright 2015 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +import uuid + +import jobs + +from testlib import VdsmTestCase as TestCaseBase +from testValidation import slowtest + + +class TestingJob(jobs.Job): + PROC_WAIT_TIMEOUT = 1 + + def __init__(self, job_id): + jobs.Job.__init__(self, job_id) + self._progress = 0 + + @property + def progress(self): + return self._progress + + @classmethod + def from_shell(cls, job_id, shell_script): + def _cmd(): + return ['bash', '-c', shell_script] + + obj = cls(job_id) + obj._create_command = _cmd + return obj + + +class JobsTests(TestCaseBase): + TIMEOUT = 1 + + def setUp(self): + self.job_id = str(uuid.uuid4()) + + def test_simple(self): + job = TestingJob.from_shell(self.job_id, 'true') + self.assertEqual(jobs.STATUS.STARTING, job.status) + job.start() + self.assertTrue(job.proc_finished.wait(self.TIMEOUT)) + self.assertEqual(jobs.STATUS.DONE, job.status) + + def test_cmd_fail(self): + job = TestingJob.from_shell(self.job_id, 'false') + job.start() + self.assertTrue(job.proc_finished.wait(self.TIMEOUT)) + self.assertEqual(jobs.STATUS.FAILED, job.status) + + def test_abort(self): + job = TestingJob.from_shell(self.job_id, 'sleep 5') + job.start() + self.assertTrue(job.proc_started.wait(self.TIMEOUT)) + job.abort() + self.assertEqual(jobs.STATUS.ABORTED, job.status) + + @slowtest + def test_timeout(self): + job = TestingJob.from_shell(self.job_id, 'sleep 5') + job.PROC_WAIT_TIMEOUT = 1 + job.start() + self.assertTrue(job.proc_finished.wait(2)) + self.assertEqual(jobs.STATUS.FAILED, job.status) diff --git a/vdsm.spec.in b/vdsm.spec.in index d17338d..2acf32a 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -791,6 +791,7 @@ 
%{_datadir}/%{vdsm_name}/hooking.py* %{_datadir}/%{vdsm_name}/hooks.py* %{_datadir}/%{vdsm_name}/hostdev.py* +%{_datadir}/%{vdsm_name}/jobs.py* %{_datadir}/%{vdsm_name}/mk_sysprep_floppy %{_datadir}/%{vdsm_name}/parted_utils.py* %{_datadir}/%{vdsm_name}/mkimage.py* diff --git a/vdsm/Makefile.am b/vdsm/Makefile.am index 4c0578e..be72875 100644 --- a/vdsm/Makefile.am +++ b/vdsm/Makefile.am @@ -33,6 +33,7 @@ hooking.py \ hooks.py \ hostdev.py \ + jobs.py \ kaxmlrpclib.py \ logUtils.py \ mkimage.py \ diff --git a/vdsm/jobs.py b/vdsm/jobs.py new file mode 100644 index 0000000..8b7d9fb --- /dev/null +++ b/vdsm/jobs.py @@ -0,0 +1,276 @@ +# Copyright 2015 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +import errno +import logging +import signal +import threading + +from vdsm.define import errCode, doneCode +from vdsm.infra import zombiereaper +from vdsm.utils import traceback, execCmd + + +_lock = threading.Lock() +_jobs = {} + + +class STATUS: + ''' + STARTING: request granted and starting the process + ABORTED: user initiated aborted + FAILED: error during process + DONE: process successfully finished + ''' + STARTING = 'starting' + ABORTED = 'aborted' + FAILED = 'error' + DONE = 'done' + + +class ClientError(Exception): + ''' Base class for client error ''' + + +class JobExistsError(ClientError): + ''' Job already exists in _jobs collection ''' + err_name = 'JobExistsError' + + +class NoSuchJob(ClientError): + ''' Job not exists in _jobs collection ''' + err_name = 'NoSuchJob' + + +class JobNotDone(ClientError): + ''' Import process still in progress ''' + err_name = 'JobNotDone' + + +class JobError(Exception): + ''' Base class for all internal job errors''' + + +class ProcessError(JobError): + ''' process had error in execution ''' + + +def delete_job(job_id): + try: + job = _get_job(job_id) + _validate_job_finished(job) + _remove_job(job_id) + except ClientError as e: + logging.info('Cannot delete job, error: %s', e) + return errCode[e.err_name] + return {'status': doneCode} + + +def abort_job(job_id): + try: + job = _get_job(job_id) + job.abort() + except ClientError as e: + logging.info('Cannot abort job, error: %s', e) + return errCode[e.err_name] + return {'status': doneCode} + + +def get_jobs_status(): + ret = {} + with _lock: + items = tuple(_jobs.items()) + for job_id, job in items: + ret[job_id] = { + 'status': job.status, + 'description': job.description, + 
'progress': job.progress + } + return ret + + +def add_job(job_id, job): + with _lock: + if job_id in _jobs: + raise JobExistsError("Job %r exists" % job_id) + _jobs[job_id] = job + + +def _get_job(job_id): + with _lock: + if job_id not in _jobs: + raise NoSuchJob("No such job %r" % job_id) + return _jobs[job_id] + + +def _remove_job(job_id): + with _lock: + if job_id not in _jobs: + raise NoSuchJob("No such job %r" % job_id) + del _jobs[job_id] + + +def _validate_job_done(job): + if job.status != STATUS.DONE: + raise JobNotDone("Job %r is %s" % (job.id, job.status)) + + +def _validate_job_finished(job): + if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED): + raise JobNotDone("Job %r is %s" % (job.id, job.status)) + + +class Job(object): + TERM_DELAY = 30 + PROC_WAIT_TIMEOUT = 30 + + def __init__(self, job_id): + self._id = job_id + self._status = STATUS.STARTING + self._aborted = False + self._description = '' + + self._create_command = None + + # Override this if you need to perform some setup when starting thread + self._run_command = self._run + + self.proc_started = threading.Event() + self.proc_finished = threading.Event() + + def start(self): + t = threading.Thread(target=self._run_command) + t.daemon = True + t.start() + + @property + def id(self): + return self._id + + @property + def status(self): + return self._status + + @property + def description(self): + return self._description + + @property + def progress(self): + """ + Must be overridden by child class + """ + raise NotImplementedError() + + def abort(self): + self._status = STATUS.ABORTED + logging.info('Job %r aborting...', self._id) + self._abort() + + def _abort(self): + self._aborted = True + if self._proc.returncode is None: + logging.debug('Job %r killing process', self._id) + try: + self._proc.kill() + except OSError as e: + if e.errno != errno.ESRCH: + raise + logging.debug('Job %r process not running', + self._id) + else: + logging.debug('Job %r process was killed', + 
self._id) + finally: + zombiereaper.autoReapPID(self._proc.pid) + + def _execution_environments(self): + """ + May be overridden by child class to set process environment variables + """ + return {} + + def _prepare(self): + """ + May be overridden by child class to perform any pre-processing + """ + pass + + def _watch_process_output(self): + """ + May be overridden by child class to monitor process output and update + progress information. + """ + pass + + def _cleanup(self): + """ + May be overridden by child class to perform any postprocessing + """ + pass + + @traceback(msg="Job failed") + def _run(self): + try: + self._start() + except Exception as ex: + if self._aborted: + logging.debug("Job %r was aborted", self._id) + else: + logging.exception("Job %r failed", self._id) + self._status = STATUS.FAILED + self._description = ex.message + try: + self._abort() + except Exception as e: + logging.exception('Job %r, error trying to abort: %r', + self._id, e) + finally: + self.proc_finished.set() + self._cleanup() + + def _start(self): + # TODO: use the process handling http://gerrit.ovirt.org/#/c/33909/ + cmd = self._create_command() + logging.info('Job %r starting process', self._id) + + self._proc = execCmd(cmd, sync=False, deathSignal=signal.SIGTERM, + env=self._execution_environments()) + self.proc_started.set() + self._proc.blocking = True + self._watch_process_output() + self._wait_for_process() + + if self._proc.returncode != 0: + raise ProcessError('Job %r process failed exit-code: %r' + ', stderr: %s' % + (self._id, self._proc.returncode, + self._proc.stderr.read(1024))) + + if self._status != STATUS.ABORTED: + self._status = STATUS.DONE + logging.info('Job %r finished import successfully', self._id) + + def _wait_for_process(self): + if self._proc.returncode is not None: + return + logging.debug("Job %r waiting for process", self._id) + if not self._proc.wait(timeout=self.PROC_WAIT_TIMEOUT): + raise ProcessError("Job %r timeout waiting for process 
pid=%s", + self._id, self._proc.pid) diff --git a/vdsm/v2v.py b/vdsm/v2v.py index d0bcc3a..431d264 100644 --- a/vdsm/v2v.py +++ b/vdsm/v2v.py @@ -30,8 +30,6 @@ import logging import os import re -import signal -import threading import xml.etree.ElementTree as ET import libvirt @@ -39,14 +37,10 @@ from vdsm.constants import P_VDSM_RUN from vdsm.define import errCode, doneCode from vdsm import libvirtconnection, response -from vdsm.infra import zombiereaper -from vdsm.utils import traceback, CommandPath, execCmd +from vdsm.utils import CommandPath, execCmd import caps - - -_lock = threading.Lock() -_jobs = {} +import jobs _V2V_DIR = os.path.join(P_VDSM_RUN, 'v2v') _VIRT_V2V = CommandPath('virt-v2v', '/usr/bin/virt-v2v') @@ -65,27 +59,15 @@ DiskProgress = namedtuple('DiskProgress', ['progress']) -class STATUS: +class V2VSTATUS(jobs.STATUS): ''' - STARTING: request granted and starting the import process COPYING_DISK: copying disk in progress - ABORTED: user initiated aborted - FAILED: error during import process - DONE: convert process successfully finished ''' - STARTING = 'starting' COPYING_DISK = 'copying_disk' - ABORTED = 'aborted' - FAILED = 'error' - DONE = 'done' class V2VError(Exception): ''' Base class for v2v errors ''' - - -class ClientError(Exception): - ''' Base class for client error ''' class InvalidVMConfiguration(ValueError): @@ -96,23 +78,8 @@ ''' Error while parsing virt-v2v output ''' -class JobExistsError(ClientError): - ''' Job already exists in _jobs collection ''' - err_name = 'V2VJobExistsError' - - -class VolumeError(ClientError): +class VolumeError(jobs.ClientError): ''' Error preparing volume ''' - - -class NoSuchJob(ClientError): - ''' Job not exists in _jobs collection ''' - err_name = 'V2VNoSuchJob' - - -class JobNotDone(ClientError): - ''' Import process still in progress ''' - err_name = 'V2VJobNotDone' class NoSuchOvf(V2VError): @@ -120,11 +87,7 @@ err_name = 'V2VNoSuchOvf' -class V2VProcessError(V2VError): - ''' virt-v2v process had 
error in execution ''' - - -class InvalidInputError(ClientError): +class InvalidInputError(jobs.ClientError): ''' Invalid input received ''' @@ -169,14 +132,14 @@ def convert_external_vm(uri, username, password, vminfo, job_id, irs): job = ImportVm.from_libvirt(uri, username, password, vminfo, job_id, irs) job.start() - _add_job(job_id, job) + jobs.add_job(job_id, job) return {'status': doneCode} def convert_ova(ova_path, vminfo, job_id, irs): job = ImportVm.from_ova(ova_path, vminfo, job_id, irs) job.start() - _add_job(job_id, job) + jobs.add_job(job_id, job) return response.success() @@ -198,81 +161,16 @@ def get_converted_vm(job_id): try: - job = _get_job(job_id) - _validate_job_done(job) + job = jobs._get_job(job_id) + jobs._validate_job_done(job) ovf = _read_ovf(job_id) - except ClientError as e: + except jobs.ClientError as e: logging.info('Converted VM error %s', e) return errCode[e.err_name] except V2VError as e: logging.error('Converted VM error %s', e) return errCode[e.err_name] return {'status': doneCode, 'ovf': ovf} - - -def delete_job(job_id): - try: - job = _get_job(job_id) - _validate_job_finished(job) - _remove_job(job_id) - except ClientError as e: - logging.info('Cannot delete job, error: %s', e) - return errCode[e.err_name] - return {'status': doneCode} - - -def abort_job(job_id): - try: - job = _get_job(job_id) - job.abort() - except ClientError as e: - logging.info('Cannot abort job, error: %s', e) - return errCode[e.err_name] - return {'status': doneCode} - - -def get_jobs_status(): - ret = {} - with _lock: - items = tuple(_jobs.items()) - for job_id, job in items: - ret[job_id] = { - 'status': job.status, - 'description': job.description, - 'progress': job.progress - } - return ret - - -def _add_job(job_id, job): - with _lock: - if job_id in _jobs: - raise JobExistsError("Job %r exists" % job_id) - _jobs[job_id] = job - - -def _get_job(job_id): - with _lock: - if job_id not in _jobs: - raise NoSuchJob("No such job %r" % job_id) - return 
_jobs[job_id] - - -def _remove_job(job_id): - with _lock: - if job_id not in _jobs: - raise NoSuchJob("No such job %r" % job_id) - del _jobs[job_id] - - -def _validate_job_done(job): - if job.status != STATUS.DONE: - raise JobNotDone("Job %r is %s" % (job.id, job.status)) - - -def _validate_job_finished(job): - if job.status not in (STATUS.DONE, STATUS.FAILED, STATUS.ABORTED): - raise JobNotDone("Job %r is %s" % (job.id, job.status)) def _read_ovf(job_id): @@ -311,32 +209,24 @@ job_id, file_name) -class ImportVm(object): - TERM_DELAY = 30 - PROC_WAIT_TIMEOUT = 30 - +class ImportVm(jobs.Job): def __init__(self, vminfo, job_id, irs): ''' do not use directly, use a factory method instead! ''' + jobs.Job.__init__(self, job_id) self._vminfo = vminfo - self._id = job_id self._irs = irs - self._status = STATUS.STARTING - self._description = '' self._disk_progress = 0 self._disk_count = 1 self._current_disk = 1 - self._aborted = False self._prepared_volumes = [] self._uri = None self._username = None self._password = None self._passwd_file = None - self._create_command = None - self._run_command = None self._ova_path = None @@ -361,23 +251,6 @@ obj._run_command = obj._run return obj - def start(self): - t = threading.Thread(target=self._run_command) - t.daemon = True - t.start() - - @property - def id(self): - return self._id - - @property - def status(self): - return self._status - - @property - def description(self): - return self._description - @property def progress(self): ''' @@ -393,47 +266,11 @@ with password_file(self._id, self._passwd_file, self._password): self._run() - @traceback(msg="Error importing vm") - def _run(self): - try: - self._import() - except Exception as ex: - if self._aborted: - logging.debug("Job %r was aborted", self._id) - else: - logging.exception("Job %r failed", self._id) - self._status = STATUS.FAILED - self._description = ex.message - try: - self._abort() - except Exception as e: - logging.exception('Job %r, error trying to abort: %r', - 
self._id, e) - finally: - self._teardown_volumes() - - def _import(self): - # TODO: use the process handling http://gerrit.ovirt.org/#/c/33909/ + def _prepare(self): self._prepare_volumes() - cmd = self._create_command() - logging.info('Job %r starting import', self._id) - self._proc = execCmd(cmd, sync=False, deathSignal=signal.SIGTERM, - env=self._execution_environments()) - - self._proc.blocking = True - self._watch_process_output() - self._wait_for_process() - - if self._proc.returncode != 0: - raise V2VProcessError('Job %r process failed exit-code: %r' - ', stderr: %s' % - (self._id, self._proc.returncode, - self._proc.stderr.read(1024))) - - if self._status != STATUS.ABORTED: - self._status = STATUS.DONE - logging.info('Job %r finished import successfully', self._id) + def _cleanup(self): + self._teardown_volumes() def _execution_environments(self): env = {'LIBGUESTFS_BACKEND': 'direct'} @@ -441,19 +278,11 @@ env['VIRTIO_WIN'] = self._vminfo['virtio_iso_path'] return env - def _wait_for_process(self): - if self._proc.returncode is not None: - return - logging.debug("Job %r waiting for virt-v2v process", self._id) - if not self._proc.wait(timeout=self.PROC_WAIT_TIMEOUT): - raise V2VProcessError("Job %r timeout waiting for process pid=%s", - self._id, self._proc.pid) - def _watch_process_output(self): parser = OutputParser() for event in parser.parse(self._proc.stdout): if isinstance(event, ImportProgress): - self._status = STATUS.COPYING_DISK + self._status = V2VSTATUS.COPYING_DISK logging.info("Job %r copying disk %d/%d", self._id, event.current_disk, event.disk_count) self._disk_progress = 0 @@ -503,28 +332,6 @@ get_storage_domain_path(self._prepared_volumes[0]['path'])] cmd.extend(self._generate_disk_parameters()) return cmd - - def abort(self): - self._status = STATUS.ABORTED - logging.info('Job %r aborting...', self._id) - self._abort() - - def _abort(self): - self._aborted = True - if self._proc.returncode is None: - logging.debug('Job %r killing 
virt-v2v process', self._id) - try: - self._proc.kill() - except OSError as e: - if e.errno != errno.ESRCH: - raise - logging.debug('Job %r virt-v2v process not running', - self._id) - else: - logging.debug('Job %r virt-v2v process was killed', - self._id) - finally: - zombiereaper.autoReapPID(self._proc.pid) def _get_disk_format(self): fmt = self._vminfo.get('format', 'raw').lower() -- To view, visit https://gerrit.ovirt.org/44857 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ida6b1c460c5030c820c540e836e423d4632410df Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Adam Litke <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
